Flink快速回忆之Streaming (DataStream API)

DataSource(数据源)

数据源是程序读取数据的来源,⽤户可以通env.addSource(SourceFunction),将SourceFunction添加到程序中。Flink内置许多已知实现的SourceFunction,但是⽤户可以⾃定义实现SourceFunction (⾮并⾏化的接⼝)接⼝或者实现 ParallelSourceFunction (并⾏化)接⼝,如果需要有状态管理还可以继承 RichParallelSourceFunction .

File-based(以文件为基础的来源)

readTextFile(path) - Reads(once) text files, i.e. files that respect the TextInputFormatspecification, line-by-line and returns them as Strings.
readTextFile(path)逐行读取(一次)文本文件,即遵循文本文件输入格式规格并将其作为字符串返回的文件。

 //1.创建流计算执⾏环境
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 //2.创建DataStream - 细化  从HDFS 里读取文本文件
 val text:DataStream[String] = env.readTextFile("hdfs://CentOS:9000/demo/words")
 //3.执⾏DataStream的转换算⼦
 val counts = text.flatMap(line=>line.split("\\s+"))
 .map(word=>(word,1))
 .keyBy(0)
 .sum(1)
 //4.将计算的结果在控制打印
 counts.print()
 //5.执⾏流计算任务
 env.execute("Window Stream WordCount")

readFile(fileInputFormat, path) - Reads (once) files as dictated by the specified file inputformat.
readFile(fileInputFormat, path) -根据指定的文件输入格式读取(一次)文件。

 //1.创建流计算执⾏环境
 val env = StreamExecutionEnvironment.getExecutionEnvironment
//创建文件输入格式
 var inputFormat:FileInputFormat[String]=new TextInputFormat(null)
  //2.创建DataStream - 细化
   val text:DataStream[String] =
env.readFile(inputFormat,"hdfs://CentOS:9000/demo/words")
 //3.执⾏DataStream的转换算⼦
 val counts = text.flatMap(line=>line.split("\\s+"))
 .map(word=>(word,1))
 .keyBy(0)//以0下标为key
 .sum(1)
 //4.将计算的结果在控制打印
 counts.print()
 //5.执⾏流计算任务
 env.execute("Window Stream WordCount")

readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)–This is the method called internally by the two previous ones. It reads files in the path based on thegiven fileInputFormat . Depending on the provided watchType , this source may periodicallymonitor (every interval ms) the path for new data( FileProcessingMode.PROCESS_CONTINUOUSLY ), or process once the data currently in the pathand exit (FileProcessingMode.PROCESS_ONCE ). Using the pathFilter , the user can furtherexclude files from being processed.

readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)–这是前两个方法在内部调用的方法。它根据给定的**文本文件输入格式读取路径中的文件。根据所提供的 观看型号,此源可以定期调用监视(每隔ms)新数据的路径( FileProcessingMode.PROCESS_CONTINUOUSLY( 文件处理模式.过程连续))或处理当前路径中的数据并退出( FileProcessingMode.PROCESS_ONCE(文件处理模式.读取一次))。使用路径过滤器**,用户可以进一步排除正在处理的文件。
在这里插入图片描述
补充:该⽅法会检查采集⽬录下的⽂件,如果⽂件发⽣变化系统会重新采集。此时可能会导致⽂件的重复计算。⼀般来说不建议修改⽂件内容,直接上传新⽂件即可

//1.创建流计算执⾏环境
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 //2.创建DataStream - 细化
 var inputFormat:FileInputFormat[String]=new TextInputFormat(null)
 val text:DataStream[String] = env.readFile(inputFormat,
 "hdfs://CentOS:9000/demo/words",FileProcessingMode.PROCESS_CONTINUOUSLY,1000)
 //3.执⾏DataStream的转换算⼦
 val counts = text.flatMap(line=>line.split("\\s+"))
 .map(word=>(word,1))
 .keyBy(0)
 .sum(1)
 //4.将计算的结果在控制打印
 counts.print()
 //5.执⾏流计算任务
 env.execute("Window Stream WordCount")

Socket Based(基于套接字的来源)

socketTextStream - Reads from a socket. Elements can be separated by a delimiter.
socketTextStream -从套接字读取。元素可以用分隔符分隔
在这里插入图片描述

//1.创建流计算执⾏环境
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 //2.创建DataStream - 细化
 val text = env.socketTextStream("CentOS", 9999,'\n',3)
 //3.执⾏DataStream的转换算⼦
 val counts = text.flatMap(line=>line.split("\\s+"))
 .map(word=>(word,1))
 .keyBy(0)
 .sum(1)
 //4.将计算的结果在控制打印
 counts.print()
 //5.执⾏流计算任务
 env.execute("Window Stream WordCount")

Collection-based 基于集合

//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
 //2.创建DataStream - 细化
 val text = env.fromCollection(List("this is a demo","hello word"))
 //3.执⾏DataStream的转换算⼦
 val counts = text.flatMap(line=>line.split("\\s+"))
 .map(word=>(word,1))
 .keyBy(0)
 .sum(1)
 //4.将计算的结果在控制打印
 counts.print()
 //5.执⾏流计算任务
 env.execute("Window Stream WordCount")

UserDefinedSource 用户定义的来源

SourceFunction

import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random
class UserDefinedNonParallelSourceFunction extends SourceFunction[String]{
 @volatile //防⽌线程拷⻉变量
 var isRunning:Boolean=true
 val lines:Array[String]=Array("this is a demo","hello world","ni hao ma")
 //在该⽅法中启动线程,通过sourceContext的collect⽅法发送数据
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
 while(isRunning){
 Thread.sleep(100)
 //输送数据给下游
 sourceContext.collect(lines(new Random().nextInt(lines.size)))
 }
 }
 //释放资源
 override def cancel(): Unit = {
 isRunning=false
 }
}

ParallelSourceFunction

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction,
SourceFunction}
import scala.util.Random
class UserDefinedParallelSourceFunction extends ParallelSourceFunction[String]{
 @volatile //防⽌线程拷⻉变量
 var isRunning:Boolean=true
 val lines:Array[String]=Array("this is a demo","hello world","ni hao ma")
 //在该⽅法中启动线程,通过sourceContext的collect⽅法发送数据
 override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
 while(isRunning){
 Thread.sleep(100)
 //输送数据给下游
 sourceContext.collect(lines(new Random().nextInt(lines.size)))
 }
 }
 //释放资源
 override def cancel(): Unit = {
 isRunning=false
 }
}

下游来接收

//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.setParallelism(4)
 //2.创建DataStream - 细化
 val text = env.addSource[String](⽤户定义的SourceFunction)
 //3.执⾏DataStream的转换算⼦
 val counts = text.flatMap(line=>line.split("\\s+"))
 .map(word=>(word,1))
 .keyBy(0)
 .sum(1)
 //4.将计算的结果在控制打印
  counts.print()
 println(env.getExecutionPlan) //打印执⾏计划
 //5.执⾏流计算任务
 env.execute("Window Stream WordCount")

Kafka集成

引⼊maven

<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka_2.11</artifactId>
 <version>1.10.0</version>
</dependency>

SimpleStringSchema–简单的字符串模式
该SimpleStringSchema⽅案只会反序列化kafka中的value

//1.创建流计算执⾏环境
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 //2.创建DataStream - 细化
 val props = new Properties() //kafka 的连接属性
 props.setProperty("bootstrap.servers", "CentOS:9092")
 props.setProperty("group.id", "g1")
 //                            创建Flink与kafka的连接通道
 val text = env.addSource(new FlinkKafkaConsumer[String]("topic01",new
SimpleStringSchema(),props))
 //3.执⾏DataStream的转换算⼦
 val counts = text.flatMap(line=>line.split("\\s+"))
 .map(word=>(word,1))
 .keyBy(0)
 .sum(1)
 //4.将计算的结果在控制打印
 counts.print()
 //5.执⾏流计算任务
 env.execute("Window Stream WordCount")

KafkaDeserializationSchema–kafka反序列化模式

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.flink.api.scala._

class UserDefinedKafkaDeserializationSchema  extends KafkaDeserializationSchema[(String, String, Int, Long)]{
  //是否结束流计算                 因为是流计算是持续的不能结束
  override def isEndOfStream(t: (String, String, Int, Long)): Boolean = false
    //反序列化
  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, Int, Long) = {
    if(consumerRecord.key()!=null){ //如果消费记录的key不为空   则将消费记录的k,v,分区数,偏移量等返回
      (new String(consumerRecord.key()),new
          String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
    }else{//如果消费记录的key为空  则返回一个k=null  ,v,分区数,偏移量照常返回
      (null,new
          String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
    }
  }
//  获得生产类型
  override def getProducedType: TypeInformation[(String, String, Int, Long)] = {
    //提醒 : 如何要 create 创建 需要导import org.apache.flink.api.scala._
    createTypeInformation[(String, String, Int, Long)]
  }
}

获取数据并打印输出

def main(args: Array[String]): Unit = {
    //1.创建流计算执⾏环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2.创建DataStream - 细化
    val props = new Properties()
    props.setProperty("bootstrap.servers", "SparkTwo:9092")
    props.setProperty("group.id", "g1")
    val text = env.addSource(new FlinkKafkaConsumer[(String,String,Int,Long)]//通道传递的类型
    ("topic01",new UserDefinedKafkaDeserializationSchema(),props))
    //3.执⾏DataStream的转换算⼦
    val counts = text.flatMap(t=> t._2.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)
    //4.将计算的结果在控制打印
    counts.print()
    //5.执⾏流计算任务
    env.execute("Window Stream WordCount")
  }

补充下 拿kafka 消费者记录的不同信息的方法

//拿出不同的数据信息
    private static void shum(ConsumerRecord<String, String> next){
        String topic = next.topic();//信息
        int partition = next.partition();//分区数
        long offset = next.offset();//偏移量
        String key = next.key();//key
        String value = next.value();//value
        long timestamp = next.timestamp();//时间戳
        System.out.println("信息"+topic+"分区数"+partition+"偏移量"+offset+"key"+"value"+value+"时间戳"+timestamp);
    }

JSONKeyValueNodeDeserializationSchema–JSON键值节点反序列化模式
要求Kafka中的topic的key和value都必须是json格式,也可以在使⽤的时候,指定是否读取元数据(topic、分区、offset等)

//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
 //2.创建DataStream - 细化
 val props = new Properties()
 props.setProperty("bootstrap.servers", "CentOS:9092")
 props.setProperty("group.id", "g1")
 //{"id":1,"name":"zhangsan"}
  val text = env.addSource(new FlinkKafkaConsumer[ObjectNode]("topic01",new
JSONKeyValueDeserializationSchema(true),props))
 //t:{"value":{"id":1,"name":"zhangsan"},"metadata":
{"offset":0,"topic":"topic01","partition":13}}
 text.map(t=> (t.get("value").get("id").asInt(),t.get("value").get("name").asText()))
 .print()
 //5.执⾏流计算任务
 env.execute("Window Stream WordCount")

kafka 与 Flink 的集成 文档参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html

Data Sinks(数据输出)

Data Sink使⽤DataStreams并将其转发到⽂件,Socket,外部系统或打印它们。 Flink带有多种内置输出格式,这些格式封装在DataStreams的操作后⾯。

File-based(基于文件输出)

writeAsText() / TextOutputFormat(文本格式的文件输出格式)–将元素按行写入为字符串。这些字符串是通过调用每个元素的toString()方法获得的
writeAsCsv(...) / CsvOutputFormat(Csv输出格式)–将元组写入逗号分隔的值文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
writeUsingOutputFormat/ FileOutputFormat(文件输出格式)–方法和自定义文件输出的基类。支持自定义对象到字节的转换。

writeAsText() / TextOutputFormat - Writes elements line-wise as Strings. The Strings are obtainedby calling the toString() method of each element.
writeAsCsv(...) / CsvOutputFormat - Writes tuples as comma-separated value files. Row and field
delimiters are configurable. The value for each field comes from the toString() method of the objects.
writeUsingOutputFormat/ FileOutputFormat - Method and base class for custom file outputs.Supports custom object-to-bytes conversion.

注意:DataStream上的write*()⽅法主要⽤于调试⽬的。

//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
 //2.创建DataStream - 细化
 val text = env.socketTextStream("CentOS", 9999)
 //3.执⾏DataStream的转换算⼦
 val counts = text.flatMap(line=>line.split("\\s+"))
 .map(word=>(word,1))
 .keyBy(0)
 .sum(1)
 //4.将计算的结果在控制打印
 counts.writeUsingOutputFormat(new TextOutputFormat[(String, Int)](new
Path("file:///Users/admin/Desktop/flink-results")))
 //5.执⾏流计算任务
 env.execute("Window Stream WordCount")

注意:如果改成HDFS,需要⽤户⾃⼰产⽣⼤量数据,才能看到测试效果,原因是因为HDFS⽂
件系统写⼊时的缓冲区⽐较⼤。以上写⼊⽂件系统的Sink不能够参与系统检查点,如果在⽣产环境下通常使⽤flink-connector-filesystem写⼊到外围系统。

生产环境下使用flink-connector-filesystem写⼊到外围系统。

首先要导依赖

<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-filesystem_2.11</artifactId>
 <version>1.10.0</version>
</dependency>
//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
 //2.创建DataStream - 细化
 val text = env.readTextFile("hdfs://CentOS:9000/demo/words")
 var bucketingSink=StreamingFileSink.forRowFormat(new
Path("hdfs://CentOS:9000/bucket-results"),
 new
SimpleStringEncoder[(String,Int)]("UTF-8"))
 .withBucketAssigner(new DateTimeBucketAssigner[(String, Int)]("yyyy-MM-dd"))//动态产⽣写⼊的路径
  .build()
 //3.执⾏DataStream的转换算⼦
 val counts = text.flatMap(line=>line.split("\\s+"))
 .map(word=>(word,1))
 .keyBy(0)
 .sum(1)
 counts.addSink(bucketingSink)
 //5.执⾏流计算任务
 env.execute("Window Stream WordCount")

老版写法

//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.setParallelism(4)
 //2.创建DataStream - 细化
 val text = env.readTextFile("hdfs://CentOS:9000/demo/words")
 var bucketingSink=new BucketingSink[(String,Int)]("hdfs://CentOS:9000/bucketresults")
 bucketingSink.setBucketer(new DateTimeBucketer[(String,Int)]("yyyy-MM-dd"))
 bucketingSink.setBatchSize(1024)
 //3.执⾏DataStream的转换算⼦
 val counts = text.flatMap(line=>line.split("\\s+"))
 .map(word=>(word,1))
 .keyBy(0)
 .sum(1)
 counts.addSink(bucketingSink)
 //5.执⾏流计算任务
 env.execute("Window Stream WordCount")

UserDefinedSinkFunction --用户定义接收函数

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
class UserDefinedSinkFunction extends RichSinkFunction[(String,Int)]{

 override def open(parameters: Configuration): Unit = {
 println("打开链接...")
 }
 override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit =
{
 println("输出:"+value)
 }
  override def close(): Unit = {
 println("释放连接")
 }
}
//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.setParallelism(1)
 //2.创建DataStream - 细化
 val text = env.readTextFile("hdfs://CentOS:9000/demo/words")
 var bucketingSink=new BucketingSink[(String,Int)]("hdfs://CentOS:9000/bucketresults")
 bucketingSink.setBucketer(new DateTimeBucketer[(String,Int)]("yyyy-MM-dd"))
 bucketingSink.setBatchSize(1024)
 //3.执⾏DataStream的转换算⼦
 val counts = text.flatMap(line=>line.split("\\s+"))
 .map(word=>(word,1))
 .keyBy(0)
 .sum(1)
 counts.addSink(new UserDefinedSinkFunction)
 //5.执⾏流计算任务
 env.execute("Window Stream WordCount")

RedisSink

参考文档:https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

首先还是导入依赖

<dependency>
 <groupId>org.apache.bahir</groupId>
 <artifactId>flink-connector-redis_2.11</artifactId>
 <version>1.0</version>
</dependency>
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object FlinkOne {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    //2.创建DataStream - 细化
    val text = env.readTextFile("hdfs://SparkTwo:9000/demo/words")
    var flinkJeidsConf = new FlinkJedisPoolConfig.Builder()
      .setHost("SparkTwo")
      .setPort(6379)
      .build()
    //3.执⾏DataStream的转换算⼦
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)
    counts.addSink(new RedisSink(flinkJeidsConf,new UserDefinedRedisMapper()))
    //5.执⾏流计算任务
    env.execute("Window Stream WordCount")
  }
}
class UserDefinedRedisMapper extends RedisMapper[(String,Int)]{
  //获取命令的描述
  override def getCommandDescription: RedisCommandDescription = {
    //                           redis命令 . 设置一个散列值    附加/额外的k
    new RedisCommandDescription(RedisCommand.HSET,"wordcounts")
  }
  //获取数据中的key
  override def getKeyFromData(t: (String, Int)): String = {
    t._1
  }
  //获取数据中的v
  override def getValueFromData(t: (String, Int)): String = {
    t._2.toString
  }
}

Kafka集成

首先还是依赖

<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka_2.11</artifactId>
 <version>1.10.0</version>
</dependency>

方法一:

package com.baizhi.flinkKafka

import java.lang
import java.util.Properties
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer, KafkaDeserializationSchema, KafkaSerializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord

object KafkaAndFlink {
  def main(args: Array[String]): Unit = {
    //创建流计算环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //连接kafka 的属性
    val properties = new Properties()
    properties.setProperty("bootstrap.servers","SparkTwo:9092")//服务程序
    properties.setProperty("group.id", "g1")//组
    val text = env.addSource(new FlinkKafkaConsumer[(String, String, Int, Long)]("topics01", new userde()
      , properties))
    //输入到kafka
    val value= new FlinkKafkaProducer[(String, Int)]("defult_topic"
      , new UserDefinedKafkaSerializationSchema(), properties, Semantic.AT_LEAST_ONCE)

    val counts = text.flatMap(t=> t._2.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
        .sum(1)

    //counts.print()
    counts.addSink(value)
    env.execute("Window Stream WordCount")
  }
}
//输出到kafka 的那个topic 输出什么数据
class UserDefinedKafkaSerializationSchema extends KafkaSerializationSchema[(String,Int)]{
  override def serialize(t: (String, Int), aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
     new ProducerRecord("topic01",t._1.getBytes(),t._2.toString.getBytes())
  }
}

//kafka反序列化模式 看前面的输入源kafka
class userde extends  KafkaDeserializationSchema[(String, String, Int, Long)]{
  override def isEndOfStream(t: (String, String, Int, Long)): Boolean = false

  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, Int, Long) = {
    if (!(consumerRecord.key()==null)){
      (consumerRecord.key().toString,consumerRecord.value().toString,consumerRecord.partition(),consumerRecord.offset())
    }else{
      (null,consumerRecord.value().toString,consumerRecord.partition(),consumerRecord.offset())
    }
  }
  override def getProducedType: TypeInformation[(String, String, Int, Long)] = {
    createTypeInformation[(String, String, Int, Long)]
  }
}

提醒:上面的 defult_topic 没有任何意义

方法二:

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema
import org.apache.kafka.clients.producer.ProducerConfig

object KafkaAndFlinkTwo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile("hdfs://SparkTwo:9000/demo/words")
    val props = new Properties()
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "SparkTwo:9092")
    props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"100")//一次读取的长度
    props.setProperty(ProducerConfig.LINGER_MS_CONFIG,"500")//逗留的时间ms
    //Semantic.EXACTLY_ONCE:开启kafka幂等写特性
    //Semantic.AT_LEAST_ONCE:开启Kafka Retries机制
    val kafakaSink = new FlinkKafkaProducer[(String, Int)]("defult_topic",
      new UserDefinedKeyedSerializationSchema, props, Semantic.AT_LEAST_ONCE)
    //3.执⾏DataStream的转换算⼦
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)
    counts.addSink(kafakaSink)
   // counts.print()
    //5.执⾏流计算任务
    env.execute("Window Stream WordCount")
  }
}
class UserDefinedKeyedSerializationSchema extends KeyedSerializationSchema[(String,Int)]{
  override def serializeKey(t: (String, Int)): Array[Byte] = {t._1.getBytes()}

  override def serializeValue(t: (String, Int)): Array[Byte] = {t._2.toString.getBytes()}
//可以覆盖 默认是topic,如果返回null,则将数据写⼊到默认的topic中
  override def getTargetTopic(t: (String, Int)): String = {
    "topic01"
  }
}

版权声明:本文为weixin_46001623原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。