使用flink-cdc采集mysql数据

前言:在使用flink-cdc采集mysql数据时,会遇到各种问题,本文记录了使用flink-cdc采集mysql的流程操作。

1.版本选择:

​ 本人使用的是flink1.15.0 和 flink-connector-mysql-cdc 2.2.0

2.冲突问题:

​ 直接引用会有版本冲突:flink-shaded-guava30和flink-shaded-guava18冲突,因为flink15使用的是guava30,而flink-connector-mysql-cdc的2.2版本用的是guava18

3.解决冲突:

​ 这里我选用重新编译flink-cdc-connectors,在git上下载2.2版本的flink-cdc-connectors源码,修改代码中用到flink-shaded-guava18的代码。

3.1 修改项目pom

​ 把flink-shaded-guava18.0-13.0版本升级到30.1.1-jre-15.0

3.2 修改源码

import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder改成import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder用到guava18的地方均改为guava30

3.3 重新编译:
mvn spotless:apply
mvn clean install -Dmaven.test.skip=true
3.4 其他:

​ 最新master的2.3版本的flink-connector-mysql-cdc直接使用的就是flink-shaded-guava:30.1.1-jre-15.0,也可以替换成2.3版本

4.开发流程

​ 1.导入重新编译的flink-connector-mysql-cdc依赖

​ 2.直接上完整代码

public class FlinkMysqlCDC {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("hc")
                .tableList("hc.student")
                .username("flinkuser")
                .password("123456")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"MySql Source")
                .setParallelism(4)
                .print().setParallelism(1);

        env.execute("MySql Source Reader");
    }
}

3.mysql权限和设置

# 创建mysql用户:
CREATE USER 'flinkuser'@'localhost' IDENTIFIED BY '123456';

# 授予用户所需权限
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser' IDENTIFIED BY '123456';

#最终确定用户的权限
FLUSH PRIVILEGES;

# 设置server id (目前没发现需要的实际操作)
SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ;

#mysql会话超时(大型作业可能会用到)
interactive_timeout
wait_timeout

#增量快照原理
#将表拆分成块(chunk),并行读取chunk;
#1.记录binlog的low_offset,2.读取快照,3.binlog为high_offset,4.读取low-high的binlog合到chunk中输出,最后单个读取high后的binlog

欢迎分享讨论。


版权声明:本文为weixin_44990574原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。