Flink 1.12.1 JDBC连接 Sink
以MySQL为例,参考Flink官网,地址:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/jdbc.html
- Flink版本:1.12.1
- scala版本:2.12
- MySQL版本:8.0.22
pom依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.12.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.22</version>
</dependency>
代码
package com.lhn.sink
import java.sql.PreparedStatement
import com.lhn.bean.SensorSource
import com.lhn.sink.SinkKafkaTest.SensorReading
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala._
/**
 * Demo job: streams SensorReading records from a generator source into a
 * MySQL table via Flink's JDBC connector (`JdbcSink.sink`).
 *
 * Fixes applied vs. the original listing:
 *  - `timestamp` is a MySQL reserved word; the column name must be
 *    backquoted or the INSERT fails with a SQL syntax error at runtime.
 *  - With mysql-connector-java 8.x the driver class is
 *    `com.mysql.cj.jdbc.Driver`; the legacy `com.mysql.jdbc.Driver` name is
 *    deprecated and only forwards with a warning.
 */
object JDBCSinkTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Single parallelism keeps the demo output/inserts ordered and simple.
    env.setParallelism(1)

    val dataStream: DataStream[SensorReading] = env.addSource(new SensorSource)

    // Binds each SensorReading onto the three "?" placeholders of the INSERT.
    // NOTE(review): curTemp/curTime are written as strings; if the MySQL
    // columns are numeric, prefer setDouble/setLong — confirm table schema.
    val jdbcStmtBuilder: JdbcStatementBuilder[SensorReading] = new JdbcStatementBuilder[SensorReading] {
      override def accept(t: PreparedStatement, u: SensorReading): Unit = {
        t.setString(1, u.sensor)
        t.setString(2, u.curTemp.toString)
        t.setString(3, u.curTime.toString)
      }
    }

    dataStream.addSink(JdbcSink.sink[SensorReading](
      // `timestamp` must be backquoted: it is a reserved keyword in MySQL.
      "insert into lhn.lhn (id,temperature,`timestamp`) values (?,?,?)",
      jdbcStmtBuilder,
      // Flush a batch every 100 ms or every 5 rows, whichever comes first;
      // no retries, so a failed batch fails the job immediately.
      JdbcExecutionOptions.builder()
        .withBatchIntervalMs(100)
        .withBatchSize(5)
        .withMaxRetries(0)
        .build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:mysql://localhost:3306/lhn")
        // Connector/J 8.x class name (com.mysql.jdbc.Driver is deprecated).
        .withDriverName("com.mysql.cj.jdbc.Driver")
        .withUsername("root")
        .withPassword("00000000")
        .build()
    ))

    env.execute("JDBC Sink Test")
  }
}
输出结果
版权声明:本文为weixin_42066446原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。