我们之前学习的 转换算子 是无法访问事件的时间戳信息和水位线信息的。而这
在一些应用场景下,极为重要。例如 MapFunction 这样的 map 转换算子就无法访问
时间戳或者当前事件的事件时间。
基于此, DataStream API 提供了一系列的 Low-Level 转换算子。可以 访问时间
戳、 watermark 以及注册定时事件 。还可以输出 特定的一些事件 ,例如超时事件等。
Process Function 用来构建事件驱动的应用以及实现自定义的业务逻辑 ( 使用之前的
window 函数和转换算子无法实现 ) 。例如, Flink SQL 就是使用 Process Function 实
现的。
Flink 提供了 8 个 Process Function :
• ProcessFunction
• KeyedProcessFunction
• CoProcessFunction
• ProcessJoinFunction
• BroadcastProcessFunction
• KeyedBroadcastProcessFunction
• ProcessWindowFunction
• ProcessAllWindowFunction
1 KeyedProcessFunction
这里我们重点介绍 KeyedProcessFunction 。
KeyedProcessFunction 用来操作 KeyedStream 。 KeyedProcessFunction 会处理流
的每一个元素,输出为 0 个、 1 个或者多个元素。所有的 Process Function 都继承自
RichFunction 接口,所以都有 open() 、 close() 和 getRuntimeContext() 等方法。而
KeyedProcessFunction<K, I, O> 还额外提供了两个方法 :
• processElement(I value, Context ctx, Collector<O> out), 流中的每一个元素都
会调用这个方法,调用结果将会放在 Collector 数据类型中输出。 Context 可


以访问元素的时间戳,元素的 key ,以及 TimerService 时间服务。 Context 还
可以将结果输出到别的流 (side outputs) 。
• onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) 是一个回调
函数。当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定的
触发的时间戳。 Collector 为输出结果的集合。 OnTimerContext 和
processElement 的 Context 参数一样,提供了上下文的一些信息,例如定时器
触发的时间信息 ( 事件时间或者处理时间 ) 。
2 TimerService 和 定时器( Timers )
Context 和 OnTimerContext 所持有的 TimerService 对象拥有以下方法 :
• long currentProcessingTime() 返回当前处理时间
• long currentWatermark() 返回当前 watermark 的时间戳
• void registerProcessingTimeTimer(long timestamp) 会注册当前 key 的
processing time 的定时器。当 processing time 到达定时时间时,触发 timer 。
• void registerEventTimeTimer(long timestamp) 会注册当前 key 的 event time 定
时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
• void deleteProcessingTimeTimer(long timestamp) 删除之前注册处理时间定时
器。如果没有这个时间戳的定时器,则不执行。
• void deleteEventTimeTimer(long timestamp) 删除之前注册的事件时间定时
器,如果没有此时间戳的定时器,则不执行。
当定时器 timer 触发时,会执行回调函数 onTimer() 。注意定时器 timer 只能在
keyed streams 上面使用
下面举个例子说明 KeyedProcessFunction 如何操作 KeyedStream 。
需求:监控温度传感器的温度值,如果温度值在 10 秒钟之内 (processing time)
连续上升,则报警。

看一下 TempIncreaseWarning 如何实现 , 程序中使用了 ValueState 状态变量来保
存上次的温度值和定时器时间戳。

3侧输出流(SideOutput)
大部分的 DataStream API 的算子的输出是单一输出,也就是某种数据类型的流。
除了 split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。 process
function 的 side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。
一个 side output 可以定义为 OutputTag[X] 对象, X 是输出流的数据类型。 process
function 可以通过 Context 对象发射一个事件到一个或者多个 side outputs 。
下面是一个示例程序,用来监控传感器温度值,将温度值低于 30 度的数据输出
到 side output 。

4 CoProcessFunction
对于两条输入流, DataStream API 提供了 CoProcessFunction 这样的 low-level
操作。 CoProcessFunction 提供了操作每一个输入流的方法 : processElement1() 和
processElement2() 。
类似于 ProcessFunction ,这两种方法都通过 Context 对象来调用。这个 Context
对象可以访问事件数据,定时器时间戳, TimerService ,以及 side outputs 。
CoProcessFunction 也提供了 onTimer() 回调函数。
版权声明:本文为song_quan_原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。