flink 将数据写入到kafka

下面展示一些 内联代码片

package io.github.flink.test

/**
  * Flink 没有类似于 spark 中 foreach 方法,让用户进行迭代的操作。虽有对外的
  * 输出操作都要利用 Sink 完成。最后通过类似如下方式完成整个任务最终输出操作。
  * stream.addSink(new MySink(xxxx))
  */
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

object KafkaSinkTest {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //2、从文件读取数据
    val inputStream = env.readTextFile("*****.txt")

    // Transform操作

    val dataStream = inputStream
      .map(data => {
        val dataArray = data.split(",")
        CameraSpeedData( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble ).toString  // 转成String方便序列化输出
      }
      )

    // sink
    dataStream.addSink( new FlinkKafkaProducer011[String]( "localhost:9092","sinkTest", new SimpleStringSchema()) )
    dataStream.print()

    env.execute("kafka sink test")
  }
}


版权声明:本文为weixin_43847900原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。