Streaming (DataStream API)
DataSource
A data source is where a program reads its input from. A SourceFunction is attached to a program with env.addSource(SourceFunction). Flink ships with many ready-made SourceFunction implementations, but users can also write their own by implementing the SourceFunction interface (non-parallel) or the ParallelSourceFunction interface (parallel); if state management or lifecycle hooks are needed, extend RichParallelSourceFunction.
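A minimal sketch of that pattern (the complete custom sources appear in the UserDefinedSource section below; this only shows how addSource is wired in):
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// UserDefinedNonParallelSourceFunction is defined in the UserDefinedSource section below
val stream: DataStream[String] = env.addSource(new UserDefinedNonParallelSourceFunction)
stream.print()
env.execute("Custom Source Sketch")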
File-based
readTextFile(path) - Reads (once) text files line by line, i.e. files that respect the TextInputFormat specification, and returns them as Strings.
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream - read a text file from HDFS
val text:DataStream[String] = env.readTextFile("hdfs://CentOS:9000/demo/words")
//3. Apply the transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
  .map(word=>(word,1))
  .keyBy(0)
  .sum(1)
//4. Print the result to the console
counts.print()
//5. Execute the streaming job
env.execute("Window Stream WordCount")
readFile(fileInputFormat, path) - Reads (once) files as dictated by the specified file input format.
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create the file input format
var inputFormat:FileInputFormat[String]=new TextInputFormat(null)
//2. Create the DataStream
val text:DataStream[String] = env.readFile(inputFormat,"hdfs://CentOS:9000/demo/words")
//3. Apply the transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
  .map(word=>(word,1))
  .keyBy(0)  // key by tuple field 0
  .sum(1)
//4. Print the result to the console
counts.print()
//5. Execute the streaming job
env.execute("Window Stream WordCount")
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - This is the method called internally by the two previous ones. It reads files in the path based on the given fileInputFormat. Depending on the provided watchType, this source may periodically monitor (every interval ms) the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY), or process the data currently in the path once and exit (FileProcessingMode.PROCESS_ONCE). Using the pathFilter, the user can further exclude files from being processed.
Note: this method keeps checking the files under the monitored directory, and if a file changes the system reads it again, which can cause records to be counted more than once. In general, do not modify existing files; upload new files instead.
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream - monitor the directory continuously, checking every 1000 ms
var inputFormat:FileInputFormat[String]=new TextInputFormat(null)
val text:DataStream[String] = env.readFile(inputFormat,
  "hdfs://CentOS:9000/demo/words",FileProcessingMode.PROCESS_CONTINUOUSLY,1000)
//3. Apply the transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
  .map(word=>(word,1))
  .keyBy(0)
  .sum(1)
//4. Print the result to the console
counts.print()
//5. Execute the streaming job
env.execute("Window Stream WordCount")
Socket-based
socketTextStream - Reads from a socket. Elements can be separated by a delimiter.
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream - host, port, delimiter, max retries
val text = env.socketTextStream("CentOS", 9999,'\n',3)
//3. Apply the transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
  .map(word=>(word,1))
  .keyBy(0)
  .sum(1)
//4. Print the result to the console
counts.print()
//5. Execute the streaming job
env.execute("Window Stream WordCount")
Collection-based
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream from an in-memory collection
val text = env.fromCollection(List("this is a demo","hello word"))
//3. Apply the transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
  .map(word=>(word,1))
  .keyBy(0)
  .sum(1)
//4. Print the result to the console
counts.print()
//5. Execute the streaming job
env.execute("Window Stream WordCount")
UserDefinedSource
SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random

class UserDefinedNonParallelSourceFunction extends SourceFunction[String]{
  @volatile // keep the flag visible across threads
  var isRunning:Boolean=true
  val lines:Array[String]=Array("this is a demo","hello world","ni hao ma")
  // run() sends data downstream via sourceContext.collect
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    while(isRunning){
      Thread.sleep(100)
      // emit one random line downstream
      sourceContext.collect(lines(new Random().nextInt(lines.size)))
    }
  }
  // release resources / stop the source
  override def cancel(): Unit = {
    isRunning=false
  }
}
ParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import scala.util.Random

class UserDefinedParallelSourceFunction extends ParallelSourceFunction[String]{
  @volatile // keep the flag visible across threads
  var isRunning:Boolean=true
  val lines:Array[String]=Array("this is a demo","hello world","ni hao ma")
  // run() sends data downstream via sourceContext.collect
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    while(isRunning){
      Thread.sleep(100)
      // emit one random line downstream
      sourceContext.collect(lines(new Random().nextInt(lines.size)))
    }
  }
  // release resources / stop the source
  override def cancel(): Unit = {
    isRunning=false
  }
}
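RichParallelSourceFunction
The DataSource overview above also mentions RichParallelSourceFunction for sources that need lifecycle hooks or state. A minimal sketch of that variant, reusing the same random-line source (the open/close bodies are only illustrative):
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import scala.util.Random

class UserDefinedRichParallelSourceFunction extends RichParallelSourceFunction[String]{
  @volatile
  var isRunning:Boolean=true
  val lines:Array[String]=Array("this is a demo","hello world","ni hao ma")
  // lifecycle hook: called once per parallel subtask before run()
  override def open(parameters: Configuration): Unit = {
    println("open subtask " + getRuntimeContext.getIndexOfThisSubtask)
  }
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    while(isRunning){
      Thread.sleep(100)
      sourceContext.collect(lines(new Random().nextInt(lines.size)))
    }
  }
  override def cancel(): Unit = {
    isRunning=false
  }
  // lifecycle hook: called when the source shuts down
  override def close(): Unit = {
    println("close")
  }
}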
Consuming the user-defined source downstream:
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)
//2. Create the DataStream from the user-defined SourceFunction
val text = env.addSource[String](new UserDefinedParallelSourceFunction) // or new UserDefinedNonParallelSourceFunction
//3. Apply the transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
  .map(word=>(word,1))
  .keyBy(0)
  .sum(1)
//4. Print the result to the console
counts.print()
println(env.getExecutionPlan) // print the execution plan
//5. Execute the streaming job
env.execute("Window Stream WordCount")
Kafka integration
Add the Maven dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
SimpleStringSchema
SimpleStringSchema only deserializes the value of each Kafka record.
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream
val props = new Properties() // Kafka connection properties
props.setProperty("bootstrap.servers", "CentOS:9092")
props.setProperty("group.id", "g1")
// Create the Flink-Kafka consumer
val text = env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
//3. Apply the transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
  .map(word=>(word,1))
  .keyBy(0)
  .sum(1)
//4. Print the result to the console
counts.print()
//5. Execute the streaming job
env.execute("Window Stream WordCount")
KafkaDeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.flink.api.scala._

class UserDefinedKafkaDeserializationSchema extends KafkaDeserializationSchema[(String, String, Int, Long)]{
  // Whether the stream has ended; a continuous stream never ends, so always return false
  override def isEndOfStream(t: (String, String, Int, Long)): Boolean = false
  // Deserialize a Kafka record into (key, value, partition, offset)
  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, Int, Long) = {
    if(consumerRecord.key()!=null){ // key present: return key, value, partition and offset
      (new String(consumerRecord.key()),new String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
    }else{ // key absent: return null for the key, the rest as usual
      (null,new String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
    }
  }
  // Type information of the produced elements
  override def getProducedType: TypeInformation[(String, String, Int, Long)] = {
    // Note: createTypeInformation requires import org.apache.flink.api.scala._
    createTypeInformation[(String, String, Int, Long)]
  }
}
Consume the data and print it:
def main(args: Array[String]): Unit = {
  //1. Create the stream execution environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  //2. Create the DataStream
  val props = new Properties()
  props.setProperty("bootstrap.servers", "SparkTwo:9092")
  props.setProperty("group.id", "g1")
  // element type flowing through the source: (key, value, partition, offset)
  val text = env.addSource(new FlinkKafkaConsumer[(String,String,Int,Long)]("topic01",new UserDefinedKafkaDeserializationSchema(),props))
  //3. Apply the transformation operators
  val counts = text.flatMap(t=> t._2.split("\\s+"))
    .map(word=>(word,1))
    .keyBy(0)
    .sum(1)
  //4. Print the result to the console
  counts.print()
  //5. Execute the streaming job
  env.execute("Window Stream WordCount")
}
Supplementary: how to read the individual fields of a Kafka ConsumerRecord (a Java helper):
// Extract the different pieces of information from a record
private static void shum(ConsumerRecord<String, String> next){
  String topic = next.topic();        // topic
  int partition = next.partition();   // partition
  long offset = next.offset();        // offset
  String key = next.key();            // key
  String value = next.value();        // value
  long timestamp = next.timestamp();  // timestamp
  System.out.println("topic=" + topic + ", partition=" + partition + ", offset=" + offset + ", key=" + key + ", value=" + value + ", timestamp=" + timestamp);
}
JSONKeyValueDeserializationSchema
Requires both the key and the value in the Kafka topic to be JSON. When constructing the schema you can also choose whether to read the record metadata (topic, partition, offset, etc.).
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream
val props = new Properties()
props.setProperty("bootstrap.servers", "CentOS:9092")
props.setProperty("group.id", "g1")
// sample record value: {"id":1,"name":"zhangsan"}
val text = env.addSource(new FlinkKafkaConsumer[ObjectNode]("topic01",new JSONKeyValueDeserializationSchema(true),props))
// each element t looks like: {"value":{"id":1,"name":"zhangsan"},"metadata":{"offset":0,"topic":"topic01","partition":13}}
text.map(t=> (t.get("value").get("id").asInt(),t.get("value").get("name").asText()))
  .print()
//5. Execute the streaming job
env.execute("Window Stream WordCount")
Flink Kafka connector documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html
Data Sinks
Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on DataStreams.
File-based
writeAsText() / TextOutputFormat - Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.
writeAsCsv(...) / CsvOutputFormat - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.
writeUsingOutputFormat() / FileOutputFormat - Method and base class for custom file outputs. Supports custom object-to-bytes conversion.
Note: the write*() methods on DataStream are intended mainly for debugging purposes.
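As a quick illustration of the debug-oriented variants described above, a minimal sketch using writeAsText (the output path is only illustrative):
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val counts = env.socketTextStream("CentOS", 9999)
  .flatMap(line=>line.split("\\s+"))
  .map(word=>(word,1))
  .keyBy(0)
  .sum(1)
// one element per line, obtained via toString()
counts.writeAsText("file:///tmp/flink-debug-counts")
// writeAsCsv works the same way for tuple streams:
// counts.writeAsCsv("file:///tmp/flink-debug-counts.csv")
env.execute("Debug Sink Sketch")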
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream
val text = env.socketTextStream("CentOS", 9999)
//3. Apply the transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
  .map(word=>(word,1))
  .keyBy(0)
  .sum(1)
//4. Write the result to the file system
counts.writeUsingOutputFormat(new TextOutputFormat[(String, Int)](new Path("file:///Users/admin/Desktop/flink-results")))
//5. Execute the streaming job
env.execute("Window Stream WordCount")
Note: if the output is switched to HDFS, you need to produce a fairly large amount of data before you see any results, because the HDFS file system uses a large write buffer. The file-system sinks above cannot participate in Flink's checkpointing; in production, use flink-connector-filesystem to write to external systems instead.
First add the dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>1.10.0</version>
</dependency>
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream
val text = env.readTextFile("hdfs://CentOS:9000/demo/words")
var bucketingSink=StreamingFileSink
  .forRowFormat(new Path("hdfs://CentOS:9000/bucket-results"),
    new SimpleStringEncoder[(String,Int)]("UTF-8"))
  .withBucketAssigner(new DateTimeBucketAssigner[(String, Int)]("yyyy-MM-dd")) // derive the output sub-directory from the current date
  .build()
//3. Apply the transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
  .map(word=>(word,1))
  .keyBy(0)
  .sum(1)
//4. Write the result to the sink
counts.addSink(bucketingSink)
//5. Execute the streaming job
env.execute("Window Stream WordCount")
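One practical note when testing the example above: the row-format StreamingFileSink finalizes its part files when a checkpoint completes, so you would normally also enable checkpointing on the environment (the interval is only an illustrative value):
// illustrative: checkpoint every 5 seconds so StreamingFileSink can
// roll its in-progress part files into finished files
env.enableCheckpointing(5000)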
Legacy approach (BucketingSink):
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)
//2. Create the DataStream
val text = env.readTextFile("hdfs://CentOS:9000/demo/words")
var bucketingSink=new BucketingSink[(String,Int)]("hdfs://CentOS:9000/bucketresults")
bucketingSink.setBucketer(new DateTimeBucketer[(String,Int)]("yyyy-MM-dd"))
bucketingSink.setBatchSize(1024)
//3. Apply the transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
  .map(word=>(word,1))
  .keyBy(0)
  .sum(1)
//4. Write the result to the sink
counts.addSink(bucketingSink)
//5. Execute the streaming job
env.execute("Window Stream WordCount")
UserDefinedSinkFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class UserDefinedSinkFunction extends RichSinkFunction[(String,Int)]{
  override def open(parameters: Configuration): Unit = {
    println("open connection...")
  }
  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    println("output: "+value)
  }
  override def close(): Unit = {
    println("close connection")
  }
}
//1. Create the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//2. Create the DataStream
val text = env.readTextFile("hdfs://CentOS:9000/demo/words")
//3. Apply the transformation operators
val counts = text.flatMap(line=>line.split("\\s+"))
  .map(word=>(word,1))
  .keyBy(0)
  .sum(1)
//4. Write the result to the user-defined sink
counts.addSink(new UserDefinedSinkFunction)
//5. Execute the streaming job
env.execute("Window Stream WordCount")
RedisSink
Reference: https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
Again, first add the dependency:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object FlinkOne {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    //2. Create the DataStream
    val text = env.readTextFile("hdfs://SparkTwo:9000/demo/words")
    var flinkJedisConf = new FlinkJedisPoolConfig.Builder()
      .setHost("SparkTwo")
      .setPort(6379)
      .build()
    //3. Apply the transformation operators
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)
    counts.addSink(new RedisSink(flinkJedisConf,new UserDefinedRedisMapper()))
    //5. Execute the streaming job
    env.execute("Window Stream WordCount")
  }
}
class UserDefinedRedisMapper extends RedisMapper[(String,Int)]{
  // Describe the Redis command to use
  override def getCommandDescription: RedisCommandDescription = {
    // HSET writes into a hash; "wordcounts" is the additional (hash) key
    new RedisCommandDescription(RedisCommand.HSET,"wordcounts")
  }
  // The key taken from each element (the word)
  override def getKeyFromData(t: (String, Int)): String = {
    t._1
  }
  // The value taken from each element (the count)
  override def getValueFromData(t: (String, Int)): String = {
    t._2.toString
  }
}
Kafka integration
Again, the dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
Approach 1:
package com.baizhi.flinkKafka
import java.lang
import java.util.Properties
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer, KafkaDeserializationSchema, KafkaSerializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
object KafkaAndFlink {
  def main(args: Array[String]): Unit = {
    // Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Kafka connection properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers","SparkTwo:9092") // brokers
    properties.setProperty("group.id", "g1") // consumer group
    val text = env.addSource(new FlinkKafkaConsumer[(String, String, Int, Long)]("topics01", new userde(), properties))
    // Producer that writes the results back to Kafka
    val value= new FlinkKafkaProducer[(String, Int)]("defult_topic", new UserDefinedKafkaSerializationSchema(), properties, Semantic.AT_LEAST_ONCE)
    val counts = text.flatMap(t=> t._2.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)
    //counts.print()
    counts.addSink(value)
    env.execute("Window Stream WordCount")
  }
}
// Decides which Kafka topic the results go to and how the data is serialized
class UserDefinedKafkaSerializationSchema extends KafkaSerializationSchema[(String,Int)]{
  override def serialize(t: (String, Int), aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    new ProducerRecord("topic01",t._1.getBytes(),t._2.toString.getBytes())
  }
}
// Kafka deserialization schema; see the Kafka source section above
class userde extends KafkaDeserializationSchema[(String, String, Int, Long)]{
  override def isEndOfStream(t: (String, String, Int, Long)): Boolean = false
  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, Int, Long) = {
    if (consumerRecord.key()!=null){
      (new String(consumerRecord.key()),new String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
    }else{
      (null,new String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
    }
  }
  override def getProducedType: TypeInformation[(String, String, Int, Long)] = {
    createTypeInformation[(String, String, Int, Long)]
  }
}
Note: the defult_topic above has no effect here, because UserDefinedKafkaSerializationSchema builds each ProducerRecord with an explicit topic (topic01).
Approach 2:
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema
import org.apache.kafka.clients.producer.ProducerConfig
object KafkaAndFlinkTwo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile("hdfs://SparkTwo:9000/demo/words")
    val props = new Properties()
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "SparkTwo:9092")
    props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"100") // producer batch size (bytes)
    props.setProperty(ProducerConfig.LINGER_MS_CONFIG,"500")  // linger time in ms before a batch is sent
    // Semantic.EXACTLY_ONCE: uses Kafka's idempotent/transactional writes
    // Semantic.AT_LEAST_ONCE: relies on Kafka's retry mechanism
    val kafkaSink = new FlinkKafkaProducer[(String, Int)]("defult_topic",
      new UserDefinedKeyedSerializationSchema, props, Semantic.AT_LEAST_ONCE)
    //3. Apply the transformation operators
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)
    counts.addSink(kafkaSink)
    // counts.print()
    //5. Execute the streaming job
    env.execute("Window Stream WordCount")
  }
}
class UserDefinedKeyedSerializationSchema extends KeyedSerializationSchema[(String,Int)]{
  override def serializeKey(t: (String, Int)): Array[Byte] = {t._1.getBytes()}
  override def serializeValue(t: (String, Int)): Array[Byte] = {t._2.toString.getBytes()}
  // Overrides the target topic; if this returns null, records are written to the default topic
  override def getTargetTopic(t: (String, Int)): String = {
    "topic01"
  }
}

