Flink 写入 Elasticsearch

1. pom 依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.8.0</version>
</dependency>

2. 创建 Elasticsearch Sink

/**
 * Builds an Elasticsearch 6 sink that indexes every incoming JSON string as one document.
 *
 * <p>The target host is read from the {@code es.http.url} and {@code es.http.port} properties of
 * {@code params} and is contacted over plain HTTP. Each element is parsed into a map and written
 * to {@code index}/{@code type}; the bulk processor is configured to flush after every element.
 *
 * @param params job parameters; must contain {@code es.http.url} and a numeric {@code es.http.port}
 * @param index  name of the Elasticsearch index documents are written to
 * @param type   document mapping type (Elasticsearch 6.x API)
 * @return a configured {@link ElasticsearchSink}
 * @throws IllegalArgumentException if the host or port property is missing/blank, or the port is
 *                                  not a number
 */
public static ElasticsearchSink<String> createEsProducer(ParameterTool params, String index, String type) {
    String url = params.getProperties().getProperty("es.http.url");
    String portText = params.getProperties().getProperty("es.http.port");
    // Fail fast with a readable message instead of Integer.parseInt(null) / an opaque HttpHost error.
    if (url == null || url.trim().isEmpty()) {
        throw new IllegalArgumentException("Missing required property 'es.http.url'");
    }
    if (portText == null || portText.trim().isEmpty()) {
        throw new IllegalArgumentException("Missing required property 'es.http.port'");
    }
    // java.util.Properties keeps trailing whitespace in values, which would break host/port parsing.
    int port = Integer.parseInt(portText.trim());

    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost(url.trim(), port, "http"));

    // Use an ElasticsearchSink.Builder to create the ElasticsearchSink.
    ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
            httpHosts,
            new ElasticsearchSinkFunction<String>() {
                /** Parses one JSON element into a document and wraps it in an index request. */
                public IndexRequest createIndexRequest(String element) {
                    // Map.class is a raw class literal, so the parse result is unchecked; the
                    // document is only handed to IndexRequest.source(...) below.
                    @SuppressWarnings("unchecked")
                    Map<String, Object> document = JSONObject.parseObject(element, Map.class);
                    return Requests.indexRequest()
                            .index(index)
                            .type(type)
                            .source(document);
                }

                @Override
                public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                    indexer.add(createIndexRequest(element));
                }
            }
    );

    // Emit a bulk request after every element instead of buffering. This minimizes latency at the
    // cost of one HTTP round-trip per record; raise it for high-volume streams.
    esSinkBuilder.setBulkFlushMaxActions(1);
    esSinkBuilder.setRestClientFactory(
            restClientBuilder -> restClientBuilder
                    .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                            .setConnectTimeout(5000)            // ms to establish a connection
                            .setSocketTimeout(40000)            // ms of socket inactivity allowed
                            .setConnectionRequestTimeout(1000)) // ms to lease a pooled connection
                    // Upper bound (5 minutes) honoured across retries of the same request.
                    .setMaxRetryTimeoutMillis(5 * 60 * 1000)
    );
    return esSinkBuilder.build();
}

3. 主程序

/**
 * Job entry point: consumes user records from Kafka, transforms them with {@code UserMapper} and
 * writes the result to Elasticsearch.
 *
 * @param args optional; when present, {@code args[0]} is the configuration file name passed to
 *             {@code Configs.loadConfig}. Defaults to {@code config-test.properties}.
 * @throws Exception if the Flink job cannot be built or fails during execution
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Allow the config file to be chosen at submit time; fall back to the test configuration.
    final String configFile =
            (args != null && args.length > 0 && !args[0].trim().isEmpty())
                    ? args[0].trim()
                    : "config-test.properties";
    final ParameterTool parameterTool = Configs.loadConfig(configFile);
    // Register the parameters with the job config so operators can read them at runtime.
    env.getConfig().setGlobalJobParameters(parameterTool);

    // NOTE(review): topic is hard-coded; consider reading "kafka.user.info.topic" from the config.
    final String topic = "test1115";
    final DataStream<String> userStream = env.addSource(Utils.createConsumers(parameterTool, topic));
    final DataStream<String> userMapperStream = userStream.flatMap(new UserMapper());

    final String index = "sys_user_test";
    final String type = "user";
    userMapperStream.addSink(Utils.createEsProducer(parameterTool, index, type));

    env.execute("user-to-es");
}

版权声明:本文为weixin_43291055原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。