Flink 对接 Kafka 并写入 MySQL(只有代码)

package ToMysql

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.{DataStream, SingleOutputStreamOperator}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import java.util.Properties


/**
 * Streaming job that consumes text records from the Kafka topic "test", parses each
 * record into a `student`, and writes it to MySQL through [[MysqlTest.SinkToMySql]].
 *
 * Each Kafka record is split on a single space and read as `<id> <name> <age>`.
 */
object MysqlTest {

  /**
   * Builds the pipeline (Kafka source -> parse -> MySQL sink) and runs it.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    // Create the streaming execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Run the whole job with a parallelism of 1.
    env.setParallelism(1)

    // Kafka connection settings.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "node2:9092")
    properties.setProperty("zookeeper.connect", "node2:2181")
    properties.setProperty("group.id", "jos")

    // Consume the "test" topic as plain strings and echo every record to stdout.
    val kafkaStream: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer08[String]("test", new SimpleStringSchema(), properties))
    kafkaStream.print()

    // An explicit MapFunction instance (not a Scala lambda) is kept on purpose:
    // this is the Java DataStream API being called from Scala.
    val students: SingleOutputStreamOperator[student] =
      kafkaStream.map(new MapFunction[String, student]() {
        override def map(value: String): student = {
          val fields: Array[String] = value.split(" ")
          // Fail with a message that shows the offending record instead of a bare
          // ArrayIndexOutOfBoundsException when a record has too few fields.
          require(fields.length >= 3, s"expected '<id> <name> <age>' but got: '$value'")

          // Populate the entity that mirrors the target table.
          val stu: student = new student
          stu.setId(fields(0))
          stu.setName(fields(1))
          stu.setAge(fields(2).toInt) // throws NumberFormatException on a non-numeric age
          stu
        }
      })

    students.addSink(new SinkToMySql())
    env.execute("MyJdbcSink2")
  }

  /**
   * Sink that inserts every incoming `student` into the MySQL table `student`.
   *
   * The JDBC connection and the insert statement are created once in `open`, reused
   * for every record in `invoke`, and released in `close`. Each record is committed
   * in its own transaction.
   *
   * Only `name` and `age` are written; the record's id is parsed upstream but not
   * inserted (presumably the table generates its own key -- TODO confirm).
   */
  class SinkToMySql() extends RichSinkFunction[student] {

    // Both are assigned in open(); they stay null until then.
    var conn: Connection = null
    var ps: PreparedStatement = null

    // NOTE(review): newer MySQL Connector/J releases name the driver class
    // "com.mysql.cj.jdbc.Driver" -- confirm against the connector version on the classpath.
    val driver = "com.mysql.jdbc.Driver"
    val url: String = "jdbc:mysql://node01:3306/test"

    val username = "root"
    val password = "root"
    // Not read anywhere in this class; kept so existing references keep compiling.
    val maxActive = "20"

    private val insertSql: String = "insert into student(name,age) values(?,?)"

    /**
     * Loads the JDBC driver, opens the connection with auto-commit disabled, and
     * prepares the insert statement once so `invoke` can reuse it.
     *
     * @param parameters configuration forwarded to the parent `open`
     */
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      Class.forName(driver)
      conn = DriverManager.getConnection(url, username, password)
      conn.setAutoCommit(false)
      ps = conn.prepareStatement(insertSql)
    }

    /**
     * Inserts one record and commits it. If the insert or commit fails, the
     * transaction is rolled back and the original exception is rethrown.
     *
     * @param value the record to insert
     */
    override def invoke(value: student): Unit = {
      try {
        ps.setString(1, value.getName)
        ps.setString(2, value.getAge.toString)
        ps.execute()
        conn.commit()
      } catch {
        case e: java.sql.SQLException =>
          // Auto-commit is off, so discard the failed transaction explicitly. A failure
          // of the rollback itself is attached to, rather than replacing, the real error.
          try conn.rollback()
          catch { case r: java.sql.SQLException => e.addSuppressed(r) }
          throw e
      }
    }

    /**
     * Releases the statement and then the connection. The connection is closed even
     * if an earlier step throws.
     */
    override def close(): Unit = {
      try {
        super.close()
        if (ps != null) {
          ps.close()
        }
      } finally {
        if (conn != null) {
          conn.close()
        }
      }
    }
  }
}

版权声明:本文为weixin_44694973原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。