flink状态的深入研究

state存储的数据

ValueState 会存储 key、namespace、value,缩写为 <K, N, V>。
MapState 会存储 key、namespace、userKey、userValue,缩写为 <K, N, UK, UV>。

下面解释这些名词

key

ValueState 和 MapState 都是 KeyedState,也就是 keyBy 后才能使用 ValueState 和 MapState。
所以 State 中肯定要保存 key。

group by uid。假设uid有id1、id2这两个值,id1、id2就是key。

Namespace

Namespace 用于区分窗口。

假设需要统计 app1 和 app2 每个小时的 pv 指标,则需要使用小时级别的窗口。
状态引擎为了区分 app1 在 7 点和 8 点的 pv 值,就必须新增一个维度用来标识窗口。
Flink 用 Namespace 来标识窗口,这样就可以在状态引擎中区分出 app1 在 7 点和 8 点的状态信息。

Value、UserKey、UserValue

ValueState 中存储具体的状态值。也就是上述例子中对应的 pv 值。
MapState 类似于 Map 集合,存储的是一个个 KV 键值对。
为了与 keyBy 的 key 进行区分,所以 Flink 中把 MapState 的 key、value 分别叫 UserKey、UserValue。

state的读写

Flink 支持三种 StateBackend,分别是:MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。

其中 MemoryStateBackend、FsStateBackend 两种 StateBackend 在任务运行期间都会将 State 存储在内存中,
两者在 Checkpoint 时将快照存储的位置不同。
RocksDBStateBackend 在任务运行期间将 State 存储在本地的 RocksDB 数据库中。

所以下文将 MemoryStateBackend、FsStateBackend 统称为 heap 模式,RocksDBStateBackend 称为 RocksDB 模式。

heap模式

Heap 模式表示所有的状态数据都存储在 TM 的堆内存中,所有的状态都存储的原始对象,不会做序列化和反序列化。
(注:Checkpoint 的时候会涉及到序列化和反序列化,数据的正常读写并不会涉及,所以这里先不讨论。)

Heap 模式下,无论是 ValueState 还是 MapState 都存储在 CopyOnWriteStateMap<K, N, V> 中。

  • key 、 Namespace 分别对应 CopyOnWriteStateMap 的 K、N。
  • ValueState 的 value 对应 CopyOnWriteStateMap 的 V。

MapState 将会把整个 Map 作为 CopyOnWriteStateMap 的 V,
相当于 Flink 引擎创建了一个 HashMap 用于存储 MapState 的 KV 键值对。

heap模式下,ValueState 中存 Map 与 MapState 有什么区别?
没有区别。实质上 ValueState 中存 Map 与 MapState 都是一样的,
存储结构都是 CopyOnWriteStateMap<K, N, HashMap>。
区别在于 ValueState 是用户手动创建 HashMap,MapState 是 Flink 引擎创建 HashMap。

RocksDB模式

RocksDB 模式表示所有的状态数据存储在 TM 本地的 RocksDB 数据库中。RocksDB 是一个 KV 数据库,且所有的 key 和 value 都是 byte 数组。
所以无论是 ValueState 还是 MapState,存储到 RocksDB 中都必须将对象序列化成二进制当前 kv 存储在 RocksDB 中。

在rocksdb中,每次state的度操作,都需要反序列化key、value;每次state写操作,都需要序列化key、value。
而heap模式是不需要序列化的,就这来说heap模式具有更高的性能。

ValueState的存储

ValueState 有 key、namespace、value 需要存储

  • 将 ValueState 的 key 序列化成 byte 数组
  • 将 ValueState 的 namespace 序列化成 byte 数组
  • 将两个 byte 数组拼接起来做为 RocksDB 的 key
  • 将 ValueState 的 value 序列化成 byte 数组做为 RocksDB 的 value

MapState的存储

MapState 有 key、namespace、userKey、userValue 需要存储,所以最简单的思路:

  • 将 MapState 的 key 序列化成 byte
  • 将 MapState 的 namespace 序列化成 byte
  • 将 MapState 的 userKey 序列化成 byte
  • 将三个 byte 数组拼接起来做为 RocksDB 的
  • 将 MapState 的 value 序列化成 byte 数组做为 RocksDB 的 value

mapState不是保存整个map,而是map中的每个元素都单独保存到rocksdb中。

RocksDB 模式下,ValueState 中存 Map 与 MapState 有什么区别?

ValueState 中存 Map,Flink 引擎会把整个 Map 当做一个大 Value,存储在 RocksDB 的 1 行数据中。
MapState 会根据 userKey,将 100 个 KV 键值对分别存储在 RocksDB 的 100 行中。

ValueState中的map,每次读写都需要序列化整个map,这会极大的降低性能(非常耗 CPU)。

udaf和state

自定义聚合函数都被封装成了:GroupAggProcessFunction,,执行processElement

  override def processElement(
      inputC: CRow,
      ctx: KeyedProcessFunction[K, CRow, CRow]#Context,
      out: Collector[CRow]): Unit = {

    val currentTime = ctx.timerService().currentProcessingTime()
    // register state-cleanup timer
    // 注册状态清理timer
    processCleanupTimer(ctx, currentTime)

    val input = inputC.row

    // get accumulators and input counter
    // 获取状态
    var accumulators = state.value()
    var inputCnt = cntState.value()

    if (null == accumulators) {
      // Don't create a new accumulator for a retraction message. This
      // might happen if the retraction message is the first message for the
      // key or after a state clean up.
      if (!inputC.change) {
        return
      }
      // first accumulate message
      firstRow = true
      // 创建状态
      accumulators = function.createAccumulators()
    } else {
      firstRow = false
    }

    if (null == inputCnt) {
      inputCnt = 0L
    }

    // Set group keys value to the final output
    function.setForwardedFields(input, newRow.row)
    function.setForwardedFields(input, prevRow.row)

    // Set previous aggregate result to the prevRow
    function.setAggregationResults(accumulators, prevRow.row)

    // update aggregate result and set to the newRow
    if (inputC.change) {
      inputCnt += 1
      // accumulate input
      function.accumulate(accumulators, input)
      function.setAggregationResults(accumulators, newRow.row)
    } else {
      inputCnt -= 1
      // retract input
      function.retract(accumulators, input)
      function.setAggregationResults(accumulators, newRow.row)
    }

    if (inputCnt != 0) {
      // we aggregated at least one record for this key

      // update the state
      state.update(accumulators)
      cntState.update(inputCnt)

      // if this was not the first row
      if (!firstRow) {
        if (prevRow.row.equals(newRow.row) && !stateCleaningEnabled) {
          // newRow is the same as before and state cleaning is not enabled.
          // We emit nothing
          // If state cleaning is enabled, we have to emit messages to prevent too early
          // state eviction of downstream operators.
          return
        } else {
          // retract previous result
          if (generateRetraction) {
            out.collect(prevRow)
          }
        }
      }
      // emit the new result
      out.collect(newRow)

    } else {
      // we retracted the last record for this key
      // sent out a delete message
      out.collect(prevRow)
      // and clear all state
      state.clear()
      cntState.clear()
    }
  }

从上面看出:rocksdb的状态每次使用都需要序列化和反序列化,如果对象状态太大,必然会带来性能问题。

flink distinct的实现

按照上述描述distinct去重函数也应该会是一个大对象,需要收集所有数据才对,实际使用过程中并没有感知到很慢,这是怎么做到的呢?
Flink操作distinct是通过类DistinctAccumulator完成的,其内部使用的是MapView。
可以发现,MapView会被翻译成RocksDBMapState,accumulator序列化的时候会忽略掉这个字段,
使用的时候都是操作的RocksDBMapState,对单条数据进行操作。

AggregationCodeGenerator这个就是用来包装聚合相关代码的了,其中有个函数addAccumulatorDataViews()会将MapView替换成StateMapView。

// create DataViews
  val descFieldTerm = s"${dataViewFieldTerm}_desc"
  val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
  val descDeserializeCode =
    s"""
       |    $descClassQualifier $descFieldTerm = ($descClassQualifier)
       |      ${classOf[EncodingUtils].getCanonicalName}.decodeStringToObject(
       |        "$serializedData",
       |        $descClassQualifier.class,
       |        $contextTerm.getUserCodeClassLoader());
       |""".stripMargin
  val createDataView = if (dataViewField.getType == classOf[MapView[_, _]]) {
    s"""
       |    $descDeserializeCode
       |    $dataViewFieldTerm = new ${classOf[StateMapView[_, _]].getCanonicalName}(
       |      $contextTerm.getMapState(
       |        (${classOf[MapStateDescriptor[_, _]].getCanonicalName}) $descFieldTerm));
       |""".stripMargin
  } else if (dataViewField.getType == classOf[ListView[_]]) {
    s"""
       |    $descDeserializeCode
       |    $dataViewFieldTerm = new ${classOf[StateListView[_]].getCanonicalName}(
       |      $contextTerm.getListState(
       |        (${classOf[ListStateDescriptor[_]].getCanonicalName}) $descFieldTerm));
       |""".stripMargin
  } else {
    throw new CodeGenException(s"Unsupported dataview type: $dataViewTypeTerm")
  }
  reusableOpenStatements.add(createDataView)

udaf 建议

  • 定义的accumulator尽量小,否则在rocksdb情况下每次序列化会消耗大量时间
  • accumulator无法变小,考虑使用MapView最终生成的MapState尽量减少序列化的内容
  • 开启mini batch,可以降低state访问频率。
  • 极限追求速度,可以考虑使用FsStateBackend

flink sql 状态清理

timer

Flink Streaming API提供的用于感知并利用处理时间/事件时间变化的机制。
Timer会由Flink按key+timestamp自动去重的,也就是说如果你的key有N个,
并且注册的timestamp相同的话,那么实际会注册N个Timer

在KeyedProcessFunction实现类里定义定时器:

  • 重写processElement(),对每个输入元素注册定时器,但会自动去重
  • 重写onTimer(),定时器触发时执行的逻辑

timer注册:

  • 处理时间——调用Context.timerService().registerProcessingTimeTimer()注册;onTimer()在系统时间戳达到Timer设定的时间戳时触发。
  • 事件时间——调用Context.timerService().registerEventTimeTimer()注册;onTimer()在Flink内部水印达到或超过Timer设定的时间戳时触发。

timer的注册和状态清理

以GroupAggProcessFunction定义state清理timer为例,来说明timer的注册过程,
这个过程也是flink sql 状态清理的过程。

在org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement,
每次数据到达都会注册一个timer servvice。

// 这个获取flink当前process time
val currentTime = ctx.timerService().currentProcessingTime()
// register state-cleanup timer
processCleanupTimer(ctx, currentTime)

org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState.processCleanupTimer:

  /*
   * 如果minRetentionTime为0, 那么stateCleaningEnabled为flase。
   * minRetentionTime、maxRetentionTime使用通过tableConfig.setIdleStateRetentionTime设置的
  */
  protected def processCleanupTimer(
    ctx: KeyedProcessFunction[KEY, IN, OUT]#Context,
    currentTime: Long): Unit = {
    if (stateCleaningEnabled) {
      registerProcessingCleanupTimer(
        cleanupTimeState,
        currentTime,
        minRetentionTime,
        maxRetentionTime,
        ctx.timerService()
      )
    }
  }

org.apache.flink.table.runtime.aggregate.CleanupState.registerProcessingCleanupTimer:

  // cleanupTimeState,是保存timer的state
  // 简单的说,timer是会保存都后端存储的,支持重启恢复。
  def registerProcessingCleanupTimer(
      cleanupTimeState: ValueState[JLong],
      currentTime: Long,
      minRetentionTime: Long,
      maxRetentionTime: Long,
      timerService: TimerService): Unit = {

    // last registered timer
    // 获取当前timer的时间戳值
    val curCleanupTime = cleanupTimeState.value()

    // check if a cleanup timer is registered and
    // that the current cleanup timer won't delete state we need to keep
    // 判断是否需要更新timer的时间戳。
    if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
      // we need to register a new (later) timer
      val cleanupTime = currentTime + maxRetentionTime
      // register timer and remember clean-up time
      // 注册新的timer
      timerService.registerProcessingTimeTimer(cleanupTime)
      // delete expired timer
      if (curCleanupTime != null) {
         //删除上一个timer。聚合函数这个key这个key需要清理一次。
        timerService.deleteProcessingTimeTimer(curCleanupTime)
      }
      cleanupTimeState.update(cleanupTime)
    }
  }

org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer:

	@Override
	public void registerProcessingTimeTimer(N namespace, long time) {
		InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
		if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
			long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
			// check if we need to re-schedule our timer to earlier
			if (time < nextTriggerTime) {
				if (nextTimer != null) {
					nextTimer.cancel(false);
				}
				nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);
			}
		}
	}

org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.registerTimer:

	public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback callback) {
        // 获取timestamp和当前时间的最大值。意味着如果timestamp < 当前时间,timer需要立即执行(+了1ms)。
		long delay = ProcessingTimeServiceUtil.getProcessingTimeDelay(timestamp, getCurrentProcessingTime());

		// we directly try to register the timer and only react to the status on exception
		// that way we save unnecessary volatile accesses for each timer
		try {
	        //timerService本质是ScheduledThreadPoolExecutor的实例。
	        // 就是一个java调度线程(线程数量:1)
			return timerService.schedule(wrapOnTimerCallback(callback, timestamp), delay, TimeUnit.MILLISECONDS);
		}
		catch (RejectedExecutionException e) {
			final int status = this.status.get();
			if (status == STATUS_QUIESCED) {
				return new NeverCompleteFuture(delay);
			}
			else if (status == STATUS_SHUTDOWN) {
				throw new IllegalStateException("Timer service is shut down");
			}
			else {
				// something else happened, so propagate the exception
				throw e;
			}
		}
	}

java 调度调用的方法是org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime

	private void onProcessingTime(long time) throws Exception {
		// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
		// inside the callback.
		nextTimer = null;

		InternalTimer<K, N> timer;

		while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
			processingTimeTimersQueue.poll();
			keyContext.setCurrentKey(timer.getKey());
			triggerTarget.onProcessingTime(timer);
		}

		if (timer != null && nextTimer == null) {
			nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this::onProcessingTime);
		}
	}

最后调用,org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction

	private void invokeUserFunction(
			TimeDomain timeDomain,
			InternalTimer<K, VoidNamespace> timer) throws Exception {
		onTimerContext.timeDomain = timeDomain;
		onTimerContext.timer = timer;
		// 执行onTimer,清理状态。
		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
		onTimerContext.timeDomain = null;
		onTimerContext.timer = null;
	}

org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.onTimer

	@Override
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<RowData> out) throws Exception {
		if (stateCleaningEnabled) {
		    // 完成状态清理
			cleanupState(accState);
			function.cleanup();
		}
	}

cleanupState调用的是:org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState.cleanupState

	protected void cleanupState(State... states) {
		for (State state : states) {
		    // 调用state对应的清理方法。
			state.clear();
		}
		this.cleanupTimeState.clear();
	}

mini batch 状态访问和清理

mini batch 是在缓存一定量的数据后,在执行聚合操作,可以明显减低对state的访问频率。
开启mini batch后,sql编译后的聚合函数都是MapBundleFunction的子类。
MapBundleFunction 并没有对状态的清理机制。

目前flink所有版本 mini batch都没有状态清理机制,所以一定要开启增量checkpoint,否则会oom。
但是checkpoint的目录会越来越大,任务重启和checkpoint管理一个比较麻烦的问题,执行savepoint超时的概率也比较大。

据说flink 1.12会实现该功能,官方bug说明:https://issues.apache.org/jira/browse/FLINK-16581

datastream 状态清理

Apache Flink 的 1.6.0 版本引入了状态生存时间特性,现在只支持process time上的状态ttl定义。
在 Flink 的DataStream API 中,应用程序状态是由状态描述符(state descriptor)来定义的。
状态生存时间是通过将StateTtlConfiguration对象传递给状态描述符来配置的

如下代码:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.state.ValueStateDescriptor;
 
 
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7)) // 这个定义的是状态生存时间。
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
 
 
ValueStateDescriptor<Long> lastUserLogin =
    new ValueStateDescriptor<>("lastUserLogin", Long.class);
 
// 设置状态的ttl配置。是针对每个状态来设置的。
lastUserLogin.enableTimeToLive(ttlConfig);

setUpdateType方法用于设置重置生存时间的策略,可以取的值如下:

public enum UpdateType {
		Disabled,//禁用过期,状态不过期
		OnCreateAndWrite,// 写的时候更新ttl
		OnReadAndWrite// 读写都会更新ttl
	}

setStateVisibility 设置是否可以返回过期的用户值,可以取的值如下:

public enum StateVisibility {
		ReturnExpiredIfNotCleanedUp,// 如果还没有清除,那么返回
		NeverReturnExpired// 不返回过期的值
	}

清理策略

保证完整快照中不包含过期数据

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(7))
.cleanupFullSnapshot()
.build();

在执行full checkpoint才会清理状态,task manager本地状态不会清理,checkpoint不会保存过期数据。

堆内存状态后端的增量清理

此方法只适用于堆内存状态后端(FsStateBackend和MemoryStateBackend)。
其基本思路是在存储后端的所有状态条目上维护一个全局的惰性迭代器。
某些事件(例如状态访问)会触发增量清理,而每次触发增量清理时,迭代器都会向前遍历删除已遍历的过期数据

以下代码示例展示了如何启用增量清理:

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(7))
// check 10 keys for every state access
.cleanupIncrementally(10, false)
.build();

如果启用该功能,则每次状态访问都会触发清除。而每次清理时,都会检查一定数量的状态条目是否过期。

第一个定义了每次清理时要检查的状态条目数。第二个参数是一个标志位,用于表示是否在每条记录处理(Record processed)之后(而不仅仅是访问状态,State accessed),都还额外触发清除逻辑

关于这种方法有两个重要的注意事项:首先是增量清理所花费的时间会增加记录处理的延迟。
其次,如果没有状态被访问(State accessed)或者没有记录被处理(Record processed),过期的状态也将不会被删除。

RocksDB 状态后端利用后台压缩来清理过期状态

RocksDB 会定期运行异步的压缩流程以合并数据并减少相关存储的数据量,
该定制的压缩过滤器使用生存时间检查状态条目的过期时间戳,并丢弃所有过期值。

使用此功能的第一步,需要设置以下配置选项:state.backend.rocksdb.ttl.compaction.filter.enabled。
一旦配置使用 RocksDB 状态后端后,如以下代码示例将会启用压缩清理策略:

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(7))
.cleanupInRocksdbCompactFilter()
.build();

需要注意的是启用 Flink 的生存时间压缩过滤机制后,会放缓 RocksDB 的压缩速度。

使用定时器进行状态清理

目前flink sql使用的是这个方法。前面已经有分析

参考文章

从udaf谈flink的state
Flink State 误用之痛,你中招了吗?
flink sql状态清理问题


版权声明:本文为xuen198721原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。