一、Flink
是一个分布式框架和处理引擎,用于处理无界和有界数据流。
Flink主要是解决以下问题的:
1、乱序数据、迟到数据
2、exactly-once(数据精准一次性处理)
3、高吞吐、低延迟、准确性
4、容错性
概念:
有界数据流
无界数据流
批处理
流处理
有状态计算
灵活的窗口
时间语义
二、Flink入门示例
引入依赖
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.10.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<log4j.version>2.12.1</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
批处理
public class Flink01_WC_Batch {
public static void main(String[] args) throws Exception {
// 0.创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 1.读取文件
DataSource<String> lineDS = env.readTextFile("input/word.txt");
// 2.转换数据格式
FlatMapOperator<String, Tuple2<String, Integer>> wordsAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
});
// 3.按照word进行分组
UnsortedGrouping<Tuple2<String, Integer>> wordAndOneGS = wordsAndOne.groupBy(0);
// 4.分组内聚合统计
AggregateOperator<Tuple2<String, Integer>> sum = wordAndOneGS.sum(1);
// 5.打印结果
sum.print();
}
}
流处理
public class Flink02_WC_BoundedStream {
public static void main(String[] args) throws Exception {
// 0.创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1.读取文件
DataStreamSource<String> fileDS = env.readTextFile("input/word.txt");
// 2.转换数据格式
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneTuple = fileDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
});
// 3.分组
KeyedStream<Tuple2<String, Integer>, Tuple> wordAndOneKS = wordAndOneTuple.keyBy(0);
// 4.求和
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordAndOneKS.sum(1);
// 5.打印
sum.print();
// 6.执行
env.execute();
}
}
三、Flink部署和运行模式
1、Local模式
2、standalone模式
3、yarn模式
1)per-job-cluster模式
这种方式每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。
2)session-cluster模式
在yarn中初始化一个flink集群,开辟指定的资源,以后提交任务都提交到这里。这个flink集群会常驻在yarn集群中,除非手动关闭。
4、k8s&mesos模式
5、Windows模式
四、Flink运行架构
1、运行架构(主要说yarn)
采用了master-slave架构
1、客户端向Cluster Manager提交程序App
2、CM启动AM,AM包含Dispatcher、RM、JM
3、客户端向Dispatcher提交job
4、Dispatcher 将任务转发给JM
5、JM向RM申请slot
6、RM向CM申请资源
7、CM生产TaskManager
8、RM向TM请求slot
9、TM提供slot给JM
10、JM提交任务给TM。
Application Master 部分包含了三个组件:
- Dispatcher
负责接收用户提供的作业,并且负责为这个新提交的作业启动一个新的 JobManager 组件 - ResourceManager
负责资源的管理,在整个 Flink 集群中只有一个 ResourceManager - JobManager
负责管理作业的执行,在一个 Flink 集群中可能有多个作业同时执行,每个作业 都有自己的 JobManager 组件
还有其他组件: - TaskManager
主要负责执行具体的task任务,从JobManager处接收需要部署的 Task,部署 启 动后,与自己的上游建立连接,接收数据并处理。 - Cluster Manager
集群管理器,比如Standalone、YARN、K8s等,就是前面我们学习的不同环境 - Client
提交Job的客户端,可以是运行在任何机器上(与 JobManager 环境连通即可)。提交Job后,Client可以结束进程(Streaming的任务),也可以不结束并等待结果返回。
2、核心概念
TaskManager与slot
1、JVM进程
2、slot可以共享
3、不会涉及cpu隔离
Parallelism(并行度)
1、某些数据源无法改变并行度,例如socket
2、一个特定算子的子任务的个数被成为并行度
3、一个流程序的并行度,可以认为就是算子的最大的并行度
流之间的传输形式
1、one-to-one
2、Redistributing
task与SubTask
1、多个subtask可以组成一个task,需要算子之间的传输状态时one-to-one
2、减少任务之间数据传输的性能消耗
Operator Chains(任务链)
1、任务链必须满足两个条件:one-to-one的数据传输并且并行度相同
ExecutionGraph(执行图)
1、StreamGraph-》JobGraph-》ExecutionGraph
提交流程
第五章Flink的核心编程

1、批处理和流处理环境
// 批处理环境
ExecutionEnvironment benv = ExecutionEnvironment.getExecutionEnvironment();
// 流式数据处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2、Source
1)从集合中读取数据
public class Flink02_Source_Collection {
public static void main(String[] args) throws Exception {
// 0.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1.从集合中读取数据
DataStreamSource<WaterSensor> collectionDS = env.fromCollection(
Arrays.asList(
new WaterSensor("ws_001", 1577844001L, 45),
new WaterSensor("ws_002", 1577844015L, 43),
new WaterSensor("ws_003", 1577844020L, 42)
)
);
// 2.打印
collectionDS.print();
// 3.执行
env.execute();
}
}
2)从文件中读取数据
public class Flink03_Source_File {
public static void main(String[] args) throws Exception {
// 0.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1.从集合中读取数据
DataStreamSource<String> fileDS = env.readTextFile("input/sensor-data.log");
// 2.打印
fileDS.print();
// 3.执行
env.execute();
}
}
3)从kafka中读取数据
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
// 0.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1.从kafka中读取
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop102:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
DataStreamSource<String> kafkaDS = env.addSource(
new FlinkKafkaConsumer011<String>(
"sensor",
new SimpleStringSchema(),
properties)
);
kafkaDS.print("kafka source");
env.execute();
4)自定义数据源
public static class MySource implements SourceFunction<WaterSensor> {
boolean flag = true;
@Override
public void run(SourceContext<WaterSensor> ctx) throws Exception {
Random random = new Random();
while (flag) {
ctx.collect(
new WaterSensor(
"sensor_" + (random.nextInt(3) + 1),
System.currentTimeMillis(),
random.nextInt(10) + 40
)
);
Thread.sleep(1000L);
}
}
@Override
public void cancel() {
flag = false;
}
}
// 0.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1.从kafka中读取
DataStreamSource<WaterSensor> inputDS = env.addSource(new MySource());
inputDS.print();
env.execute();
3、Transform
1)map
2)flatmap
3)filter
4)keyby
5)shuffle
6)spilt
7)select
8)connect
9)union
4、Operator
1)滚动聚合算子,sum、min、max
2)reduce
3)process
5、Sink
1)Kafka Sink
2)redis Sink
3)ElasticSearch Sink
4)自定义Sink
第六章 Flink高阶编程
1、Window窗口
1)流式计算是被用于处理无限数据集的数据处理引擎。
2)无限数据集本质上是一种不断增长的数据集。
3)window就是一种切割无限数据集为有限块进行处理的手段。(不可能等待有限数据集全部来之后再处理,因为本质上是无限的,所以数据没有结束边界,那么也需要计算,可以取其中一部分的数据进行计算,也可以不断累加,来一条处理一条进行计算,不用等到数据到来完全之后再计算)
2、window窗口划分
按照时间生成window,还可以按照数据条数生成window.
根据窗口实现原理,又可以划分为滚动窗口、滑动窗口、会话窗口。
3、窗口的API
两类,8个API
增量聚合窗口:reduce(累加,无初始值)、aggregate(累加,有初始值,离线和实时结合,每天用离线统计数据作为初始值累加)、process(全窗口处理函数,把窗口的数据收集完之后再处理,基本上不用,涉及数据太多)
其他:trigger(触发窗口提前计算和关闭)、evitor(移除器,定义移除某些数据的逻辑,就是过滤某些数据(和过滤还不一样,这个是直接把数据丢出流)、allowedLateness(允许处理迟到的数据)、sideOutputLateData(将迟到的数据放到侧输出流)、getsideOutput(获取侧输出流)。
4、时间语义与watermark
1)时间语义分为三类:事件时间(业务产生数据的时间)、进入时间(进入flink的服务器时间)、处理时间(计算数据的服务器时间)
2)watermark:
0:用来触发关窗和计算
1:水位标记
2:有序数据、乱序数据
2:处理乱序数据
3:周期性生成watermark
4:间接性增量生成watermark(就是来一条数据会生成一个watermark,影响性能)
5:如何知道数据是乱序的和迟到的?基于事件事件,才知道数据是乱序和迟到的
6:已经知道了数据乱序,做一个窗口的操作哦,用EventTime来触发窗口的计算和关闭合适不?不合适,因为数据已经乱序,如果第一条来的是窗口的最后一条则直接计算,会少计算很多数据,不能用来触发窗口的计算和关闭。因为事件事件不合适,所以引入了watermark:
用来衡量时间的进展。
用来触发窗口的计算和关闭。
解决乱序问题。
是单调递增的(不减的)。
是一个时间戳。
表示他之前的数据,都已经到齐了。
7:怎么知道当前来的数据属于哪一个窗口?也就是窗口是怎么划分的?
窗口划分:TumblingEventTimeWindows->assignWindows方法
窗口开始时间:timestamp - (timestamp - offset + windowSize) % windowSize;(窗口的开始时间肯定比当前的处理时间(这里的处理时间指的是提取的EventTime,或者基于系统的处理时间)要大)
窗口结束时间:new TimeWindows(start,start+size)=》 start+size
窗口时左闭右开的:属于窗口的最大时间戳为: maxTimestamp=end – 1
窗口触发条件:windows.maxTimestamp()<=ctx.fetCurrentWatermark()
=》由watermark触发窗口的计算,当watermark大于等于 窗口数据的最大时间
如何计算WaterMark的值?
Watermark = 进入 Flink 的最大的事件时间(mxtEventTime) - 指定的延迟时间(t)
有Watermark 的 Window 是怎么触发窗口函数?
如果有窗口的停止时间等于或者小于maxEventTime – t:指定的延迟时间 (计算出来的结果就是当时的 warkmark),那么 这个窗口被触发执行
8:乱序场景下,如何处理,就是watermark
有界流,文件,为了保证所有的数据都被计算,Flink会在最后,给一个Long的最大值的watermark,保证所有窗口都被触发计算。
9:watermark设置了等待时间,如果超过了等待时间,还有数据没到齐,怎么办?
窗口设置允许迟到=》allowedLateness.
10:如果窗口设置了延迟时间,但是到了真正关窗的时间,但是后面还有迟到的数据,怎么办?
放到侧输出流,进行处理,或者存起来。
11:在多并行度的时候,怎么确定watermark的取值?
木桶原理,取最小的watermark值
5、ProcessFunction API(过程API,就是低级API,原始级API,核心API,可获取很多处理的事件,比如时间戳、watermark、注册定时事件,输出特定事件,比如超时事件)
FlinkSQL就是基于process API实现的。
提供了8个process api:
processFunction
keyedProcessFunction
coprocessFunction
processjoinFunction
broadcastProcessFunction
keyedbroadcastProcessFunction
processwindowFunction
processallwindowFunction
6、KeyedProcessFunction用来操作KeyedStream。KeyedProcessFunction会处理流的每一个元素,输出为0个、1个或者多个元素。所有的Process Function都继承自RichFunction接口,所以都有open()、close()和getRuntimeContext()等方法。而KeyedProcessFunction[KEY, IN, OUT]还额外提供了两个方法:
processElement(v: IN, ctx: Context, out: Collector[OUT]), 流中的每一个元素都会调用这个方法,调用结果将会放在Collector数据类型中输出。Context可以访问元素的时间戳,元素的key,以及TimerService时间服务。Context还可以将结果输出到别的流(side outputs)。
onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])是一个回调函数。当之前注册的定时器触发时调用。参数timestamp为定时器所设定的触发的时间戳。Collector为输出结果的集合。OnTimerContext和processElement的Context参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。
7、TimerService 和 定时器(Timers)
Context和OnTimerContext所持有的TimerService对象拥有以下方法:
currentProcessingTime(): Long 返回当前处理时间
currentWatermark(): Long 返回当前watermark的时间戳
registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。
registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
定时器源码分析
=》注册 eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(timr,(k
keyContext.getCurrentKey(),namespace));
=》触发 time(业务时间)<=timer.getTimerstamp()定时的时间 <=watermark
8、侧输出流(SideOutput)
大部分的DataStream API的算子的输出是单一输出,也就是某种数据类型的流。除了split算子,可以将一条流分成多条流,这些流的数据类型也都相同。process function的side outputs功能可以产生多条流,并且这些流的数据类型可以不一样。一个side output可以定义为OutputTag[X]对象,X是输出流的数据类型。process function可以通过Context对象发送一个事件到一个或者多个side outputs。
9、CoProcessFunction
对于两条输入流,DataStream API提供了CoProcessFunction这样的low-level操作。CoProcessFunction提供了操作每一个输入流的方法: processElement1()和processElement2()。
类似于ProcessFunction,这两种方法都通过Context对象来调用。这个Context对象可以访问事件数据,定时器时间戳,TimerService,以及side outputs。CoProcessFunction也提供了onTimer()回调函数。
10、状态编程和容错机制
流式计算分为无状态和有状态两种情况。无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收水位数据,并在水位超过指定高度时发出警告。有状态的计算则会基于多个事件输出结果。以下是一些例子。例如,计算过去一小时的平均水位,就是有状态的计算。所有用于复杂事件处理的状态机。例如,若在一分钟内收到两个相差20cm以上的水位差读数,则发出警告,这是有状态的计算。流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态的计算。
1 无状态计算
无状态(Stateless)计算特性:
1)不需要考虑历史数据
2)相同的输入得到相同的输出
3)如map、filter、flatMap等方法

首先举一个无状态计算的例子:消费延迟计算。 假设现在有一个消息队列,消息队列中有一个生产者持续往消费队列写入消息,多个消费者 分别从消息队列中读取消息。 从图上可以看出,生产者已经写入 16 条消息,Offset 停留在 15 ;有 3 个消费者,有的消 费快,而有的消费慢。消费快的已经消费了 13 条数据,消费者慢的才消费了 8 条数据。
如何实时统计每个消费者落后多少条数据,如图给出了输入输出的示例。可以了解到输入的 时间点有一个时间戳,生产者将消息写到了某个时间点的位置,每个消费者同一时间点分别读到 了什么位置。刚才也提到了生产者写入了 15 条,消费者分别读取了 10、7、12 条。那么问题来 了,怎么将生产者、消费者的进度转换为右侧示意图信息呢?

consumer 0 落后了 5 条,consumer 1 落后了 8 条,consumer 2 落后了 3 条,根据 Flink 的原理,此处需进行 Map 操作。Map 首先把消息读取进来,然后分别相减,即可知道每 个 consumer 分别落后了几条。Map 一直往下发,则会得出最终结果。 大家会发现,在这种模式的计算中,无论这条输入进来多少次,输出的结果都是一样的,因 为单条输入中已经包含了所需的所有信息。消费落后等于生产者减去消费者。生产者的消费在单条数据中可以得到,消费者的数据也可以在单条数据中得到,所以相同输入可以得到相同输出, 这就是一个无状态的计算。
2 有状态计算
有状态(State)计算特性:
)需要考虑历史数据
2)相同的输入得到不同的输出/不一定得到相同的输出
3)如sum/reduce等方法
以访问日志统计量的例子进行说明,比如当前拿到一个 Nginx 访问日志,一条日志表示一个 请求,记录该请求从哪里来,访问的哪个地址,需要实时统计每个地址总共被访问了多少次,也 即每个 API 被调用了多少次。可以看到下面简化的输入和输出,输入第一条是在某个时间点请求 GET 了 /api/a;第二条日志记录了某个时间点 Post /api/b ;第三条是在某个时间点 GET了一个 /api/a,总共有 3 个 Nginx 日志。
从这 3 条 Nginx 日志可以看出,第一条进来输出 /api/a 被访问了一次,第二条进来输出 /api/b 被访问了一次,紧接着又进来一条访问 api/a,所以 api/a 被访问了 2 次。不同的是, 两条 /api/a 的 Nginx 日志进来的数据是一样的,但输出的时候结果可能不同,第一次输出 count=1 ,第二次输出 count=2,说明相同输入可能得到不同输出。输出的结果取决于当前请求的 API 地址之前累计被访问过多少次。第一条过来累计是 0 次,count = 1,第二条过来 API的访问已经有一次了,所以 /api/a 访问累计次数 count=2。单条数据其实仅包含当前这次访问的信息,而不包含所有的信息。要得到这个结果,还需要依赖 API 累计访问的量,即状态。
这个计算模式是将数据输入算子中,用来进行各种复杂的计算并输出数据。这个过程中算子 会去访问之前存储在里面的状态。另外一方面,它还会把现在的数据对状态的影响实时更新,如果输入 200 条数据,最后输出就是 200 条结果。
有状态的算子
Flink内置的很多算子,数据源source,数据存储sink都是有状态的,流中的数据都是buffer records,会保存一定的元素或者元数据。例如: ProcessWindowFunction会缓存输入流的数据,ProcessFunction会保存设置的定时器信息等等。
在Flink中,状态始终与特定算子相关联。总的来说,有两种类型的状态:
- 算子状态(operator state)
算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。
图 具有算子状态的任务
Flink为算子状态提供三种基本数据结构:
列表状态(List state)
将状态表示为一组数据的列表。
联合列表状态(Union list state)
也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者 从保存点(savepoint)启动应用程序时如何恢复。
广播状态(Broadcast state)
如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适 合应用广播状态。
2) 键控状态(keyed state)
键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)。
Flink的Keyed State支持以下数据类型:
ValueState[T]保存单个的值,值的类型为T。
get操作: ValueState.value()
set操作: ValueState.update(value: T)
ListState[T]保存一个列表,列表里的元素的数据类型为T。基本操作如下:
ListState.add(value: T)
ListState.addAll(values: java.util.List[T])
ListState.get()返回Iterable[T]
ListState.update(values: java.util.List[T])
MapState[K, V]保存Key-Value对。
MapState.get(key: K)
MapState.put(key: K, value: V)
MapState.contains(key: K)
MapState.remove(key: K)
ReducingState[T]
AggregatingState[I, O]
State.clear()是清空操作。
通过RuntimeContext注册StateDescriptor。StateDescriptor以状态state的名字和存储的数据类型为参数。
在open()方法中创建state变量。
- 状态后端(state backend)
每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务都会在本地维护其状态,以确保快速的状态访问。状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend)
状态后端主要负责两件事:
本地的状态管理
将检查点(checkpoint)状态写入远程存储
状态后端分类:
MemoryStateBackend
内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上;而将checkpoint存储在JobManager的内存中。
何时使用MemoryStateBackend?
建议使用MemoryStateBackend进行本地开发或调试,因为它的状态有限
MemoryStateBackend最适合具有小状态大小的用例和有状态流处理应用程序,例如仅包含一次记录功能(Map,FlatMap或Filter)的作业或使用Kafkaconsumer.。
FsStateBackend
将checkpoint存到远程的持久化文件系统(FileSystem)上。而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上。
何时使用FsStateBackend?
FsStateBackend最适合处理大状态,长窗口或大键/值状态的Flink有状态流处理作业
FsStateBackend最适合每个高可用性设置
RocksDBStateBackend
将所有状态序列化后,存入本地的RocksDB中存储。
注意:RocksDB的支持并不直接包含在flink中,需要引入依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.10.0</version>
</dependency>
何时使用RocksDBStateBackend?
RocksDBStateBackend最适合处理大状态,长窗口或大键/值状态的Flink有状态流处理作业
RocksDBStateBackend最适合每个高可用性设置
RocksDBStateBackend是目前唯一可用于支持有状态流处理应用程序的增量检查点的状态后端。
11、状态一致性
一致性级别:at-most-one(最多一次,会丢数据)、at-least-one(最少一次,会有数据重复)、exactly-once(有些仅有一次,不多不少)
端到端状态一致性:
source端:需要外部源可重设读取位置
flink内部:依赖checkpoint
sink端:需要保证从故障恢复,数据不会重复写入外部系统,有两种实现方式,幂等(幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了)和事务性写入(需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中。对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC))
DataStream API 提供了GenericWriteAheadSink(预写日志)模板类和TwoPhaseCommitSinkFunction (两阶段提交)接口,可以方便地实现这两种方式的事务性写入。
不同 Source 和 Sink 的一致性保证可以用下表说明:
12、检查点
Flink检查点的核心作用是确保状态正确,即使遇到程序中断,也要正确。
Flink检查点算法的正式名称是异步分界线快照(asynchronous barrier snapshotting)。该算法大致基于Chandy-Lamport分布式快照算法。检查点是Flink最有价值的创新之一,因为它使Flink可以保证exactly-once,并且不需要牺牲性能。
Flink的检查点算法
Flink检查点算法的正式名称是异步分界线快照(asynchronous barrier snapshotting)。该算法大致基于Chandy-Lamport分布式快照算法。
Flink检查点的核心作用是确保状态正确,即使遇到程序中断,也要正确。记住这一基本点之后,我们用一个例子来看检查点是如何运行的。Flink为用户提供了用来定义状态的工具。例如,以下这个Scala程序按照输入记录的第一个字段(一个字符串)进行分组并维护第二个字段的计数状态。
val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
.keyBy(record => record._1)
.mapWithState( (in: (String, Int), state: Option[Int]) =>
state match {
case Some(c) => ( (in._1, c + in._2), Some(c + in._2) )
case None => ( (in._1, in._2), Some(in._2) )
})
该程序有两个算子: keyBy算子用来将记录按照第一个元素(一个字符串)进行分组,根据该key将数据进行重新分区,然后将记录再发送给下一个算子: 有状态的map算子(mapWithState)。map算子在接收到每个元素后,将输入记录的第二个字段的数据加到现有总数中,再将更新过的元素发射出去。下图表示程序的初始状态: 输入流中的6条记录被检查点分割线(checkpoint barrier)隔开,所有的map算子状态均为0(计数还未开始)。所有key为a的记录将被顶层的map算子处理,所有key为b的记录将被中间层的map算子处理,所有key为c的记录则将被底层的map算子处理。
图 按key累加计数程序初始状态
上图是程序的初始状态。注意,a、b、c三组的初始计数状态都是0,即三个圆柱上的值。ckpt表示检查点分割线(checkpoint barriers)。每条记录在处理顺序上严格地遵守在检查点之前或之后的规定,例如[“b”,2]在检查点之前被处理,[“a”,2]则在检查点之后被处理。
当该程序处理输入流中的6条记录时,涉及的操作遍布3个并行实例(节点、CPU内核等)。那么,检查点该如何保证exactly-once呢?
检查点分割线和普通数据记录类似。它们由算子处理,但并不参与计算,而是会触发与检查点相关的行为。当读取输入流的数据源(在本例中与keyBy算子内联)遇到检查点屏障时,它将其在输入流中的位置保存到持久化存储中。如果输入流来自消息传输系统(Kafka),这个位置就是偏移量。Flink的存储机制是插件化的,持久化存储可以是分布式文件系统,如HDFS。下图展示了这个过程。

图 遇到checkpoint barrier时,保存其在输入流中的位置
当Flink数据源(在本例中与keyBy算子内联)遇到检查点分界线(barrier)时,它会将其在输入流中的位置保存到持久化存储中。这让 Flink可以根据该位置重启。
检查点像普通数据记录一样在算子之间流动。当map算子处理完前3条数据并收到检查点分界线时,它们会将状态以异步的方式写入持久化存储,如下图所示。

图 保存map算子状态,也就是当前各个key的计数值
位于检查点之前的所有记录([“b”,2]、[“b”,3]和[“c”,1])被map算子处理之后的情况。此时,持久化存储已经备份了检查点分界线在输入流中的位置(备份操作发生在barrier被输入算子处理的时候)。map算子接着开始处理检查点分界线,并触发将状态异步备份到稳定存储中这个动作。
当map算子的状态备份和检查点分界线的位置备份被确认之后,该检查点操作就可以被标记为完成,如下图所示。我们在无须停止或者阻断计算的条件下,在一个逻辑时间点(对应检查点屏障在输入流中的位置)为计算状态拍了快照。通过确保备份的状态和位置指向同一个逻辑时间点,后文将解释如何基于备份恢复计算,从而保证exactly-once。值得注意的是,当没有出现故障时,Flink检查点的开销极小,检查点操作的速度由持久化存储的可用带宽决定。回顾数珠子的例子: 除了因为数错而需要用到皮筋之外,皮筋会被很快地拨过。

图 检查点操作完成,继续处理数据
检查点操作完成,状态和位置均已备份到稳定存储中。输入流中的所有数据记录都已处理完成。值得注意的是,备份的状态值与实际的状态值是不同的。备份反映的是检查点的状态。
如果检查点操作失败,Flink可以丢弃该检查点并继续正常执行,因为之后的某一个检查点可能会成功。虽然恢复时间可能更长,但是对于状态的保证依旧很有力。只有在一系列连续的检查点操作失败之后,Flink才会抛出错误,因为这通常预示着发生了严重且持久的错误。
现在来看看下图所示的情况: 检查点操作已经完成,但故障紧随其后。

图 故障紧跟检查点,导致最底部的实例丢失
在这种情况下,Flink会重新拓扑(可能会获取新的执行资源),将输入流倒回到上一个检查点,然后恢复状态值并从该处开始继续计算。在本例中,[“a”,2]、[“a”,2]和[“c”,2]这几条记录将被重播。
下图展示了这一重新处理过程。从上一个检查点开始重新计算,可以保证在剩下的记录被处理之后,得到的map算子的状态值与没有发生故障时的状态值一致。

图 故障时的状态恢复
Flink将输入流倒回到上一个检查点屏障的位置,同时恢复map算子的状态值。然后,Flink从此处开始重新处理。这样做保证了在记录被处理之后,map算子的状态值与没有发生故障时的一致。
Flink检查点算法的正式名称是异步分界线快照(asynchronous barrier snapshotting)。该算法大致基于Chandy-Lamport分布式快照算法。
检查点是Flink最有价值的创新之一,因为它使Flink可以保证exactly-once,并且不需要牺牲性能。
第七章 CEP
1、一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据,满足规则的复杂事件。
特征:
目标:从有序的简单事件流中发现一些高阶特征
输入:一个或多个由简单事件构成的事件流
处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
输出:满足规则的复杂事件
2 CEP特点
CEP用于分析低延迟、频繁产生的不同来源的事件流。CEP可以帮助在复杂的、不相关的事件流中找出有意义的模式和复杂的关系,以接近实时或准实时的获得通知并阻止一些行为。
CEP支持在流上进行模式匹配,根据模式的条件不同,分为连续的条件或不连续的条件;模式的条件允许有时间的限制,当在条件范围内没有达到满足的条件时,会导致模式匹配超时。
看起来很简单,但是它有很多不同的功能:
输入的流数据,尽快产生结果
在2个event流上,基于时间进行聚合类的计算
提供实时/准实时的警告和通知
在多样的数据源中产生关联并分析模式
高吞吐、低延迟的处理
市场上有多种CEP的解决方案,例如Spark、Samza、Beam等,但他们都没有提供专门的library支持。但是Flink提供了专门的CEP library。
3 CEP API
1 基本开发步骤
1) 如果想要使用Flink提供的CEP库,首先需要引入相关的依赖关系
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>1.10.0</version>
</dependency>
2) 定义数据流
val env =
StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val dataDS = env.readTextFile("input/sensor-data.log")
val sensorDS = dataDS.map(
data => {
val datas = data.split(",")
WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
}
)
3) 定义规则
val pattern =
Pattern.begin[WaterSensor]("begin").where(_.id == "a")
4) 应用规则
val sensorPS =
CEP.pattern(sensorDS, pattern)
5) 获取结果
val ds = sensorPS.select(
map => {
map.toString
}
)
ds.print("cep>>>>")
2 匹配规则
每个匹配规则都需要指定触发条件,作为是否接受事件进入的判断依据
按不同的调用方式,可以分成以下几类
条件匹配
1) 简单条件
Pattern
.begin[(String, String)]("start")
.where(_._1 == "a")// 并且条件
2) 组合条件
Pattern
.begin[(String, String)]("start")
.where(_._1 == "a")
.or(_._1 == "b") // 或条件
3) 终止条件
Pattern
.begin[(String, String)]("start")
.where(_._1 == "b")
.oneOrMore.until(_._2 == "4")
模式序列
1) 严格近邻
严格的满足联合条件, 当且仅当数据为连续的a,b时,模式才会被命中。如果数据为a,c,b,由于a的后面跟了c,所以a会被直接丢弃,模式不会命中。
Pattern
.begin[(String, String)]("start")
.where(_._1 == "a")
.next("next")
.where(_._1 == "b")
2) 宽松近邻
松散的满足联合条件, 当且仅当数据为a,b或者为a,c,b,模式均被命中,中间的c会被忽略掉。
Pattern
.begin[(String, String)]("start")
.where(_._1 == "a")
.followedBy("followedBy")
.where(_._1 == "b")
3) 非确定性宽松近邻
非确定的松散满足条件, 当且仅当数据为a,c,b,b时,对于followedBy模式而言命中的为{a,b},对于followedByAny而言会有两次命中{a,b},{a,b}
Pattern
.begin[(String, String)]("start")
.where(_._1 == "a")
.followedByAny("followedByAny")
.where(_._1 == "b")
量词(Quantifier)
1) 固定次数(N)
Pattern
.begin[(String, String)]("start")
.where(_._1 == "sensor1")
.times(3)
2) 多次数(N1,N2,N3)
Pattern
.begin[(String, String)]("start")
.where(_._1 == "sensor1")
.times(1,3)
超时
Pattern
.begin[(String, String)]("start")
.where(_._1 == "sensor1")
.within(Time.minutes(5))