flink|Flink的State与Rescale

本文是关于对flink状态在任务rescale下面对的问题以及解决这些问题的设计以及代码实现
状态 状态类别 flink中的状态主要分为两类, Keyed State和Operator State。其中Opertor State是以Operator 实例为粒度的状态,而Keyed State是以数据的key为粒度的,该key被划分到哪个Operator实例决定了这个Key的state在哪个Operator实例,而一个Operator实例是可以同时管理多个Keyed State
state与restore 为了保证数据的本地花(Data locally),目前flink的状态都是operator实例粒度的,这样相当于flink operator对于状态的操作都能通过本地IO完成,而不需要和其他node进行通信,保证了flink的吞吐效率,同时通过将本地state定期的flush到dfs等外部存储上用于状态的持久化。
而个人认为计算也可以在宏观上分成计算逻辑(Execution Graph)和中间状态(State)。所以只要这两者都被持久化后,那么自然可以让任务在停止后重新续跑的时候可以从停止时的状态继续执行。即状态恢复(restore)
restore与rescale 在flink计算任务开发过程中,因为上游数据源的变动,对任务进行rescale是一件非常常见的情况,那么在对任务进行了rescale之后重新提交认为,那么我们自然也会希望能够继承停止时消费到的状态继续消费数据,这里就引出了state restore在rescale时的问题。即同一个operator在rescale之后产生的新实例如何去分配老实例的遗留下来的状态。
Operator State 首先以一个常见的例子作为切入点,一个读取外部数据源的source(kafka, rocketmq…),它的状态应该是什么,抽象的来说应该是对外部数据源offset的记录,从而可以得知我重启后的任务应该从哪个offset继续读取数据。那么放在分布式场景下,就是每个Source Operator实例记录自己负责的外部数据源的分区(Partition)的Offset。那么设想这样一种情况,当我们的Source Operator实例发生了rescale,实例数量由原本的2变成了3,这个时候原本Source Operator的实例应该如何被分配,假设原有Source Operator的实例状态如下

Operator 1 Partition 1, 1000 Partition 2, 1500Operator 2 Partition 3, 2000 Partition 4, 2200

如果由人为的来进行分配,那么很简单我们可能会可以按照我们自己的了解将state分成如下
Operator 1 Partition 1, 1000 Partition 2, 1500Operator 2 Partition 3, 2000Operator 3 Partition 4, 2200

那么现在的问题就是如何能够在代码编写上将这种用户的知识传递出去,那么这样其实就相当于要抽出一层原子性(Atomic)的状态,即不能将整个Operator级别的State作为一个snapshot和restore的最小单位,而是将整个Operator级别的State看作一个List,其中List里的每个Element作为snapshot和restore时可分配的最小单位,映射到现在的Flink实现中其实就是ListState
public abstract class FlinkKafkaConsumerBase extends RichParallelSourceFunction implements CheckpointListener, ResultTypeQueryable, CheckpointedFunction { /** Accessor for state in the operator state backend. */ private transient ListState> unionOffsetStates; // ------------------------------------------------------------------------ //Checkpoint and restore // ------------------------------------------------------------------------ @Override public final void initializeState(FunctionInitializationContext context) throws Exception {OperatorStateStore stateStore = context.getOperatorStateStore(); this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME, createStateSerializer(getRuntimeContext().getExecutionConfig()))); if (context.isRestored()) { restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator()); // populate actual holder for restored state for (Tuple2 kafkaOffset : unionOffsetStates.get()) { restoredState.put(kafkaOffset.f0, kafkaOffset.f1); }LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState); } else { LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask()); } } @Override public final void snapshotState(FunctionSnapshotContext context) throws Exception { if (!running) { LOG.debug("snapshotState() called on closed source"); } else { unionOffsetStates.clear(); final AbstractFetcher fetcher = this.kafkaFetcher; if (fetcher == null) { // the fetcher has not yet been initialized, which means we need to return the // originally restored offsets or the assigned partitions for (Map.Entry subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) { unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue())); }if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) { // the map cannot be asynchronously updated, because only one checkpoint call can happen // on this function at a time: either snapshotState() or notifyCheckpointComplete() pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState); } } else { HashMap currentOffsets = fetcher.snapshotCurrentState(); if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) { // the map cannot be asynchronously updated, because only one checkpoint call can happen // on this function at a time: either snapshotState() or notifyCheckpointComplete() pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets); }for (Map.Entry kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) { unionOffsetStates.add( Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue())); } }if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) { // truncate the map of pending offsets to commit, to prevent infinite growth while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) { pendingOffsetsToCommit.remove(0); } } } } }

我们可以看到,在FlinkKafkaConsumerBase这个类里,就是通过ListState来对状态进行存储,rescale时类似下图
flink|Flink的State与Rescale
文章图片

Keyed State 对于Keyed State,由于他的语义,他天然的解决了state如何分配的问题,即在rescale后这个key会被重新分配到哪个Operator实例,那么就把原来的该key的state分配到那个Operator即可,那么是不是就说明我们不用再讨论Keyed State在rescale时的情况了呢?不尽然,虽然Keyed State已经解决了状态分发的问题,但是会带来因状态分发而带来的restore低效率的问题。图2A展示如果使用Keyed State的Key作为restore时重新分配的最小粒度可能带来的后果,即会产生大量对dfs的随即读写,了解常见dfs系统的话应该就能理解这会大大降低IO的性能。



【flink|Flink的State与Rescale】针对这种情况, flink设计了KeyGroup机制,将Keygroup而不俗Key作为Keyed State的最小原子性可分配元素。KeyGroup设计如下,key group的数量和max parallelism一致,一个Operator的计算实例会处理1-n个Key Group,一个Key定位到处理他的Operator的步骤如下
  1. 通过hash(key) % max parallelism得到该key对应的key group
/** * Assigns the given key to a key-group index. * * @param key the key to assign * @param maxParallelism the maximum supported parallelism, aka the number of key-groups. * @return the key-group to which the given key is assigned */ public static int assignToKeyGroup(Object key, int maxParallelism) { Preconditions.checkNotNull(key, "Assigned key must not be null!"); return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism); } /** * Assigns the given key to a key-group index. * * @param keyHash the hash of the key to assign * @param maxParallelism the maximum supported parallelism, aka the number of key-groups. * @return the key-group to which the given key is assigned */ public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) { return MathUtils.murmurHash(keyHash) % maxParallelism; }

  1. 根据Keygroup Index找到处理该KeyGroup的具体Operator实例
/** * Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum * parallelism. * * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want * to go beyond this boundary, this method must perform arithmetic on long values. * * @param maxParallelism Maximal parallelism that the job was initially created with. * @param parallelismThe current parallelism under which the job runs. Must be <= maxParallelism. * @param operatorIndexId of a key-group. 0 <= keyGroupID < maxParallelism. * @return the computed key-group range for the operator. */ public static KeyGroupRange computeKeyGroupRangeForOperatorIndex( int maxParallelism, int parallelism, int operatorIndex) {checkParallelismPreconditions(parallelism); checkParallelismPreconditions(maxParallelism); Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism."); int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism); int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism; return new KeyGroupRange(start, end); }

该方法用于计算每个Operator处理的KeyGroup的范围
通过这种设计,在keyed state进行rescale时,状态的restore就会变成图2B,满足了对于dfl的顺序读取
flink|Flink的State与Rescale
文章图片

总结 纵观针对state的两个设计,均是设计出了一种更加高效并且能满足需求的原子性分配单位,只不过一个是对对象进行更细粒度的拆分,一个是对对象进行粗粒度的合并。
参考 https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html (图片以及本文大体内容均来自该文,图片模糊不清的可以来这看)
https://juejin.im/post/6844904053512601607
https://blog.csdn.net/nazeniwaresakini/article/details/104220138

    推荐阅读