State的实现
Flink通过异步的checkpoint机制来实现流式处理过程中的容错,简单来讲就是定时地将本地的状态序列化到一个持久存储中,当出现错误是通过恢复检查点的状态来实现容错的,对于机制的详细介绍可以参见这个链接,本章主要讲述flink源码中state的实现。
StateBackend
flink将我们代码中的操作转化为一个个的task放在taskmanager中执行。其中每个task一个线程,每个task中包含了一个AbstractInvokable对象,task中的主要的逻辑就是调用AbstractInvokable.invoke()的方法。在流式处理中对应的实现都是继承自StreamTask。StreamTask中包含一个OperatorChain,并规定了一些hook函数来定义生命周期。AbstractStateBackend就是在这里进行初始化的。Flink的实现一共提供了3种state backend:MemoryStateBackend,FsStateBackend, 和RocksDBStateBackend。其中MemoryStateBackend主要用于调试开发中使用,后面2者适合于生产环境中使用。这三种实现均继承自AbstractStateBackend类。在StreamTask的初始化过程中会初始化OperatorChain中所有的operator,而AbstractKeyedBackend也是在这个过程中初始化的,而且一个StreamTask中只有一个,其实这个也符合常理,因为多个key by操作产生的operator必然在不同的线程中。
AbstractStateBackend
AbstractStateBackend的定义比较简单,它要求子类实现三个接口:
createStreamFactory: 为某个job的某个operator创建CheckpointStreamFactory,实际上只有FsStateBackend实现了这个接口,RocksDBStateBackend的实现需要传入一个AbstractStateBackend,通常情况下是FsStateBackendcreateKeyedStateBackend: 创建一个keyed state backend用来管理keyed statecreateOperatorStateBackend: 创建一个OperateStateBackend,AbstractStateBackend提供了一个实现,就是在内存中的一个Map,key是state名字,value就是list state,之所以是list state的原因请看State类型。
FsStateBackend
FsStateBackend会在checkpoint的时候将state存储到一个持久化的存储中,比如hdfs。对于keyed state,FsStateBackend简单地将其放置在内存中,因此对于比较大的state,FsStateBackend有可能会引起比较严重的GC。而且snapshot的过程是一个同步的过程,也就是说将state序列化并写入文件系统的过程是一个同步的过程,过大的state同样可能引起阻塞。
RocksDBStateBackend
RocksDBStateBackend与FsStateBackend不同,它将key state存储在rocksdb中。这种做法有2个好处:首先,比较大的state不会引起GC;其次,由于rocksdb支持snapshot操作,因此snapshot的过程是一个异步的过程,不会阻塞。但是rocksdb实现的state也会有几个可能的缺点:首先,state的update和get操作都会有一个序列化和反序列化的过程,因此效率会比直接在内存中低;其次,rocksdb使用LSM-Tree作为存储结构,compaction过程需要大量的读写磁盘,因此也有可能引起阻塞,对于这个问题一个可能优化是使用memory filesystem,将所有的存储放在内存中;最后,rocksdb的tuning比较复杂,在普通的SATA硬盘上表现如何还需要确认。
使用State
State类型
flink中的state可以从2个纬度来划分:是否属于某个key(key state或者operator state),是否受flink管理(raw state或者managed state)。key state用于在KeyedStream中保存状态,operater state用于在普通的非key中保存状态。managed state是指被flink所管理的状态。raw state是被应用程序自己管理,flink会调用相应的接口方法来实现状态的restore和snapshot。
flink自从1.2.0开始加入了一个新的功能:dynamic scalable state。它的目的就是当flink的operator的parallism改变之后仍然能从上一次的checkpoint或者savepoint恢复。为了达到这个目的,key state被按照key group组织起来,其实这是一个与pre-sharding类似的想法,比如说key group有128个,那么flink就会将key state分成128份来存储,这样只要你的processor的并行度是小于128的,总能分到一部分的key group state。对于operator state,flink会将state按照list组织起来,从而当processor的并行度改变的时候仍然能得以恢复。
Managed Key State
我们首先看看flink 是如何处理key state的。首先要说明一点,在flink中所有的key state都是managed,而且通过RuntimeContext中的getState方法来获得。如果在普通的没有经过key by的stream中使用这个RuntimeContext.getState方法则会抛出异常。前面我们讲过不同的state backend是如何存储key state的,这里不再赘述。在flink中,每当一个新的数据到来时,系统会调用setCurrentKey方法,这样当我们访问state的时候可以知道系统对应的哪个key。
Managed Operator State
要获得managed operator state,用户需要实现CheckpointedFunction接口,并在initializeState方法中初始化state,这里获得list state。前文中我们说到过使用operator state都是保存在内存中。
Raw Operator State
对于需要自己管理operator state的用户可以实现ListCheckpointed接口,这个接口要求用户提供的state都是list。在实际的实现中,这个state在snapshot的时候仍然会被放入OperatorStateBackend中去。
Legacy State
在flink 1.2.0之前对于用户自定义的state需要实现Checkpointed接口,由于这个接口无法被partition,因此这个接口已经被标记为Deprecated。
State 的snapshot和restore
snapshot
本节我们讲述flink是如何将state存储到持久化的存储中。有了上面给大家讲述的概念之后,snapshot这部分的代码就比较直观了。代码的主要逻辑在StreamTask.performCheckpoint这个函数里,基本上就是对每个AbstractStreamOperator调用snapshot函数来。大家要注意的是,虽然里面用了大量的Future来抽象不同的snapshot过程,但是基本上只有对于RocksDBStateBackend的key state是异步的,这是因为只有rocksdb支持snapshot操作,其他的backend本质都是map,职能同步进行。当snapshot结束之后,会将state handle发送给job manager。
restore
restore的过程就比较简单,基本上就是在task初始化的时候通过state handle拉取文件,然后恢复state。