1. Writing Data to Kafka (KafkaSink)
First, add the dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.75</version>
</dependency>
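All of the examples below use a WaterSensor bean from com.atguigu.bean whose definition is not shown here. A minimal sketch, reconstructed from the constructor and getters the code relies on (the real class may carry setters and more):

// Hypothetical reconstruction of com.atguigu.bean.WaterSensor,
// inferred from how the examples below use it.
public class WaterSensor {
    private String id;   // sensor id
    private Long ts;     // event timestamp
    private Integer vc;  // water level reading

    public WaterSensor() { }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() { return id; }
    public Long getTs() { return ts; }
    public Integer getVc() { return vc; }
    // setters omitted; fastjson serialization only needs the getters
}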
Code:
import com.alibaba.fastjson.JSON;
import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class Flink12_Sink_Kafka {
    public static void main(String[] args) throws Exception {
        // 1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Read data from a socket port
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

        // 3. Convert each record to a JSON string
        SingleOutputStreamOperator<String> waterSensorJsonStream = streamSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] split = value.split(",");
                WaterSensor waterSensor = new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                return JSON.toJSONString(waterSensor);
            }
        });

        // TODO 4. Write the data to Kafka
        waterSensorJsonStream.addSink(new FlinkKafkaProducer<String>("hadoop102:9092", "topic_sensor", new SimpleStringSchema()));

        env.execute();
    }
}
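The constructor above takes the broker list directly. If you need to tune the producer, there is an overload that accepts a java.util.Properties object; a minimal sketch, assuming the same topic and brokers as above (add import java.util.Properties;):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "hadoop102:9092");
// any additional producer options would go here
waterSensorJsonStream.addSink(new FlinkKafkaProducer<>(
        "topic_sensor", new SimpleStringSchema(), props));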
Check whether the data is received:
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_sensor
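To drive the pipeline, start a netcat listener before submitting the job and type records such as sensor_1,1000,10 (assuming the localhost:9999 source above); the consumer should then print the JSON:

nc -lk 9999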
2. RedisSink
Add the dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>
Code:
import com.alibaba.fastjson.JSON;
import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class Flink13_Sink_Redis {
    public static void main(String[] args) throws Exception {
        // 1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Read data from a socket port
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

        // 3. Parse each record into a WaterSensor POJO
        SingleOutputStreamOperator<WaterSensor> waterSensorStream = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String value) throws Exception {
                String[] split = value.split(",");
                return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
            }
        });

        // TODO 4. Write the data to Redis
        waterSensorStream.addSink(new RedisSink<>(new FlinkJedisPoolConfig.Builder().setHost("hadoop102").build(), new RedisMapper<WaterSensor>() {
            /**
             * Choose the Redis command.
             * For hash commands, additionalKey is the outer (top-level) Redis key.
             * For example, with HSET and input "sensor_1,1000,10", Redis stores
             * hash "520520" with field "sensor_1" and the JSON string as the value.
             */
            @Override
            public RedisCommandDescription getCommandDescription() {
                // return new RedisCommandDescription(RedisCommand.SET);
                return new RedisCommandDescription(RedisCommand.HSET, "520520");
                // The line above uses the hash type; the commented-out line uses String
            }

            /**
             * Extract the Redis key from the record.
             * For String/set commands this is the Redis key itself;
             * for Hash commands it is the field inside the hash.
             */
            @Override
            public String getKeyFromData(WaterSensor data) {
                return data.getId();
            }

            /**
             * Extract the Redis value from the record.
             */
            @Override
            public String getValueFromData(WaterSensor data) {
                return JSON.toJSONString(data);
            }
        }));

        env.execute();
    }
}
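To check the result in Redis (assuming the hash key "520520" chosen in getCommandDescription above):

redis-cli -h hadoop102
HGETALL 520520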
3. ElasticsearchSink
First, add the dependency:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch6 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
Code:
import com.alibaba.fastjson.JSON;
import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.ArrayList;

public class Flink01_Sink_Es {
    public static void main(String[] args) throws Exception {
        // 1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Read data from a socket port
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

        // 3. Parse each record into a WaterSensor POJO
        SingleOutputStreamOperator<WaterSensor> waterSensorStream = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String value) throws Exception {
                String[] split = value.split(",");
                return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
            }
        });

        // TODO 4. Write the data to ES
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop102", 9200));
        httpHosts.add(new HttpHost("hadoop103", 9200));
        httpHosts.add(new HttpHost("hadoop104", 9200));
        ElasticsearchSink.Builder<WaterSensor> sensorBuilder = new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction<WaterSensor>() {
            @Override
            public void process(WaterSensor element, RuntimeContext ctx, RequestIndexer indexer) {
                // Specify the index, type, and document id
                IndexRequest indexRequest = new IndexRequest("220309flink", "_doc", element.getId());
                // Convert the record to a JSON string
                String jsonStr = JSON.toJSONString(element);
                // Attach the JSON payload and hand the request to the indexer
                indexRequest.source(jsonStr, XContentType.JSON);
                indexer.add(indexRequest);
            }
        });
        // Setting this to 1 flushes every single record to ES immediately; do not use 1 in production
        sensorBuilder.setBulkFlushMaxActions(1);
        waterSensorStream.addSink(sensorBuilder.build());

        env.execute();
    }
}
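To check the written documents (assuming ES is reachable on hadoop102:9200 as configured above):

curl "http://hadoop102:9200/220309flink/_search?pretty"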
Notes
- If a log4j-related error appears when running the job, add the log4j2-to-slf4j bridge dependency:
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-to-slf4j</artifactId>
    <version>2.14.0</version>
</dependency>
- For an unbounded stream, you must configure the bulk flush buffer. Again: do not leave it at 1 in production (see the sketch below):
sensorBuilder.setBulkFlushMaxActions(1);
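A production-style sketch of the bulk buffer, using the other flush settings on ElasticsearchSink.Builder (the concrete numbers are illustrative assumptions):

sensorBuilder.setBulkFlushMaxActions(1000); // flush after 1000 buffered actions...
sensorBuilder.setBulkFlushMaxSizeMb(5);     // ...or after 5 MB of buffered data...
sensorBuilder.setBulkFlushInterval(5000);   // ...or at least every 5 seconds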
4. Custom Sink (Writing Data to MySQL)
Goal: implement a custom sink that writes to MySQL.
First, create the table in MySQL yourself; the sink will not create it automatically:
create database test;
use test;
CREATE TABLE `sensor` (
  `id` varchar(20) NOT NULL,
  `ts` bigint(20) NOT NULL,
  `vc` int(11) NOT NULL,
  PRIMARY KEY (`id`,`ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Add the MySQL JDBC driver:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>
Custom sink code:
import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class Flink02_Sink_Custom {
    public static void main(String[] args) throws Exception {
        // 1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Read data from a socket port
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

        // 3. Parse each record into a WaterSensor POJO
        SingleOutputStreamOperator<WaterSensor> waterSensorStream = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String value) throws Exception {
                String[] split = value.split(",");
                return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
            }
        });

        // TODO 4. Write the data to MySQL with a custom sink
        waterSensorStream.addSink(new MySink());

        env.execute();
    }

    public static class MySink extends RichSinkFunction<WaterSensor> {
        private Connection connection;
        private PreparedStatement pstm;

        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("opening connection");
            // Create the JDBC connection once per parallel task
            connection = DriverManager
                    .getConnection("jdbc:mysql://hadoop102:3306/test?useSSL=false",
                            "root", "123456");
            // Prepare the insert statement
            pstm = connection.prepareStatement("insert into sensor values (?,?,?)");
        }

        @Override
        public void invoke(WaterSensor value, Context context) throws Exception {
            // Bind the placeholders
            pstm.setString(1, value.getId());
            pstm.setLong(2, value.getTs());
            pstm.setInt(3, value.getVc());
            // Execute the statement
            pstm.execute();
        }

        @Override
        public void close() throws Exception {
            System.out.println("closing connection");
            pstm.close();
            connection.close();
        }
    }
}
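One caveat: the table declares PRIMARY KEY (`id`,`ts`), so if the same record is replayed (for example after a job restart) the plain INSERT fails with a duplicate-key error. A hedged alternative for open(), using MySQL's upsert syntax (assumption: overwriting vc on a key collision is acceptable):

// Idempotent variant: on a duplicate (id, ts), overwrite vc instead of failing
pstm = connection.prepareStatement(
        "INSERT INTO sensor VALUES (?, ?, ?) "
                + "ON DUPLICATE KEY UPDATE vc = VALUES(vc)");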
5. Writing Data with JdbcSink
Add the dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
Code:
import com.atguigu.bean.WaterSensor;
import com.mysql.jdbc.Driver;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

public class Flink03_Sink_JDBC {
    public static void main(String[] args) throws Exception {
        // 1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Read data from a socket port
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

        // 3. Parse each record into a WaterSensor POJO
        SingleOutputStreamOperator<WaterSensor> waterSensorStream = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String value) throws Exception {
                String[] split = value.split(",");
                return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
            }
        });

        // TODO 4. Write the data to MySQL via the JDBC connector
        SinkFunction<WaterSensor> sink = JdbcSink.sink(
                "insert into sensor values (?,?,?)",
                new JdbcStatementBuilder<WaterSensor>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor)
                            throws SQLException {
                        preparedStatement.setString(1, waterSensor.getId());
                        preparedStatement.setLong(2, waterSensor.getTs());
                        preparedStatement.setInt(3, waterSensor.getVc());
                    }
                },
                new JdbcExecutionOptions.Builder()
                        // Flush every single record (see the production-style sketch below)
                        .withBatchSize(1)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName(Driver.class.getName())
                        .withUsername("root")
                        .withPassword("123456")
                        .withUrl("jdbc:mysql://hadoop102:3306/test?useSSL=false")
                        .build());
        waterSensorStream.addSink(sink);

        env.execute();
    }
}
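withBatchSize(1) above writes each record immediately, which mirrors the Elasticsearch note earlier. For production, a hedged sketch of batched execution options, using the other methods on JdbcExecutionOptions.Builder (the numbers are illustrative assumptions):

new JdbcExecutionOptions.Builder()
        .withBatchSize(500)          // flush after 500 buffered rows...
        .withBatchIntervalMs(2000)   // ...or at least every 2 seconds
        .withMaxRetries(3)           // retry a failed batch up to 3 times
        .build();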