1. pom.xml dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>1.8.0</version>
</dependency>

2. 写es
/**
 * Builds an {@link ElasticsearchSink} that indexes each incoming JSON string
 * into the given index/type. Each element is parsed with fastjson and emitted
 * as a single-document bulk request (flush-per-element, see below).
 *
 * @param params job configuration; must contain "es.http.url" and "es.http.port"
 * @param index  target Elasticsearch index name
 * @param type   target Elasticsearch document type (ES6 connector still uses types)
 * @return a configured sink ready to pass to {@code DataStream#addSink}
 */
public static ElasticsearchSink<String> createEsProducer(ParameterTool params, String index, String type) {
    // getRequired() fails fast with a descriptive message when a key is absent;
    // the original getProperties().getProperty(...) chain produced an opaque
    // NPE / NumberFormatException inside parseInt instead.
    String host = params.getRequired("es.http.url");
    int port = Integer.parseInt(params.getRequired("es.http.port"));
    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost(host, port, "http"));

    // use a ElasticsearchSink.Builder to create an ElasticsearchSink
    ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
            httpHosts,
            new ElasticsearchSinkFunction<String>() {
                private IndexRequest createIndexRequest(String element) {
                    // fastjson's JSONObject implements Map<String, Object>, which avoids
                    // the raw Map the original used. The per-record System.out.println
                    // debug print has been removed (stdout spam in a streaming sink).
                    Map<String, Object> source = JSONObject.parseObject(element);
                    return Requests.indexRequest()
                            .index(index)
                            .type(type)
                            .source(source);
                }

                @Override
                public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                    indexer.add(createIndexRequest(element));
                }
            }
    );

    // configuration for the bulk requests; this instructs the sink to emit after
    // every element, otherwise they would be buffered.
    // NOTE(review): flush-per-element kills bulk throughput — tune for production volumes.
    esSinkBuilder.setBulkFlushMaxActions(1);

    // Configure the underlying low-level REST client's timeouts.
    esSinkBuilder.setRestClientFactory(
            restClientBuilder -> {
                restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
                    @Override
                    public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                        requestConfigBuilder.setConnectTimeout(5000);           // TCP connect: 5 s
                        requestConfigBuilder.setSocketTimeout(40000);           // socket read: 40 s
                        requestConfigBuilder.setConnectionRequestTimeout(1000); // pool lease: 1 s
                        return requestConfigBuilder;
                    }
                }).setMaxRetryTimeoutMillis(5 * 60 * 1000); // overall retry budget: 5 min
            }
    );
    return esSinkBuilder.build();
}

3. 主程序
/**
 * Job entry point: reads user JSON records from Kafka, flat-maps them with
 * {@code UserMapper}, and writes the results to Elasticsearch.
 *
 * @param args unused — NOTE(review): the original args[0]-as-config-file check
 *             is disabled; the config file name is hardcoded below instead.
 * @throws Exception if the Flink job fails to submit or execute
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Load job configuration and expose it to all operators via global job parameters.
    final ParameterTool parameterTool = Configs.loadConfig("config-test.properties");
    env.getConfig().setGlobalJobParameters(parameterTool);

    // NOTE(review): topic is hardcoded; the "kafka.user.info.topic" config key
    // is not read — confirm which one is intended before deploying.
    final String topic = "test1115";

    // Pipeline: Kafka source -> UserMapper flatMap -> Elasticsearch sink.
    DataStream<String> rawUsers = env.addSource(Utils.createConsumers(parameterTool, topic));
    DataStream<String> mappedUsers = rawUsers.flatMap(new UserMapper());

    final String index = "sys_user_test";
    final String type = "user";
    mappedUsers.addSink(Utils.createEsProducer(parameterTool, index, type));

    env.execute("user-to-es");
}

版权声明:本文为weixin_43291055原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。