flink-19 flink之Time概念

flink中Time的概念

对流失数据处理,最大的特点是数据上具有时间的属性特征
flink根据时间产生的位置不同,可以将时间区分为三种时间概念

  • Event Time(事件生成时间)
    • 事件产生的时间,它通常由事件中的时间戳描述
  • Ingestion time(事件接入时间)
    • 事件进入Flink程序的时间
  • Processing Time(事件处理时间)
    • 事件被处理时当前系统的时间

在这里插入图片描述

  • Flink在流处理程序中支持不同的时间概念。

EventTime

  • 1、事件生成时的时间,在进入Flink之前就已经存在,可以从event的字段中抽取
  • 2、必须指定watermarks(水位线)的生成方式
  • 3、优势:确定性,乱序、延时、或者数据重放等情况,都能给出正确的结果
  • 4、弱点:处理无序事件时性能和延迟受到影响

IngestTime

  • 1、事件进入flink的时间,即在source里获取的当前系统的时间,后续操作统一使用该时间。

  • 2、不需要指定watermarks的生成方式(自动生成)

  • 3、弱点:不能处理无序事件和延迟数据

ProcessingTime(默认时间)

  • 1、执行操作的机器的当前系统时间(每个算子都不一样)

  • 2、不需要流和机器之间的协调

  • 3、优势:最佳的性能和最低的延迟

  • 4、弱点:不确定性 ,容易受到各种因素影像(event产生的速度、到达flink的速度、在算子之间传输速度等),压根就不管顺序和延迟

三种时间的综合比较

  • 性能

    • ProcessingTime > IngestTime > EventTime
  • 延迟

    • ProcessingTime < IngestTime < EventTime
  • 确定性

    • EventTime > IngestTime > ProcessingTime

设置 Time 类型

可以你的流处理程序是以哪一种时间为标志的。

  • 在我们创建StreamExecutionEnvironment的时候可以设置Time类型,不设置Time类型,默认是ProcessingTime。
  • 如果设置Time类型为EventTime或者IngestTime,需要在创建StreamExecutionEnvironment中调用setStreamTimeCharacteristic() 方法指定。
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

//不设置Time 类型,默认是processingTime。
environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

//指定流处理程序以IngestionTime为准
//environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

//指定流处理程序以EventTime为准
//environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

示例:ProcessWindowFunction实现时间确定

  • 通过process实现处理时间的确定,包括数据时间、window时间等
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object TimeWindowWordCount {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val socketSource: DataStream[String] = environment.socketTextStream("node01",9999)
    //对数据进行处理
    socketSource.flatMap(x => x.split(" "))
      .map(x =>(x,1))
      .keyBy(0)
      .timeWindow(Time.seconds(2),Time.seconds(1))
      .process(new SumProcessFunction)
      .print()

    environment.execute()
  }

}

/*
*  * @tparam IN The type of the input value.
  * @tparam OUT The type of the output value.
  * @tparam KEY The type of the key.
  * @tparam W The type of the window.
  * abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window]
  */
class SumProcessFunction extends ProcessWindowFunction[(String,Int),(String,Int),Tuple,TimeWindow]{
  private val format: FastDateFormat = FastDateFormat.getInstance("HH:MM:SS")

  /**
    * Evaluates the window and outputs none or several elements.
    *
    * @param key      The key for which this window is evaluated.
    * @param context  The context in which the window is being evaluated.
    * @param elements The elements in the window being evaluated.
    * @param out      A collector for emitting elements.
    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    *
    *def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT])
    */

  override def process(key: Tuple, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
    println("当前系统时间为:"+format.format(System.currentTimeMillis()))
    println("window的处理时间为:"+format.format(context.currentProcessingTime))
    println("window的开始时间为:"+format.format(context.window.getStart))
    println("window的结束时间为:"+format.format(context.window.getEnd))
    var sum:Int = 0
    //统计每个窗口的数据出现次数
    for(eachElement <- elements){
      sum += eachElement._2
    }
    //输出 key和key出现的次数
    out.collect((key.getField(0),sum))
  }
}

输入
在这里插入图片描述

输出
在这里插入图片描述

分析
因为是滑动窗口,没1秒处理2秒的数据,所以数据处理有重叠。


版权声明:本文为a3125504x原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。