1、时间语义

在流式数据处理的过程中,有两个非常重要的时间点:一个是数据产生的时间,我们把它叫作“事件时间”(Event Time);另一个是数据真正被处理的时刻,叫作“处理时间”(Processing Time)。我们所定义的窗口操作,到底是以那种时间作为衡量标准,就是所谓的“时间语义”(Notions of Time)。由于分布式系统中网络传输的延迟和时钟漂移,处理时间相对事件发生的时间会有所滞后。
2、水位线(Watermark)
用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。
在实际应用中,一般会采用事件时间语义。而水位线,就是基于事件时间提出的概念。每个事件产生的数据,都包含了一个时间戳,我们直接用一个整数表示。当产生于2 秒的数据到来之后,当前的事件时间就是 2 秒;在后面插入一个时间戳也为 2 秒的水位线,随着数据一起向下游流动。而当 5 秒产生的数据到来之后,同样在后面插入一个水位线,时间戳也为 5,当前的时钟就推进到了 5 秒。这样,如果出现下游有多个并行子任务的情形,我们只要将水位线广播出去,就可以通知到所有下游任务当前的时间进度了。
水位线就像它的名字所表达的,是数据流中的一部分,随着数据一起流动,在不同任务之
间传输。

注意:只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。

设置延长时间
在乱序的情况下,我们无法正确处理“迟到”的数据。为了让窗口能正确收集到迟到的数据,我们可以等上几秒,也就是用当前已有数据的最大时间戳减去几秒,就是要插入的水位线的时间戳。

如何设置水位线
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[(String, Long)](Duration.ofSeconds(5)) //设置5秒延迟
.withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = element._2
})
)水位线的传递
在实际应用中往往上下游都有多个并行子任务,为了统一推进事件时间的进展,我们要求上游任务处理完水位线、时钟改变之后,要把当前的水位线广播给所有的下游任务。这样,后续任务就不需要依赖原始数据中的时间戳,也可以知道当前事件时间了。
上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个“分区水位线” (Partition Watermark),这是一个分区时钟;而当前任务自己的时钟,就是所有分区时钟里最小的那个。

水位线的总结
- 1、水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
- 2、水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
- 3、水位线是基于数据的时间戳生成的
- 4、水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
- 5、水位线可以通过设置延迟,来保证正确处理乱序数据
- 5、一个水位线 Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据
- 6、水位线在事件时间的世界里面,承担了时钟的角色,是唯一的时间尺度。
- 7、水位线的默认计算公式:水位线 = 观察到的最大事件时间 – 最大延迟时间 – 1 毫秒。
- 8、在数据流开始之前,Flink 会插入一个大小是负无穷大的水位线,而在数据流结束时,Flink 会插入一个正无穷大)的水位线,保 证所有的窗口闭合以及所有的定时器都被触发。
- 9、对于离线数据集,Flink 也会将其作为流读入,也就是一条数据一条数据的读取。在这种 情况下,Flink 对于离线数据集,只会插入两次水位线,也就是在最开始处插入负无穷大的水 位线,在结束位置插入一个正无穷大的水位线。因为只需要插入两次水位线,就可以保证计算的正确,无需在数据流的中间插入水位线了。
3、窗口(Window)

然而如果我们采用事件时间语义,就会有些费解了。由于有乱序数据,我们需要设置一个
延迟时间来等所有数据到齐。我们可以设置延迟时间为 2 秒。

但是这样一来,0~10 秒的窗口不光包含了迟到的 9 秒数据,连 11 秒和 12 秒的数据也包含进去了。我们为了正确处理迟到数据,结果把早到的数据划分到了错误的窗口——最终结果都是错误的。
在 Flink 中,窗口其实并不是一个“框”,流进来的数据被框住了就只能进这一个窗口。相比之下,我们应该把窗口理解成一个“桶”。在 Flink 中,窗口可以把流切割成有限大小的多个“存储桶”(bucket);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计
算处理。
4、窗口的分类
4.1、按照驱动类型分类
我们最容易想到的就是按照时间段去截取数据,这种窗口就叫作“时间窗口”(Time Window)。这在实际应用中最常见。除了由时间驱动之外,窗口其实也可以由数据驱动,也就是说按照固定的个数,来截取一段数据集,这种窗口叫作“计数窗口”(Count Window)。

4.2、 按照窗口分配数据的规则分类
(1)滚动窗口(Tumbling Windows)
滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。如果我们把多个窗口的创建,看作一个窗口的运动,那就好像它在不停地向前“翻滚”一样。这是最简单的窗口形式,我们之前所举的例子都是滚动窗口。也正是因为滚动窗口是“无缝衔接”,所以每个数据都会被分配到一个窗口,而且只会属于一个窗口。
滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。比如我们可以定义一个长度为 1 小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为 10 的滚动计数窗口,就会每 10 个数进行一次统计。

(2)滑动窗口(Sliding Windows)
滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。既然是向前滑动,那么每一步滑多远,就也是可以控制的。所以定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代表了窗口计算的频率。滑动的距离代表了下个窗口开始的时间间隔,而窗口大小是固定的,所以也就是两个窗口结束时间的间隔;窗口在结束时间触发计算输出结果,那么滑动步长就代表了计算频率。例如,我们定义一个长度为 1 小时、滑动步长为 5 分钟的滑动窗口,那么就会统计 1 小时内的数据,每 5 分钟统计一次。同样,滑动窗口可以基于时间定义,也可以基于数据个数定义。

(3)会话窗口(Session Windows)
数据来了之后就开启一个会话窗口,如果接下来还有数据陆续到来,那么就一直保持会话;如果一段时间一直没收到数据,那就认为会话超时失效,窗口自动关闭。会话窗口只能基于时间来定义。在 Flink 底层,对会话窗口的处理会比较特殊:每来一个新的数据,都会创建一个新的会话窗口;然后判断已有窗口之间的距离,如果小于给定的 size,就对它们进行合并(merge)操作。在 Window 算子中,对会话窗口会有单独的处理逻辑。

(4)全局窗口(Global Windows)
“全局窗口”,这种窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中;说直白一点,就跟没分窗口一样。无界流的数据永无止尽,所以这种窗口也没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。
5、窗口 API的使用
5.1、 按键分区(Keyed)和非按键分区(Non-Keyed)
在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流 KeyedStream
来开窗,还是直接在没有按键分区的 DataStream 上开窗。也就是说,在调用窗口算子之前,
是否有 keyBy 操作。
(1)按键分区窗口(Keyed Windows)
经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。在代码实现上,我们需要先对 DataStream 调用.keyBy()进行按键分区,然后再调用.window()定义窗口。
stream.keyBy(...)
.window(...)(2)非按键分区(Non-Keyed Windows)
如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。
stream.windowAll(...)5.2、 代码中窗口 API 的调用
有了前置的基础,接下来我们就可以真正在代码中实现一个窗口操作了。简单来说,窗口
操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。
stream.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(<window function>)6、 窗口分配器(Window Assigners)
定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。窗口分配器其实就是在指定窗口的类型。窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner 作为参数,返 WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个 WindowAssigner,返回的是 AllWindowedStream。窗口按照驱动类型可以分成时间窗口和计数窗口,而按照具体的分配规则,又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。
6.1、时间窗口
(1)滚动处理时间窗口
窗口分配器由类 TumblingProcessingTimeWindows 提供,需要调用它的静态方法.of()。
stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(...)(2)滑动处理时间窗口
窗口分配器由类 SlidingProcessingTimeWindows 提供,同样需要调用它的静态方法.of()。
stream.keyBy(...)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)(3)处理时间会话窗口
窗口分配器由类 ProcessingTimeSessionWindows 提供,需要调用它的静态方法.withGap()或者.withDynamicGap()。
stream.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)(4)滚动事件时间窗口
窗口分配器由类 TumblingEventTimeWindows 提供,用法与滚动处理事件窗口完全一致。
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(...)(5)滑动事件时间窗口
窗口分配器由类 SlidingEventTimeWindows 提供,用法与滑动处理事件窗口完全一致。
stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)(6)事件时间会话窗口
窗口分配器由类 EventTimeSessionWindows 提供,用法与处理事件会话窗口完全一致。
stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)6.2、 计数窗口
(1)滚动计数窗口
滚动计数窗口只需要传入一个长整型的参数 size,表示窗口的大小。
stream.keyBy(...)
.countWindow(10)(2)滑动计数窗口
与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size 和 slide,前者表示窗口大小,后者表示滑动步长。
stream.keyBy(...)
.countWindow(10,3)6.3、 全局窗口
stream.keyBy(...)
.window(GlobalWindows.create());7、窗口函数(Window Functions)

7. 1、ReduceFunction
env.addSource(consumer)
.map(f => {
println(f)
User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
override def extractAscendingTimestamp(element: User): Long = element.timestamp
})
.keyBy(_.userId)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// reduce 返回的类型,应该和输入的类型一样
// 这里统计的是每个窗口,每个userId 出现的次数,timestamp 是没用的,给了0值
.reduce { (v1, v2) => User(v1.userId, v1.count + v2.count, 0) }
.print()
7.2、AggregateFunction
AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数,一个输入类型(IN),一个累加器(ACC),一个输出类型(OUT)。输入类型,就是输入流的类型。接口中有一个方法,可以把输入的元素和累加器累加。并且可以初始化一个累加器,然后把两个累加器合并成一个累加器,获得输出结果。
env.addSource(consumer)
.map(f => {
println(f)
User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
override def extractAscendingTimestamp(element: User): Long = element.timestamp
})
.keyBy(_.userId)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 使用 aggregate 来计算
.aggregate(new MyAggregateFunction)
.print()
class MyAggregateFunction extends AggregateFunction[User, User, (String, Int)] {
override def createAccumulator(): User = User("", 0, 0)
override def add(value: User, accumulator: User): User = User(value.userId, value.count + accumulator.count, 0)
override def getResult(accumulator: User): (String, Int) = (accumulator.userId, accumulator.count)
override def merge(a: User, b: User): User = User(a.userId, a.count + b.count, 0)
}
7.3、ProcessWindowFunction
ProcessWindowFunction 有一个 Iterable 迭代器,用来获得窗口中所有的元素。有一个上下文对象用来获得时间和状态信息,比其他的窗口函数有更大的灵活性。
但是这样做损耗了一部分性能和资源,因为元素不能增量聚合,相反 ,在触发窗口计算时,Flink 需要在内部缓存窗口的所有元素。
env.addSource(consumer)
.map(f => {
println(f)
User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
override def extractAscendingTimestamp(element: User): Long = element.timestamp
})
.keyBy(_.userId)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 使用 ProcessFunction 来处理整个窗口数据
.process(new MyProcessFunction())
.print()
class MyProcessFunction extends ProcessWindowFunction[User, String, String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[User], out: Collector[String]): Unit = {
var count = 0
// 遍历,获得窗口所有数据
for (user <- elements) {
println(user)
count += 1
}
out.collect(s"Window ${context.window} , count : ${count}")
}
}
7.4、ProcessWindowFunction 结合 其他 函数一起计算
使用 ReduceFunction 和 AggregateFunction 进行增量计算,计算的结果输出给 ProcessWindowFunction,然后可以使用 context 附加输出一些元数据信息,比如当前窗口信息、当前水印、当前的processTime等等。
如下:我们使用 ReduceFunction 来计算 每个窗口的 count 最小值,然后输出最小值和这个窗口的开始时间:
env.addSource(consumer)
.map(f => {
println(f)
User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
override def extractAscendingTimestamp(element: User): Long = element.timestamp
})
.keyBy(_.userId)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 使用 reduce 和 processWindowFunction
.reduce(new MyReduceFunction, new MyProcessFunction)
.print()
class MyReduceFunction extends ReduceFunction[User] {
override def reduce(value1: User, value2: User): User = {
if (value1.count > value2.count) value2
else value1
}
}
class MyProcessFunction extends ProcessWindowFunction[User, (Long, User), String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[User], out: Collector[(Long, User)]): Unit = {
val min = elements.iterator.next
out.collect((context.window.getStart, min))
}
}
8、其他 API
8.1、 触发器(Trigger)
触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗
口函数,所以可以认为是计算得到结果并输出的过程。基于 WindowedStream 调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。
stream.keyBy(...)
.window(...)
.trigger(new MyTrigger())8.2、 移除器(Evictor)
stream.keyBy(...)
.window(...)
.evictor(new MyEvictor())Evictor 接口定义了两个方法:
evictBefore():定义执行窗口函数之前的移除数据操作
evictAfter():定义执行窗口函数之后的以处数据操作
默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的。
8.3、允许延迟(Allowed Lateness)
在事件时间语义下,窗口中可能会出现数据迟到的情况。这是因为在乱序流中,水位线
(watermark)并不一定能保证时间戳更早的所有数据不会再来。当水位线已经到达窗口结束时间时,窗口会触发计算并输出结果,这时一般也就要销毁窗口了;如果窗口关闭之后,又有本属于窗口内的数据姗姗来迟,默认情况下就会被丢弃。为了解决迟到数据的问题,Flink 提供了一个特殊的接口,可以为窗口算子设置一个“允许的最大延迟”(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了 窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口。
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.allowedLateness(Time.minutes(1))8.4、将迟到的数据放入侧输出流
我们自然会想到,即使可以设置窗口的延迟时间,终归还是有限的,后续的数据还是会被丢弃。Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了该上的车、本该被丢弃的数据。基于 WindowedStream 用.sideOutputLateData() 方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以 OutputTag 的类型与流中数据类型相同。
SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1))) .sideOutputLateData(outputTag)
.aggregate(new MyAggregateFunction())
DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);8.5、窗口的生命周期
1. 窗口的创建
2. 窗口计算的触发
3. 窗口的销毁
8.6、解决迟到数据
设置水位线延迟时间
允许窗口处理迟到数据
将迟到数据放入窗口侧输出流