Flink 1.12.1 JDBC连接 Sink

Flink 1.12.1 JDBC连接 Sink

以MySQL为例,参考Flink官网,地址:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/jdbc.html

  • Flink版本:1.12.1
  • scala版本:2.12
  • MySQL版本:8.0.22

pom依赖

<!-- Flink JDBC connector; the _2.12 suffix and 1.12.1 version match the Scala and Flink versions listed above. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.12.1</version>
</dependency>

<!-- MySQL JDBC driver (Connector/J); version kept in step with the MySQL server version listed above. -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.22</version>
</dependency>

代码

package com.lhn.sink

import java.sql.PreparedStatement

import com.lhn.bean.SensorSource
import com.lhn.sink.SinkKafkaTest.SensorReading
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala._

/**
 * Example Flink job that streams `SensorReading` records from `SensorSource`
 * into the MySQL table `lhn.lhn` through the Flink JDBC sink.
 */
object JDBCSinkTest {

  /**
   * Builds the pipeline (source -> JDBC sink) and runs it.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataStream: DataStream[SensorReading] = env.addSource(new SensorSource)

    // Binds one SensorReading to the three placeholders of the INSERT statement below.
    // All three values are bound as strings.
    // NOTE(review): the table schema is not visible here; confirm that the
    // temperature/timestamp columns accept string values.
    val jdbcStmtBuilder: JdbcStatementBuilder[SensorReading] = new JdbcStatementBuilder[SensorReading] {
      override def accept(t: PreparedStatement, u: SensorReading): Unit = {
        t.setString(1, u.sensor)
        t.setString(2, u.curTemp.toString)
        t.setString(3, u.curTime.toString)
      }
    }

    // Batching settings for the sink: 100 ms batch interval, batch size 5, no retries.
    val executionOptions: JdbcExecutionOptions = JdbcExecutionOptions.builder()
      .withBatchIntervalMs(100)
      .withBatchSize(5)
      .withMaxRetries(0)
      .build()

    // MySQL Connector/J 8.x ships its driver as com.mysql.cj.jdbc.Driver; the legacy
    // com.mysql.jdbc.Driver name is deprecated there and logs a warning when loaded.
    val connectionOptions: JdbcConnectionOptions =
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:mysql://localhost:3306/lhn")
        .withDriverName("com.mysql.cj.jdbc.Driver")
        .withUsername("root")
        .withPassword("00000000")
        .build()

    dataStream.addSink(JdbcSink.sink[SensorReading](
      "insert into lhn.lhn (id,temperature,timestamp) values (?,?,?)",
      jdbcStmtBuilder,
      executionOptions,
      connectionOptions
    ))

    env.execute()
  }

}

输出结果

(原文此处为程序运行后 MySQL 表中写入结果的截图,图片未能保留。)


版权声明:本文为weixin_42066446原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。