背景
最近工作中需要自定义开发一些flink sql的connector,因为官方提供的connector毕竟有限,在我们工作中可能会用到各种各样的中间件。所以官方没有提供的就需要我们自定义开发。
就是如:
CREATE TABLE XXX(
A STRING,
B BIGINT)
WITH(
‘connect.type’ = ‘kafka’,
…
)
所以开发一个自己的connector需要做哪些,本文就来总结一下开发的主要步骤,以及我遇到的问题怎么解决的。
开发
自定义Factory,根据需要实现StreamTableSourceFactory和StreamTableSinkFactory
根据需要继承ConnectorDescriptorValidator,定义自己的connector参数(with 后面跟的那些)
Factory中的requiredContext、supportedProperties都比较重要,框架中对Factory的过滤和检查需要他们
需要自定义个TableSink,根据你需要连接的中间件选择是AppendStreamTableSink、Upsert、Retract
重写consumeDataStream方法
- 自定义一个SinkFunction,在invoke方法中实现将数据写入到外部中间件。
以上5步基本上可以写一个简单的sql-connector了
问题
异常信息:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath. Reason: Required context properties mismatch. The following properties are requested: connector.address=localhost:9091 connector.job=testJob connector.metrics=testMetrics connector.type=xxxx schema.0.data-type=ROW<`val` DOUBLE> schema.0.name=value
有两个问题都导致上面的报错
- discoverFactory时找不到我自定义的Factory
解决方法:
添加如下目录及文件
- 根据properties过滤时过滤掉了我的Factory,代码在TableFactoryService#filterBySupportedProperties
解决方法:在自定义Factory#supportedProperties方法中添加schema的配置
// schema
properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
properties.add(SCHEMA + ".#." + SCHEMA_NAME);
2.序列化问题
异常信息:
xxx类不能被序列化
原因:开始在我的sinkFunction中有个TableSchema属性,该属性不能被序列化,TableSchema我是用来获取字段信息的,后来直接改成了fieldName数组,从TableSchema.getFieldNames()获取。
改完后又报了我使用的Util类不能序列化,我把util实现了Serializable接口解决
3.sink类型不匹配问题
异常如下:
org.apache.flink.table.api.TableException: The StreamTableSink#consumeDataStream(DataStream) must be implemented and return the sink transformation DataStreamSink. However, org.apache.flink.connector.prometheus.xxxTableSink doesn't implement this method.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:142)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at PrometheusConnectorTest.test(PrometheusConnectorTest.java:13)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
但是我xxxTableSink实现的是AppendStreamTableSink,AppendStreamTableSink继承了StreamTableSink
解决方法:
实现一个emitDataStream空方法,重写consumeDataStream即可
注意:flink 1.11版本emitDataStream方法已被移除
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
return dataStream
.addSink(new xxxSinkFunction())
.setParallelism(dataStream.getParallelism())
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
}
暂时还没涉及到TableFormat的自定义,如果后面涉及到这方面的开发再来分享
关于调试
代码开发完后,整个编译一下flink的代码(如果之前编译过,那么只需要编译你的新工程即可)。
有两种方法进行测试
- 整个编译完后会在flink-dist下生成flink的运行包,并把你的connector包放到lib目录下,通过bin/start-cluster.sh启动单机环境,再通过sql client来测试。如果代码改动后只需要把你的工程重新打包即可。
- idea里面进行调试(比较推荐这种方式),可以一行行的debug。
如果对你帮助,帮忙点个star。
补充
Flink 1.11版本对TableSource/Sink接口进行了重构,引入了DynamicTable的概念,更进一步的融合了流和批的处理。并且在官网有了自定义connector的详细说明和样例。并且With参数的配置也有了比较大的变化,比如原来的connector.type
简化成了connector
参数,同时为了向后兼容保留了原来的实现。所以根据你的ddl里面的写法不同选择走不同的分支。具体源码在PlannerBase类的isLegacyConnectorOptions方法
private def isLegacyConnectorOptions(
objectIdentifier: ObjectIdentifier,
catalogTable: CatalogTable) = {
// normalize option keys
val properties = new DescriptorProperties(true)
properties.putProperties(catalogTable.getOptions)
if (properties.containsKey(ConnectorDescriptorValidator.CONNECTOR_TYPE)) {//如果配置包含connector.type 则是老的方式
true
} else {
val catalog = catalogManager.getCatalog(objectIdentifier.getCatalogName)
try {
// try to create legacy table source using the options,
// some legacy factories uses the new 'connector' key
TableFactoryUtil.findAndCreateTableSink(
catalog.orElse(null),
objectIdentifier,
catalogTable,
getTableConfig.getConfiguration,
isStreamingMode)
// success, then we will use the legacy factories
true
} catch {
// fail, then we will use new factories
case _: Throwable => false
}
}
}
对于1.11新版本的实现方式可以参考git上的demo
https://github.com/zhuxiaoshang/flink-be-god/tree/branch_1.11.0/flink-connector/flink-sql-connector-customized/src/main/java/sql/connector/customized/dynamic