Streaming (DataStream API)
DataSource
A data source is where a program reads its input from. A source is attached to the program via env.addSource(SourceFunction). Flink ships with a number of pre-implemented SourceFunctions, but users can also write their own by implementing the SourceFunction interface (non-parallel) or the ParallelSourceFunction interface (parallel); if lifecycle hooks or state management are needed, RichParallelSourceFunction can be extended instead (a sketch is shown in the UserDefinedSource section below).
File-based
readTextFile(path)
- Reads text files (once), i.e. files that respect the TextInputFormat specification, line by line, and returns them as Strings.
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream - read a text file from HDFS
val text:DataStream[String] = env.readTextFile("hdfs://CentOS:9000/demo/words")
//3. Apply the DataStream transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4. Print the result to the console
counts.print()
//5. Execute the streaming job
env.execute("Window Stream WordCount")
readFile(fileInputFormat, path)
- Reads (once) files as dictated by the specified file input format.
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create the file input format
var inputFormat:FileInputFormat[String]=new TextInputFormat(null)
//2. Create the DataStream
val text:DataStream[String] =
env.readFile(inputFormat,"hdfs://CentOS:9000/demo/words")
//3. Apply the DataStream transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)// key by the tuple field at index 0
.sum(1)
//4. Print the result to the console
counts.print()
//5. Execute the streaming job
env.execute("Window Stream WordCount")
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
- This is the method called internally by the two previous ones. It reads files in the path based on the given fileInputFormat. Depending on the provided watchType, this source may periodically monitor (every interval ms) the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY), or process the data currently in the path once and exit (FileProcessingMode.PROCESS_ONCE). Using the pathFilter, the user can further exclude files from being processed.
Note: this method watches the files under the monitored directory; if a file changes, the system re-reads it, which can cause records to be counted twice. In general, do not modify the content of existing files; upload new files instead.
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream
var inputFormat:FileInputFormat[String]=new TextInputFormat(null)
val text:DataStream[String] = env.readFile(inputFormat,
"hdfs://CentOS:9000/demo/words",FileProcessingMode.PROCESS_CONTINUOUSLY,1000)
//3. Apply the DataStream transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4. Print the result to the console
counts.print()
//5. Execute the streaming job
env.execute("Window Stream WordCount")
Socket-based
socketTextStream
- Reads from a socket. Elements can be separated by a delimiter.
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream - read from a socket (host, port, delimiter, max retries)
val text = env.socketTextStream("CentOS", 9999,'\n',3)
//3. Apply the DataStream transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4. Print the result to the console
counts.print()
//5. Execute the streaming job
env.execute("Window Stream WordCount")
Collection-based
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream from an in-memory collection
val text = env.fromCollection(List("this is a demo","hello word"))
//3. Apply the DataStream transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4. Print the result to the console
counts.print()
//5. Execute the streaming job
env.execute("Window Stream WordCount")
UserDefinedSource
SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random
class UserDefinedNonParallelSourceFunction extends SourceFunction[String]{
@volatile // prevent threads from caching a local copy of the flag
var isRunning:Boolean=true
val lines:Array[String]=Array("this is a demo","hello world","ni hao ma")
// run() produces the data; records are sent downstream via sourceContext.collect
override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
while(isRunning){
Thread.sleep(100)
// emit a record downstream
sourceContext.collect(lines(new Random().nextInt(lines.size)))
}
}
// stop the source and release resources
override def cancel(): Unit = {
isRunning=false
}
}
ParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import scala.util.Random
class UserDefinedParallelSourceFunction extends ParallelSourceFunction[String]{
@volatile // prevent threads from caching a local copy of the flag
var isRunning:Boolean=true
val lines:Array[String]=Array("this is a demo","hello world","ni hao ma")
// run() produces the data; records are sent downstream via sourceContext.collect
override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
while(isRunning){
Thread.sleep(100)
// emit a record downstream
sourceContext.collect(lines(new Random().nextInt(lines.size)))
}
}
// stop the source and release resources
override def cancel(): Unit = {
isRunning=false
}
}
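RichParallelSourceFunction
The introduction above also mentions RichParallelSourceFunction, for sources that need lifecycle hooks (open/close) and access to the runtime context. A minimal sketch along the same lines as the sources above; the class name and the println calls are illustrative only:
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import scala.util.Random
class UserDefinedRichParallelSourceFunction extends RichParallelSourceFunction[String]{
@volatile // prevent threads from caching a local copy of the flag
var isRunning:Boolean=true
val lines:Array[String]=Array("this is a demo","hello world","ni hao ma")
// open() runs once per parallel instance before run(); acquire connections or state handles here
override def open(parameters: Configuration): Unit = {
println("open subtask " + getRuntimeContext.getIndexOfThisSubtask)
}
override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
while(isRunning){
Thread.sleep(100)
// emit a record downstream
sourceContext.collect(lines(new Random().nextInt(lines.size)))
}
}
// stop the source
override def cancel(): Unit = {
isRunning=false
}
// close() runs when the source shuts down; release resources here
override def close(): Unit = {
println("close subtask " + getRuntimeContext.getIndexOfThisSubtask)
}
}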
Consuming the source downstream:
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)
//2. Create the DataStream from the user-defined source
val text = env.addSource[String](new UserDefinedParallelSourceFunction)// or new UserDefinedNonParallelSourceFunction
//3. Apply the DataStream transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4. Print the result to the console
counts.print()
println(env.getExecutionPlan) // print the execution plan
//5. Execute the streaming job
env.execute("Window Stream WordCount")
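Note: getExecutionPlan returns the job graph as a JSON string; if desired, it can be pasted into the Flink plan visualizer (https://flink.apache.org/visualizer/) to inspect the operator graph.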
Kafka integration
Add the Maven dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
SimpleStringSchema
SimpleStringSchema only deserializes the value of the Kafka record.
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream
val props = new Properties() // Kafka connection properties
props.setProperty("bootstrap.servers", "CentOS:9092")
props.setProperty("group.id", "g1")
// Create the Flink-Kafka consumer (the channel between Flink and Kafka)
val text = env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
//3. Apply the DataStream transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4. Print the result to the console
counts.print()
//5. Execute the streaming job
env.execute("Window Stream WordCount")
KafkaDeserializationSchema
- a Kafka deserialization schema that exposes the full ConsumerRecord (key, value, partition, offset).
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.flink.api.scala._
class UserDefinedKafkaDeserializationSchema extends KafkaDeserializationSchema[(String, String, Int, Long)]{
// Whether this marks the end of the stream; the stream is continuous, so always return false
override def isEndOfStream(t: (String, String, Int, Long)): Boolean = false
// Deserialize one Kafka ConsumerRecord
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, Int, Long) = {
if(consumerRecord.key()!=null){ // if the record has a key, return (key, value, partition, offset)
(new String(consumerRecord.key()), new String(consumerRecord.value()), consumerRecord.partition(), consumerRecord.offset())
}else{ // if the key is null, return (null, value, partition, offset)
(null, new String(consumerRecord.value()), consumerRecord.partition(), consumerRecord.offset())
}
}
// The type of elements produced by this schema
override def getProducedType: TypeInformation[(String, String, Int, Long)] = {
// Note: createTypeInformation requires import org.apache.flink.api.scala._
createTypeInformation[(String, String, Int, Long)]
}
}
Consuming the data and printing it:
def main(args: Array[String]): Unit = {
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream
val props = new Properties()
props.setProperty("bootstrap.servers", "SparkTwo:9092")
props.setProperty("group.id", "g1")
val text = env.addSource(new FlinkKafkaConsumer[(String,String,Int,Long)]("topic01",new UserDefinedKafkaDeserializationSchema(),props))// the element type flowing through the channel is (String,String,Int,Long)
//3. Apply the DataStream transformation operators
val counts = text.flatMap(t=> t._2.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4. Print the result to the console
counts.print()
//5. Execute the streaming job
env.execute("Window Stream WordCount")
}
Additional note:
Accessors for the individual fields of a Kafka ConsumerRecord (Java):
// Extract the individual fields of a ConsumerRecord
private static void shum(ConsumerRecord<String, String> next){
String topic = next.topic();// topic
int partition = next.partition();// partition
long offset = next.offset();// offset
String key = next.key();// key
String value = next.value();// value
long timestamp = next.timestamp();// timestamp
System.out.println("topic=" + topic + " partition=" + partition + " offset=" + offset + " key=" + key + " value=" + value + " timestamp=" + timestamp);
}
JSONKeyValueDeserializationSchema
- JSON key/value deserialization schema
Requires both the key and the value in the Kafka topic to be JSON; when constructing it you can also specify whether to read the metadata (topic, partition, offset, etc.).
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream
val props = new Properties()
props.setProperty("bootstrap.servers", "CentOS:9092")
props.setProperty("group.id", "g1")
// A sample record in Kafka: {"id":1,"name":"zhangsan"}
val text = env.addSource(new FlinkKafkaConsumer[ObjectNode]("topic01",new JSONKeyValueDeserializationSchema(true),props))
// Each element t looks like: {"value":{"id":1,"name":"zhangsan"},"metadata":{"offset":0,"topic":"topic01","partition":13}}
text.map(t=> (t.get("value").get("id").asInt(),t.get("value").get("name").asText()))
.print()
//5. Execute the streaming job
env.execute("Window Stream WordCount")
Reference documentation for the Flink Kafka connector:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html
Data Sinks
Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on DataStreams.
File-based
writeAsText() / TextOutputFormat - Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.
writeAsCsv(...) / CsvOutputFormat - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.
writeUsingOutputFormat() / FileOutputFormat - Method and base class for custom file outputs. Supports custom object-to-bytes conversion.
Note: the write*() methods on DataStream are mainly intended for debugging purposes.
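writeAsText and writeAsCsv themselves are not demonstrated in the examples below; a minimal sketch of writeAsCsv, assuming a local output path chosen only for illustration, could look like this:
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream - read from a socket
val text = env.socketTextStream("CentOS", 9999)
//3. Apply the DataStream transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4. Write each tuple as a CSV row, e.g. "hello,3" (debugging only)
counts.writeAsCsv("file:///Users/admin/Desktop/flink-csv-results")
//5. Execute the streaming job
env.execute("Window Stream WordCount")
The example below uses writeUsingOutputFormat with a TextOutputFormat instead: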
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream
val text = env.socketTextStream("CentOS", 9999)
//3. Apply the DataStream transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4. Write the result to the local file system
counts.writeUsingOutputFormat(new TextOutputFormat[(String, Int)](new Path("file:///Users/admin/Desktop/flink-results")))
//5. Execute the streaming job
env.execute("Window Stream WordCount")
Note:
If the output path is changed to HDFS, you have to generate a fairly large amount of data before any output becomes visible, because the HDFS file system uses a large write buffer. Also, the file-system sinks above cannot participate in Flink's checkpointing; in production, flink-connector-filesystem is normally used to write to external systems.
First add the dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>1.10.0</version>
</dependency>
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream
val text = env.readTextFile("hdfs://CentOS:9000/demo/words")
var bucketingSink=StreamingFileSink.forRowFormat(new Path("hdfs://CentOS:9000/bucket-results"), new SimpleStringEncoder[(String,Int)]("UTF-8"))
.withBucketAssigner(new DateTimeBucketAssigner[(String, Int)]("yyyy-MM-dd"))// the bucket (output path) is derived from the date
.build()
//3. Apply the DataStream transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
counts.addSink(bucketingSink)
//5. Execute the streaming job
env.execute("Window Stream WordCount")
Legacy style (BucketingSink):
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)
//2. Create the DataStream
val text = env.readTextFile("hdfs://CentOS:9000/demo/words")
var bucketingSink=new BucketingSink[(String,Int)]("hdfs://CentOS:9000/bucketresults")
bucketingSink.setBucketer(new DateTimeBucketer[(String,Int)]("yyyy-MM-dd"))
bucketingSink.setBatchSize(1024)
//3. Apply the DataStream transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
counts.addSink(bucketingSink)
//5. Execute the streaming job
env.execute("Window Stream WordCount")
UserDefinedSinkFunction
-- user-defined sink function
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
class UserDefinedSinkFunction extends RichSinkFunction[(String,Int)]{
override def open(parameters: Configuration): Unit = {
println("opening connection...")
}
override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
println("output: " + value)
}
override def close(): Unit = {
println("closing connection")
}
}
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//2. Create the DataStream
val text = env.readTextFile("hdfs://CentOS:9000/demo/words")
//3. Apply the DataStream transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
counts.addSink(new UserDefinedSinkFunction)
//5. Execute the streaming job
env.execute("Window Stream WordCount")
RedisSink
Reference documentation: https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
First add the dependency:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
object FlinkOne {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//2. Create the DataStream
val text = env.readTextFile("hdfs://SparkTwo:9000/demo/words")
var flinkJeidsConf = new FlinkJedisPoolConfig.Builder()
.setHost("SparkTwo")
.setPort(6379)
.build()
//3. Apply the DataStream transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
counts.addSink(new RedisSink(flinkJeidsConf,new UserDefinedRedisMapper()))
//5. Execute the streaming job
env.execute("Window Stream WordCount")
}
}
class UserDefinedRedisMapper extends RedisMapper[(String,Int)]{
// Describes the Redis command to use
override def getCommandDescription: RedisCommandDescription = {
// Redis command HSET; "wordcounts" is the additional key, i.e. the name of the hash
new RedisCommandDescription(RedisCommand.HSET,"wordcounts")
}
// The hash field taken from the data (the word)
override def getKeyFromData(t: (String, Int)): String = {
t._1
}
// The hash value taken from the data (the count)
override def getValueFromData(t: (String, Int)): String = {
t._2.toString
}
}
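With this mapper, each word becomes a field of the Redis hash wordcounts (HSET with the additional key as the hash name), so after running the job the counts can be inspected with HGETALL wordcounts in redis-cli.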
Kafka integration (as a sink)
First add the dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
Approach 1:
package com.baizhi.flinkKafka
import java.lang
import java.util.Properties
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer, KafkaDeserializationSchema, KafkaSerializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
object KafkaAndFlink {
def main(args: Array[String]): Unit = {
// Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Kafka connection properties
val properties = new Properties()
properties.setProperty("bootstrap.servers","SparkTwo:9092")// broker address
properties.setProperty("group.id", "g1")// consumer group
val text = env.addSource(new FlinkKafkaConsumer[(String, String, Int, Long)]("topics01", new userde(), properties))
// Producer sink that writes the results back to Kafka
val value= new FlinkKafkaProducer[(String, Int)]("defult_topic", new UserDefinedKafkaSerializationSchema(), properties, Semantic.AT_LEAST_ONCE)
val counts = text.flatMap(t=> t._2.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//counts.print()
counts.addSink(value)
env.execute("Window Stream WordCount")
}
}
// Decides which topic the records are written to and how they are serialized
class UserDefinedKafkaSerializationSchema extends KafkaSerializationSchema[(String,Int)]{
override def serialize(t: (String, Int), aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord("topic01",t._1.getBytes(),t._2.toString.getBytes())
}
}
// Kafka deserialization schema; see the Kafka source section above
class userde extends KafkaDeserializationSchema[(String, String, Int, Long)]{
override def isEndOfStream(t: (String, String, Int, Long)): Boolean = false
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, Int, Long) = {
if (consumerRecord.key() != null){
(new String(consumerRecord.key()), new String(consumerRecord.value()), consumerRecord.partition(), consumerRecord.offset())
}else{
(null, new String(consumerRecord.value()), consumerRecord.partition(), consumerRecord.offset())
}
}
override def getProducedType: TypeInformation[(String, String, Int, Long)] = {
createTypeInformation[(String, String, Int, Long)]
}
}
Note: the defult_topic above has no real effect here, because serialize() in the KafkaSerializationSchema sets the target topic ("topic01") explicitly on every ProducerRecord.
Approach 2:
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema
import org.apache.kafka.clients.producer.ProducerConfig
object KafkaAndFlinkTwo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile("hdfs://SparkTwo:9000/demo/words")
val props = new Properties()
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "SparkTwo:9092")
props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"100")// producer batch size (bytes)
props.setProperty(ProducerConfig.LINGER_MS_CONFIG,"500")// linger time in ms
//Semantic.EXACTLY_ONCE: exactly-once via Kafka's idempotent/transactional writes
//Semantic.AT_LEAST_ONCE: at-least-once via the Kafka retries mechanism
val kafakaSink = new FlinkKafkaProducer[(String, Int)]("defult_topic",
new UserDefinedKeyedSerializationSchema, props, Semantic.AT_LEAST_ONCE)
//3. Apply the DataStream transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
counts.addSink(kafakaSink)
// counts.print()
//5. Execute the streaming job
env.execute("Window Stream WordCount")
}
}
class UserDefinedKeyedSerializationSchema extends KeyedSerializationSchema[(String,Int)]{
override def serializeKey(t: (String, Int)): Array[Byte] = {t._1.getBytes()}
override def serializeValue(t: (String, Int)): Array[Byte] = {t._2.toString.getBytes()}
// Overrides the target topic; if this returns null, records are written to the default topic
override def getTargetTopic(t: (String, Int)): String = {
"topic01"
}
}