Flink Real-Time Notes (5): Writing Data to Various Stores (the Sink Stage)

Contents

1. Writing Data to Kafka (KafkaSink)

2. RedisSink

3. ElasticsearchSink

4. Custom Sink (Writing Data to MySQL)

5. Writing Data via JdbcSink


1. Writing Data to Kafka (KafkaSink)

First, add the dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.75</version>
</dependency>

Code

import com.alibaba.fastjson.JSON;
import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class Flink12_Sink_Kafka {
    public static void main(String[] args) throws Exception {
        //1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //2. Read data from a socket port
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

        //3. Parse each line into a WaterSensor and serialize it to a JSON string
        SingleOutputStreamOperator<String> waterSensorJsonStream = streamSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] split = value.split(",");
                WaterSensor waterSensor = new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                return JSON.toJSONString(waterSensor);
            }
        });

        //TODO 4. Write the data to Kafka
        waterSensorJsonStream.addSink(new FlinkKafkaProducer<String>("hadoop102:9092", "topic_sensor", new SimpleStringSchema()));

        env.execute();
    }
}
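
The WaterSensor bean from com.atguigu.bean is used in every example in this post but never shown. A minimal sketch consistent with the constructor and getter calls in the code (field names id/ts/vc are inferred here, not the original class) would be:

//Minimal sketch of the WaterSensor POJO assumed by these examples
//(shape inferred from the constructor and getter calls in the code).
public class WaterSensor {
    private String id;   //sensor id
    private Long ts;     //event timestamp
    private Integer vc;  //water level value

    public WaterSensor() { }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() { return id; }
    public Long getTs() { return ts; }
    public Integer getVc() { return vc; }

    public void setId(String id) { this.id = id; }
    public void setTs(Long ts) { this.ts = ts; }
    public void setVc(Integer vc) { this.vc = vc; }
}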

Check whether the data arrives:

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_sensor
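
All of these jobs read from a socket on localhost:9999, so start a listener with netcat first and type test records in the id,ts,vc format that the map function expects (the sample values below are only illustrative):

nc -lk 9999
sensor_1,1547718199,35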

2. RedisSink

Add the dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>

Code
import com.alibaba.fastjson.JSON;
import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class Flink13_Sink_Redis {
    public static void main(String[] args) throws Exception {
        //1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //2. Read data from a socket port
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

        //3. Parse each line into a WaterSensor bean
        SingleOutputStreamOperator<WaterSensor> waterSensorJsonStream = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String value) throws Exception {
                String[] split = value.split(",");
                WaterSensor waterSensor = new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                return waterSensor;
            }
        });

        //TODO 4. Write the data to Redis
        waterSensorJsonStream.addSink(new RedisSink<>(new FlinkJedisPoolConfig.Builder().setHost("hadoop102").build(), new RedisMapper<WaterSensor>() {
            /**
             * Specify the Redis command.
             * For hash types, additionalKey is the outer (top-level) Redis key.
             * @return
             */
            @Override
            public RedisCommandDescription getCommandDescription() {
//                return new RedisCommandDescription(RedisCommand.SET);
                return new RedisCommandDescription(RedisCommand.HSET,"520520");
                //The active line uses the hash type (HSET); the commented-out line uses the String type (SET)
            }

            /**
             * Extract the Redis key from the data.
             * For String, set, etc. this is the Redis key itself;
             * for hash it is the field name.
             *
             * @param data
             * @return
             */
            @Override
            public String getKeyFromData(WaterSensor data) {
                return data.getId();
            }

            /**
             * Extract the Redis value from the data.
             *
             * @param data
             * @return
             */
            @Override
            public String getValueFromData(WaterSensor data) {
                return JSON.toJSONString(data);
            }
        }));

        env.execute();
    }
}
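
To check the result (assuming redis-cli is available and Redis is running on hadoop102), look up the hash written under the key 520520:

redis-cli -h hadoop102
HGETALL 520520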

3. ElasticsearchSink

First, add the dependency:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch6 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.13.0</version>
</dependency>

Code

import com.alibaba.fastjson.JSON;
import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.ArrayList;

public class Flink01_Sink_Es {
    public static void main(String[] args) throws Exception {
        //1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //2. Read data from a socket port
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

        //3. Parse each line into a WaterSensor bean
        SingleOutputStreamOperator<WaterSensor> waterSensorJsonStream = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String value) throws Exception {
                String[] split = value.split(",");
                WaterSensor waterSensor = new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                return waterSensor;
            }
        });

        //TODO 4. Write the data to Elasticsearch
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop102", 9200));
        httpHosts.add(new HttpHost("hadoop103", 9200));
        httpHosts.add(new HttpHost("hadoop104", 9200));
        ElasticsearchSink.Builder<WaterSensor> sensorBuilder = new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction<WaterSensor>() {
            @Override
            public void process(WaterSensor element, RuntimeContext ctx, RequestIndexer indexer) {

                //Specify the index, type, and document id
                IndexRequest indexRequest = new IndexRequest("220309flink", "_doc", element.getId());
                //Convert the data to a JSON string
                String jsonStr = JSON.toJSONString(element);
                //Set the document source and add the request to the indexer
                indexRequest.source(jsonStr, XContentType.JSON);
                indexer.add(indexRequest);
            }
        });
        //Setting this to 1 flushes every single record to ES immediately; not recommended in production
        sensorBuilder.setBulkFlushMaxActions(1);
        waterSensorJsonStream.addSink(sensorBuilder.build());

        env.execute();
    }
}
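
To confirm that documents arrive (assuming Elasticsearch's REST API is reachable on port 9200), query the index used in the code:

curl "http://hadoop102:9200/220309flink/_search?pretty"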

Notes

  • If you run into a log4j-related error, add the log4j-to-slf4j bridge dependency:

<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-to-slf4j</artifactId>
    <version>2.14.0</version>
</dependency>

  • For an unbounded stream, you need to configure the bulk flush buffer. Again, do not leave it at 1 in production (a production-style sketch follows below):

sensorBuilder.setBulkFlushMaxActions(1);
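
A production-style configuration would flush on count, size, or time rather than per record. The following is only an illustrative sketch using the builder's standard flush options (the values are examples, not recommendations from the original post):

//Illustrative bulk flush settings for the ElasticsearchSink.Builder (example values).
sensorBuilder.setBulkFlushMaxActions(1000);   //flush after 1000 buffered requests
sensorBuilder.setBulkFlushMaxSizeMb(5);       //...or after roughly 5 MB of data
sensorBuilder.setBulkFlushInterval(5000);     //...or at least every 5 seconds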

4. Custom Sink (Writing Data to MySQL)

Goal: implement a custom sink that writes to MySQL.

First, create the table in MySQL yourself; the sink will not create it automatically:

create database test;
use test;
CREATE TABLE `sensor` (
  `id` varchar(20) NOT NULL,
  `ts` bigint(20) NOT NULL,
  `vc` int(11) NOT NULL,
  PRIMARY KEY (`id`,`ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Add the MySQL JDBC driver:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>

Custom sink code


import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class Flink02_Sink_Custom {
    public static void main(String[] args) throws Exception {
        //1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //2. Read data from a socket port
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

        //3. Parse each line into a WaterSensor bean
        SingleOutputStreamOperator<WaterSensor> waterSensorJsonStream = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String value) throws Exception {
                String[] split = value.split(",");
                WaterSensor waterSensor = new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                return waterSensor;
            }
        });

        //TODO 4. Write the data to MySQL with a custom sink
        waterSensorJsonStream.addSink(new MySink());

        env.execute();
    }

    public static class MySink extends RichSinkFunction<WaterSensor>{

        private Connection connection;
        private PreparedStatement pstm;
        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("创建连接");
            //创建链接
            connection = DriverManager
                    .getConnection("jdbc:mysql://hadoop102:3306/test?useSSL=false",
                            "root", "123456");
            //Prepare the insert statement
            pstm = connection.prepareStatement("insert into sensor values (?,?,?)");
        }

        @Override
        public void invoke(WaterSensor value, Context context) throws Exception {
            //Bind the placeholder values
            pstm.setString(1, value.getId());
            pstm.setLong(2, value.getTs());
            pstm.setInt(3, value.getVc());

            //Execute the statement
            pstm.execute();
        }

        @Override
        public void close() throws Exception {
            System.out.println("关闭连接");
            pstm.close();
            connection.close();
        }
    }
}
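
One caveat: the sensor table's primary key is (id, ts), so replaying the same record makes the plain INSERT fail with a duplicate-key error. A possible variant (not from the original post) is to prepare a MySQL upsert in open() instead:

            //Hedged variant of the statement in open(): upsert so a repeated (id, ts)
            //updates vc instead of failing with a duplicate-key error.
            pstm = connection.prepareStatement(
                    "insert into sensor (id, ts, vc) values (?,?,?) " +
                    "on duplicate key update vc = values(vc)");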

5. Writing Data via JdbcSink

Add the dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_2.12</artifactId>
  <version>1.13.0</version>
</dependency>

Code


import com.atguigu.bean.WaterSensor;
import com.mysql.jdbc.Driver;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

public class Flink03_Sink_JDBC {
    public static void main(String[] args) throws Exception {
        //1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //2. Read data from a socket port
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

        //3. Parse each line into a WaterSensor bean
        SingleOutputStreamOperator<WaterSensor> waterSensorJsonStream = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String value) throws Exception {
                String[] split = value.split(",");
                WaterSensor waterSensor = new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                return waterSensor;
            }
        });

        //TODO 4. Write the data to MySQL with JdbcSink
        SinkFunction<WaterSensor> sink = JdbcSink.sink(
                "insert into sensor values (?,?,?)",
                new JdbcStatementBuilder<WaterSensor>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor)
                            throws SQLException {
                        preparedStatement.setString(1, waterSensor.getId());
                        preparedStatement.setLong(2, waterSensor.getTs());
                        preparedStatement.setInt(3, waterSensor.getVc());
                    }
                },
                new JdbcExecutionOptions.Builder()
                        //Flush every record as soon as it arrives (demo only)
                        .withBatchSize(1)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName(Driver.class.getName())
                        .withUsername("root")
                        .withPassword("123456")
                        .withUrl("jdbc:mysql://hadoop102:3306/test?useSSL=false")
                        .build());
        waterSensorJsonStream.addSink(sink);

        env.execute();
    }

}
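
As with the Elasticsearch sink, a batch size of 1 is demo-only. JdbcExecutionOptions also supports interval-based flushing and retries; an illustrative production-style configuration (example values, not from the original post) would be:

//Illustrative JdbcExecutionOptions for production-style batching (example values).
JdbcExecutionOptions executionOptions = new JdbcExecutionOptions.Builder()
        .withBatchSize(500)          //flush after 500 buffered records
        .withBatchIntervalMs(200)    //...or every 200 ms, whichever comes first
        .withMaxRetries(3)           //retry a failed batch up to 3 times
        .build();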

