flink 如何通过JdbcDialect扩展jdbc connector

1 前言

JDBC connect允许使用JDBC驱动程序从任何关系数据库读取数据或将数据写入数据库
Flink jdbc connector已经为依赖于jdbc进行操作的数据库系统提供了非常完善的功能。对于扩展只需要扩展其JdbcDialect即可。
目前官方版本只实现了三种JdbcDialect:
详见参见:flink jdbc connectors

  • MySQL
  • PostgreSQL
  • Derby

2 如何扩展

注意: 因为AbstractJdbcRowConverter是package可见的,所以继承此类的RowConverter需要放在相同的package中,但如果不继承此类,则没有此限制。

  1. 参照MySQLDialect写自定义的JDBCDialect,
  2. 在JdbcDialects类的DIALECTS列表中增加自定义的JDBCDialect,如ClickhouseJDBCDialect
  3. 编写自定义的RowConverter: 如Clickhouse会将UInt64作为BigInteger类型,将UInt32作为Long类型,所以需要重写createInternalConverter方法
  4. 将编译后的JdbcDialects class文件放到flink-connector-jdbc_2.11-1.13.2.jar中并替换flink中的此jar
  5. 将编写的*Dialect、*RowConverter等代码打包,放在flink任务的扩展jar目录下

clickhouse的扩展实现源代码请参见:flink-streaming-udf

2.1 自定义JDBCDialect的修改点

  • MAX_TIMESTAMP_PRECISION
  • MIN_TIMESTAMP_PRECISION
  • MAX_DECIMAL_PRECISION
  • MIN_DECIMAL_PRECISION
  • dialectName()
  • canHandle(String url) → “jdbc:xxx:” 如:“jdbc:clickhouse:”
  • getRowConverter(RowType rowType)→ 定义ClickhouseRowConverter,参考MySQLRowConverter
  • defaultDriverName() 驱动名称 如:“ru.yandex.clickhouse.ClickHouseDriver”
  • getLimitClause(long limit)
  • getUpsertStatement() 根据是否需要主键更新而定
  • getUpdateStatement 根据数据库特性进行调整
  • getDeleteStatement 根据数据库特性进行调整
  • unsupportedTypes

2.2 自定义RowConverter的修改点

  • 默认情况下,可以参照MySQLRowConverter的写法,直接调用父类
  • 如果出现转换错误,则需要重写createInternalConverter或createExternalConverter的方法,定义每种类型的字段的转换函数
  • 也可以直接重写toInternal,将ResultSet根据数据库的特性转换为RowData
  • 也可以重写toExternal,将RowData根据数据库的特性转换为FieldNamedPreparedStatement

3 测试

把自定义的jdbc扩展的源代码打成的jar放在sql-client的用户jar目录下

bin/sql-client.sh embedded -l ../jars

3.1 准备

在clickhouse数据库中创建表

DROP TABLE test;
CREATE TABLE IF NOT EXISTS test (
	INFO_ID String,
	DEVICE_ID String,
	SHOT_TIME UInt64,
	SHOT_DATE Nullable(UInt32)
)ENGINE = MergeTree()  PARTITION BY (toInt32(SHOT_TIME/1000000)) ORDER BY (DEVICE_ID,SHOT_TIME) SETTINGS index_granularity = 8192;


INSERT INTO test VALUES('1','10000001',20210918150000,20210918),('1','10000001',20210918150001,20210918),('1','10000001',20210918150002,20210918);

3.2 sql-client中测试

CREATE TABLE test (
   INFO_ID STRING,
   DEVICE_ID STRING,
   SHOT_TIME BIGINT,
   SHOT_DATE INT,
   PRIMARY KEY (INFO_ID) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:snowball://172.25.10.66:8123/bigdata?socket_timeout=3000000',
    'table-name' = 'test',
    'username' = 'root',
    'password' = '123456'
  );


select * from test limit 2;

select SHOT_DATE,COUNT(1) AS TOTAL FROM test where SHOT_TIME BETWEEN 20210901000000 AND 20210930000000 group by SHOT_DATE;

INSERT INTO test VALUES('1','10000001',20210921150000,20210921),('1','10000001',20210921150001,20210921),('1','10000001',20210921150002,20210921);


版权声明:本文为penriver原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。