流批一体计算引擎-1-[Flink]的调度方式和流式计算的应用特征

1 流式计算

1.1 背景

在日常生活中,我们通常会先把数据存储在一张表中,然后再进行加工、分析,这里就涉及到一个时效性的问题。

场景一:如果我们处理以年、月为单位的级别的数据,针对这些大量数据的实时性要求并不高。
场景二:如果我们处理的是以天、小时,甚至分钟为单位的数据,那么对数据的时效性要求就比较高。

在第二种场景下,如果我们仍旧采用传统的数据处理方式,统一收集数据,存储到数据库中,之后在进行分析,就可能无法满足时效性的要求

1.2 流式计算与批量计算

大数据的计算模式主要分为:
批量计算(batch computing)、
流式计算(stream computing)、
交互计算(interactive computing)、
图计算(graph computing)等。

其中,流式计算和批量计算是两种主要的大数据计算模式,分别适用于不同的大数据应用场景。

流数据(或数据流)是指在时间分布和数量上无限的一系列动态数据集合体,数据的价值随着时间的流逝而降低,因此必须实时计算给出秒级响应。

流式计算,就是对数据流进行处理,是实时计算。
批量计算则统一收集数据,存储到数据库中,然后对数据进行批量处理的数据计算方式。

两者的区别主要体现在以下几个方面:

(1)数据时效性不同
流式计算实时、低延迟;
批量计算非实时、高延迟。

(2)数据特征不同
流式计算的数据一般是动态的、没有边界的;
批处理的数据一般则是静态数据。

(3)应用场景不同
流式计算应用在实时场景,时效性要求比较高的场景,如实时推荐、业务监控…。
批量计算一般说批处理,应用在实时性要求不高、离线计算的场景下,数据分析、离线报表等。

(4)运行方式不同
流式计算的任务持续进行的;
批量计算的任务则一次性完成。

1.3 流式计算框架平台与相关产品

第一类,商业级流式计算平台(IBM InfoSphere Streams、IBM StreamBase等);
第二类,开源流式计算框架(Twitter Storm、S4等);
第三类,公司为支持自身业务开发的流式计算框架。

(1)Strom:Twitter 开发的第一代流处理系统。
(2)Heron:Twitter 开发的第二代流处理系统。
(3)Spark streaming:是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。
(4)Flink:是一个针对流数据和批数据的分布式处理引擎。
(5)Apache Kafka:由Scala写成。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。

1.4 流式计算主要应用场景

流式处理可以用于两种不同场景: 事件流和持续计算。
(1)事件流
事件流具能够持续产生大量的数据,这类数据最早出现与传统的银行和股票交易领域,也在互联网监控、无线通信网等领域出现、需要以近实时的方式对更新数据流进行复杂分析如趋势分析、预测、监控等。

简单来说,事件流采用的是查询保持静态,语句是固定的,数据不断变化的方式。

(2)持续计算

比如对于大型网站的流式数据:网站的访问PV/UV、用户访问了什么内容、搜索了什么内容等,实时的数据计算和分析可以动态实时地刷新用户访问数据,展示网站实时流量的变化情况,分析每天各小时的流量和用户分布情况;比如金融行业,毫秒级延迟的需求至关重要。
一些需要实时处理数据的场景也可以应用Storm,比如根据用户行为产生的日志文件进行实时分析,对用户进行商品的实时推荐等。

1.5 大数据流式计算

大数据流式计算可以广泛应用于金融银行、互联网、物联网等诸多领域,如股市实时分析、插入式广告投放、交通流量实时预警等场景,主要是为了满足该场景下的实时应用需求。数据往往以数据流的形式持续到达数据计算系统,计算功能的实现是通过有向任务图的形式进行描述,数据流在有向任务图中流过后,会实时产生相应的计算结果。整个数据流的处理过程往往是在毫秒级的时间内完成的。

通常情况下,大数据流式计算场景具有以下鲜明特征。
1)在流式计算环境中,数据是以元组为单位,以连续数据流的形态,持续地到达大数据流式计算平台。数据并不是一次全部可用,不能够一次得到全量数据,只能在不同的时间点,以增量的方式,逐步得到相应数据。

2)数据源往往是多个,在进行数据流重放的过程中,数据流中各个元组间的相对顺序是不能控制的。也就是说,在数据流重放过程中,得到完全相同的数据流(相同的数据元组和相同的元组顺序)是很困难的,甚至是不可能的。

3)数据流的流速是高速的,且随着时间在不断动态变化。这种变化主要体现在两个方面,一个方面是数据流流速大小在不同时间点的变化,这就需要系统可以弹性、动态地适应数据流的变化,实现系统中资源、能耗的高效利用;另一方面是数据流中各个元组内容(语义)在不同时间点的变化,即概念漂移,这就需要处理数据流的有向任务图可以及时识别、动态更新和有效适应这种语义层面上的变化。

4)实时分析和处理数据流是至关重要的,在数据流中,其生命周期的时效性往往很短,数据的时间价值也更加重要。所有数据流到来后,均需要实时处理,并实时产生相应结果,进行反馈,所有的数据元组也仅会被处理一次。虽然部分数据可能以批量的形式被存储下来,但也只是为了满足后续其他场景下的应用需求。

5)数据流是无穷无尽的,只要有数据源在不断产生数据,数据流就会持续不断地到来。这也就需要流式计算系统永远在线运行,时刻准备接收和处理到来的数据流。在线运行是流式计算系统的一个常态,一旦系统上线后,所有对该系统的调整和优化也将在在线环境中开展和完成。

6)多个不同应用会通过各自的有向任务图进行表示,并将被部署在一个大数据计算平台中,这就需要整个计算平台可以有效地为各个有向任务图分配合理资源,并保证满足用户服务级目标。同时各个资源间需要公平地竞争资源、合理地共享资源,特别是要满足不同时间点各应用间系统资源的公平使用。

1.6 流式计算的价值

通过大数据处理我们获取了数据的价值,但是数据的价值是恒定不变的吗?显然不是,一些数据在事情发生后不久就有了更高的价值,而且这种价值会随着时间的推移而迅速减少。流处理的关键优势在于它能够更快地提供洞察力,通常在毫秒到秒之间。

流式计算的价值在于业务方可在更短的时间内挖掘业务数据中的价值,并将这种低延迟转化为竞争优势。

比方说,在使用流式计算的推荐引擎中,用户的行为偏好可以在更短的时间内反映在推荐模型中,推荐模型能够以更低的延迟捕捉用户的行为偏好以提供更精准、及时的推荐。

流式计算能做到这一点的原因在于,传统的批量计算需要进行数据积累,在积累到一定量的数据后再进行批量处理;而流式计算能做到数据随到随处理,有效降低了处理延时。

2 流式计算的三种框架

在大数据处理领域,批处理任务流处理任务一般被认为是两种不同的任务,一个大数据框架一般会被设计为只能处理其中一种任务。

(1)框架设计区别
Storm只支持流处理任务。

MapReduce、Spark只支持批处理任务。

Spark Streaming是采用了一种micro-batch的架构,即把输入的数据流且分为细粒度的batch,并为每一个batch数据提交一个批处理的Spark任务,所以Spark Streaming本质上还是基于Spark批处理系统对流式数据进行处理,和Storm等完全流式的数据处理方式完全不同。

(2)执行引擎区别
在执行引擎这一层,流处理系统与批处理系统最大的不同在于节点间的数据传输方式

对于一个流处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,然后立刻通过网络传输到下一个节点,由下一个节点继续处理。

而对于一个批处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,并不会立刻通过网络传输到下一个节点,当缓存写满,就持久化到本地硬盘上,当所有数据都被处理完成后,才开始将处理后的数据通过网络传输到下一个节点。

这两种数据传输模式是两个极端,对应的是流处理系统对低延迟的要求和批处理系统对高吞吐量的要求。

(3)Flink通过灵活的执行引擎,能够同时支持批处理任务和流处理任务
Flink以固定的缓存块为单位进行网络数据传输,用户可以通过设置缓存块超时值指定缓存块的传输时机。

如果缓存块的超时值为0,则Flink的数据传输方式类似上文所提到流处理系统的标准模型,此时系统可以获得最低的处理延迟。

如果缓存块的超时值为无限大,则Flink的数据传输方式类似上文所提到批处理系统的标准模型,此时系统可以获得最高的吞吐量。

同时缓存块的超时值也可以设置为0到无限大之间的任意值。缓存块的超时阈值越小,则Flink流处理执行引擎的数据处理延迟越低,但吞吐量也会降低,反之亦然。通过调整缓存块的超时阈值,用户可根据需求灵活的权衡系统延迟和吞吐量。

2.1 Apache Storm

在Storm中,需要先设计一个实时计算结构,我们称之为拓扑(topology)。之后,这个拓扑结构会被提交给集群,其中主节点(master node)负责给工作节点(worker node)分配代码,工作节点负责执行代码。在一个拓扑结构中,包含spout和bolt两种角色。数据在spouts之间传递,这些spouts将数据流以tuple元组的形式发送;而bolt则负责转换数据流。

在这里插入图片描述

2.2 Apache Spark

Spark Streaming,即核心Spark API的扩展,不像Storm那样一次处理一个数据流。相反,它在处理数据流之前,会按照时间间隔对数据流进行分段切分。

Spark针对连续数据流的抽象,我们称为DStream(Discretized Stream)。

DStream是小批处理的RDD(弹性分布式数据集), RDD则是分布式数据集,可以通过任意函数和滑动数据窗口(窗口计算)进行转换,实现并行操作。

在这里插入图片描述

2.3 Apache Flink

针对流数据+批数据的计算框架。把批数据看作流数据的一种特例,延迟性较低(毫秒级),且能够保证消息传输不丢失不重复。

在这里插入图片描述
Flink创造性地统一了流处理和批处理,作为流处理看待时输入数据流是无界的,而批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。

Flink程序由Stream和Transformation这两个基本构建块组成,其中Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。

Apache Flink是一个开源的分布式、高性能、高可用的流处理框架。
主要有Java代码实现,支持scala和java API。
支持实时流(stream)处理和批(batch)处理,批数据只是流数据的一个极限特例。
Flink原生支持了迭代计算、内存管理和程序优化。

2.4 三种框架对比

在这里插入图片描述

在这里插入图片描述

2.5 如何选择实时框架

(1)需要关注流数据是否需要进行状态管理;
(2)At-least-once或者Exectly-once消息投递模式是否有特殊要求;
(3)对于小型独立的项目,并且需要低延迟的场景,建议使用storm;
(4)如果你的项目已经使用了spark,并且秒级别的实时处理可以满足需求的话,建议使用spark streaming;
(5)要求消息投递语义为Exactly Once的场景;数据量较大,要求高吞吐低延迟的场景;需要进行状态管理或窗口统计的场景,建议使用flink;

3 flink调度方式

Flink存在两种窗口操作,分别为滚动操作和滑动操作,两者主要的区别在于,滚动操作不会重叠,而滑动操作则会让数据产生重叠 。

窗口需求,在Stream应用程序的情况下,数据是连续的,因此我们不能等待在开始处理之前流式传输整个数据。

当然,我们可以处理每个传入的事件,然后转移到下一个事件,但在某些情况下,我们需要对传入的数据进行某种聚合,例如,有多少用户在过去10分钟内点击了您网页上的链接。在这种情况下,我们必须定义一个窗口并对窗口内的数据进行处理。

3.1 调度概述

无界数据于有界数据是一个比较于模糊的概念,无界与有界之间是可以进行转换的。无界数据流在进行某些计算的时候例如每分钟、每小时、每天等操作时都可以看做是有界数据集。Apache Flink使用Windows方式实现了对于无界数据集到有界数据集的计算

Windows是流式计算中最常用的计算方式之一,通过固定的时长(分钟,小时,天)与固定的长度(X条)的方式把无界的数据集划分到一个固定的空间中进行计算,从而得到该范围内的结果。例如常见的五分钟内登陆用户数,1000条数据内的错误比例等。
在这里插入图片描述
(1)Apache Flink在DataStreaming API中内置实现了一些窗口的算子。每个窗口中都包含:

Window Assigners(窗口分配器)
Triggers(窗口触发器)
Evitor(数据剔除器)
Lateness(时延)等。

完整的来看,Windows Assigners会在属于窗口的第一个元素到来的时候就会创建窗口,当时间或数量或自定义的Trigger触发时候会进行窗口的聚合计算,允许数据的Lateness。

(2)每个窗口都会有一个Trigger与ProcessWindowFunction、ReduceFunction、AggreateFunction或FoldFunction用于实现窗口内容的计算

(3)Window Assigners指定了数据应该分配给哪个窗口

例如基于时间的窗口提供基于时间进行窗口的创建,同样窗口也就是包含了时间的属性:开始时间戳与结束时间戳。

还有基于数量的窗口,例如前面提到的1000条数据。那么窗口就会把每1000条数据作为一个窗口。

3.2 滚动窗口Tumbling window

滚动窗口滚动数据流。
这种类型的窗口是不重叠的,即一个窗口中的事件/数据不会在其他窗口中重叠/出现。
在这里插入图片描述
(1)可以配置滚动窗口,基于数量,例如基于每5条数据。
(2)可以配置滚动窗口,基于时间,例如基于每十秒钟。

在这里插入图片描述

滚动窗口根据名字来看就是滚动进行计算的,而滚动的就是时间或者大小。按照固定的时间或者大小进行拆分。这种计算比较简单,适合于比较固定时间的计算。

例如计算01点的用户点击次数。12点的用户点击次数。这种计算前后窗口之间不会产生交集。没有产生前后的关系。

3.3 滑动窗口Sliding Window

滑动窗口与翻滚窗口相对,滑过数据流。
因此,滑动窗口可以重叠,它可以对输入的数据流进行更平滑的聚合,因为您不是从一组输入跳转到下一组输入,而是滑过输入的数据流。
在这里插入图片描述
(1)可以配置滑动窗口,基于数量,例如基于每5条数据。
(2)可以配置滑动窗口,基于时间,例如基于每十秒钟。

滑动窗口也是Apache Flink提供的一种简单的窗口计算方式,滑动窗口与滚动窗口特点差不多同样是基于时间或大小进行的计算。滑动窗口在滚动窗口的基础上增加了窗口的滑动时间,允许窗口的数据发生重叠。简单来看,例如实现五分钟内的异常数量统计,统计异常五分钟内异常个数大于50就产生告警行为。那么看下面的案例。
在这里插入图片描述
根据当前的情况,如果使用滚动的窗口来进行计算,那么这个时间的数据不会产生告警,但是其实实际的情况是当在第五分钟跟第六分钟的数据加起来时已经是50了,2min~6min的时候需要进行告警。

也就像是在传统的计算中,我们计算五分钟内发生告警,基于当前时间往前五分钟,但是计算是每分钟一次。

滑动窗口实现的就是这个功能,我们能够设置Slide Time使其进行滑动,窗口之间的数据重叠通过Window Time和Slide Size决定的。Slide Size就是我们计算的时间间隔,Window Time就是我们要计算的数据的时间间隔。

Window Time大于Slide Size也就是数据会重叠到多个窗口,比如1到5分钟的窗口会包含第2分钟的数据。2到6分钟的窗口也会包含第2分钟的数据。Window Time小于Slide Size就会出现数据不存在与任何窗口,也就是数据没有产生计算。例如 Window Size依然为5分钟,但是Slide Size为一个小时,那么窗口再产生计算的时候就只会计算计算时间前5分钟的数据,其他的数据没有产生计算。

滑动窗口帮助我们实现的业务场景也就是刚才上面讲到的案例。在实际的业务中会有大量的业务场景选用。
在这里插入图片描述

3.4 会话窗口Session Window

Session Window就是一种会话形态的窗口,主要是将在某个时间段活跃度较高的相关数据聚合在一起。与滚动窗口和滑动窗口不同的是,Session Window不需要Window Size和Slide Time,Session Window与MySql Session或其他Session的机制很像,窗口的触发条件是Session Gap,指在某个时间内没有活跃的数据时就会进行触发。也就是说,如果一直都有数据进来窗口,那么该窗口就不会产生触发计算。

例如统计用户的在线时长信息,用户会定时上报相关数据,从用户首次上报开始创建窗口,用户定期产生打点数据会进入该窗口,如果5分钟没有收到用户的数据则判断该用户退出,即触发该用户的在线时长计算。
在这里插入图片描述

滑动窗口与前两个窗口一样也可以设置基于Event Time,Process Time的Session Window。会话窗口其实与前两个不一样,其实本身是没有起止时间的。它是针对于进入的数据创建的窗口。最后基于Session Gap的逻辑计算的结果。

3.5 全局窗口Global Window

与其他的窗口均不同,Global Window是把所有相同的key都会生成一个相关的窗口,所以窗口没有起止时间,需要自己实现Trigger的触发计算,如果不实现Trigger则窗口永远不会进行计算。同时还需要指定相应的数据清理机制,如果不进行数据清理数据一直会停留在内存中。所以使用Global Window要较为慎重。
在这里插入图片描述

4 Flink中的各个窗口时间的概念

Apache Flink中提供了基于时间的窗口计算,例如计算五分钟内的用户数量或每一分钟计算之前五分钟的服务器异常日志占比等。

因此Apache Flink在流处理中提供了不同时间的支持。

在这里插入图片描述

4.1 处理时间(Processing Time)

处理时间是执行相应的操作时的系统时间。一般来说就是Apache Flink在执行某条数据的计算的时刻的系统时间。

处理时间是最简单的时间概念,基于处理时间能够实现最佳的性能与延迟,例如计算五分钟的用户数量,无需设置其他相关的项目直接可以通过系统的当前时间进行计算即可。

但是也会有某些影响,例如基于网络或者其他原因造成某些数据无法按照预计的时间到到,或者说在Apache Flink任务重启时都会造成计算结果与预期的结果不符的情况出现。

4.2 摄取时间(Ingestion Time)

摄取时间是指Apache Flink读取某条数据的时间,摄取时间是基于事件时间与处理时间之间的,因为摄取时间会在数据到来的时候给予一次时间戳,基于时间的计算需要按照时间戳去进行。

所以在操作时会把数据分配到不同的不同的窗口进行计算。但是相对于事件时间来说,它更加简单一些,不需要设置Watermarks。

4.3 事件时间(Event Time)

事件时间是比较好理解的一个时间,就是类似于上面展示的log4j输出到日志中的时间,在大部分的场景中我们在进行计算时都会利用这个时间。例如计算五分钟内的日志错误占比等。

Apache Flink能够支持基于事件的时间设置,事件时间是最接近于事实需求的时间。我们通常的数据处理大部分是基于事件时间的处理。
那么在流式计算中做事件时间的处理,基于某些原因可能就会存在问题,流处理在事件产生过程中,通过消息队列,到Flink的Source获取、再到Operator。中间的过程都会产生时间消耗。还有一些其他的情况,例如网络抖动造成的数据延迟等就会存在数据乱序。

但是对于数据乱序我们又不能无限期的等待事件到来,(谁知道它还来不来)。那么Apache Flink就有一个Watermark用来解决该问题,Watermark就是保证在一个特定的时间后进行触发window计算的机制。

def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //设置引擎的执行为事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val text = env.socketTextStream("localhost",9999)
    //设置时间戳与Watermark
    val eventText = text.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {
      val maxOutOfOrderTime = 10000L  //设置10s的时间,意思是超过10s到达的数据将不会被处理
      var currentTimestamp:Long = _    // 从数据上获取到的当前时间
      override def getCurrentWatermark: Watermark = {
        //根据可容忍的最大延迟时间获取watermark
        new Watermark(currentTimestamp-maxOutOfOrderTime)
      }
      //从String中提取出事件时间
      override def extractTimestamp(str: String, l: Long): Long = {
        val sdf:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,S")
        //获取到数据的事件时间
        currentTimestamp = sdf.parse(str.split("\\|")(0)).getTime
        currentTimestamp
      }
    })

    val count = eventText.map(res=>{
      val ress  = res.split("\\|")
      (ress(1),1)
    }).keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
    //输出结果
    count.print()
    env.execute("Apache Flink Event Time Watermark")
  }

版权声明:本文为qq_20466211原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。