Flink Checkpoint是否支持Kafka 数据消费状态的维护()

作者:闻乃松
使用Flink实时消费kafka数据时候,涉及到offset的状态维护,为了保证Flink作业重启或者运行时的Operator级别的失败重试,如果要做到“断点续跑”,需要Flink的Checkpoint的支持。问题是,如果简单的开启Flink的Checkpoint机制,而不需要额外的编码工作,是否能达到目的?为回答该问题,本文首先要研究Flink的Checkpoint的处理机制,然后再看Flink是否支持Kafka的状态存储,于是本文分以下四个部分:

  • Flink Checkpoint 状态快照(snapshotState)主要流程
  • Flink Checkpoint 状态初始化(initializeState)主要流程
  • Kafka Source Operator 对Flink Checkpoint实现
  • Kafka Source Operator状态恢复
为了准确描述起见,本文以Flink 1.12.x版本,Kafka客户端版本2.4.x为例说明。
Flink Checkpoint 状态快照(snapshotState)主要流程
我们已知 Flink Checkpoint由CheckpointCoordinator周期性发起,它通过向相关的tasks发送触发消息和从各tasks收集确认消息(Ack)来完成checkpoint。这里省略CheckpointCoordinator发起调用逻辑解析,直奔消息受体TaskExecutor,来看Checkpoint的执行流程,在TaskExecutor中获取Task实例,触发triggerCheckpointBarrier:
TaskExecutor.class@Override public CompletableFuture triggerCheckpoint( ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp, CheckpointOptions checkpointOptions) { ... final Task task = taskSlotTable.getTask(executionAttemptID); if (task != null) { task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions); return CompletableFuture.completedFuture(Acknowledge.get()); ... } }

Task是在TaskExecutor中的调度执行单元,也响应Checkpoint请求:
Task.class public void triggerCheckpointBarrier( final long checkpointID, final long checkpointTimestamp, final CheckpointOptions checkpointOptions) { ... final CheckpointMetaData checkpointMetaData = https://www.it610.com/article/new CheckpointMetaData(checkpointID, checkpointTimestamp); invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions); ... }

其中的看点是invokable,为AbstractInvokable类型的对象,根据调用类动态实例化:
// now load and instantiate the task's invokable code AbstractInvokable invokable = loadAndInstantiateInvokable( userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);

其中 nameOfInvokableClass参数 在Task初始化时传入,动态创建AbstractInvokable实例,比如以一个SourceOperator为例,其类名称为:
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask
从SourceOperatorStreamTask类定义来看,它又是StreamTask的子类:
class SourceOperatorStreamTask extends StreamTask>

triggerCheckpointAsync方法接连调用SourceOperatorStreamTask和StreamTask类的triggerCheckpointAsync方法,主要逻辑是在StreamTask的triggerCheckpointAsync方法中:
StreamTask.class@Override public Future triggerCheckpointAsync( CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) { ... triggerCheckpoint(checkpointMetaData, checkpointOptions) ... }private boolean triggerCheckpoint( CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception { // No alignment if we inject a checkpoint CheckpointMetricsBuilder checkpointMetrics = new CheckpointMetricsBuilder() .setAlignmentDurationNanos(0L) .setBytesProcessedDuringAlignment(0L); ...boolean success = performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics); if (!success) { declineCheckpoint(checkpointMetaData.getCheckpointId()); } return success; }private boolean performCheckpoint( CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetricsBuilder checkpointMetrics) throws Exception { ... subtaskCheckpointCoordinator.checkpointState( checkpointMetaData, checkpointOptions, checkpointMetrics, operatorChain, this::isRunning); ... }

其中subtaskCheckpointCoordinator是SubtaskCheckpointCoordinatorImpl类型实例,负责协调子任务相关的checkpoint工作:
/** * Coordinates checkpointing-related work for a subtask (i.e. {@link * org.apache.flink.runtime.taskmanager.Task Task} and {@link StreamTask}). Responsibilities: * *
    *
  1. build a snapshot (invokable) *
  2. report snapshot to the JobManager *
  3. action upon checkpoint notification *
  4. maintain storage locations *
*/ @Internal public interface SubtaskCheckpointCoordinator extends Closeable

【Flink Checkpoint是否支持Kafka 数据消费状态的维护()】下面是SubtaskCheckpointCoordinatorImpl实现类中的checkpointState主要逻辑:
SubtaskCheckpointCoordinatorImpl.class@Override public void checkpointState( CheckpointMetaData metadata, CheckpointOptions options, CheckpointMetricsBuilder metrics, OperatorChain operatorChain, Supplier isRunning) throws Exception { // All of the following steps happen as an atomic step from the perspective of barriers and // records/watermarks/timers/callbacks. // We generally try to emit the checkpoint barrier as soon as possible to not affect // downstream // checkpoint alignmentsif (lastCheckpointId >= metadata.getCheckpointId()) { LOG.info( "Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId()); channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true); checkAndClearAbortedStatus(metadata.getCheckpointId()); return; }lastCheckpointId = metadata.getCheckpointId(); if (checkAndClearAbortedStatus(metadata.getCheckpointId())) { // broadcast cancel checkpoint marker to avoid downstream back-pressure due to // checkpoint barrier align. operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId())); return; }// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work. //The pre-barrier work should be nothing or minimal in the common case. operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId()); // Step (2): Send the checkpoint barrier downstream operatorChain.broadcastEvent( new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options), options.isUnalignedCheckpoint()); // Step (3): Prepare to spill the in-flight buffers for input and output if (options.isUnalignedCheckpoint()) { // output data already written while broadcasting event channelStateWriter.finishOutput(metadata.getCheckpointId()); } // Step (4): Take the state snapshot. This should be largely asynchronous, to not impact // progress of the // streaming topology Map snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators()); if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isRunning)) { finishAndReportAsync(snapshotFutures, metadata, metrics, isRunning); } else { cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined")); } }

在Step (1)中看到prepareSnapshotPreBarrier,在正式snapshot之前做了一些轻量级的准备工作,具体操作实现在OperatorChain中,依次调用链中每个StreamOperator的prepareSnapshotPreBarrier方法:
OperatorChain.classpublic void prepareSnapshotPreBarrier(long checkpointId) throws Exception { // go forward through the operator chain and tell each operator // to prepare the checkpoint for (StreamOperatorWrapper operatorWrapper : getAllOperators()) { if (!operatorWrapper.isClosed()) { operatorWrapper.getStreamOperator().prepareSnapshotPreBarrier(checkpointId); } } }

经过一系列快照检查验证、快照前的准备、向下游广播事件操作,最终落脚到本类的checkpointStreamOperator方法:
SubtaskCheckpointCoordinatorImpl.classprivate static OperatorSnapshotFutures checkpointStreamOperator( StreamOperator op, CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointStreamFactory storageLocation, Supplier isRunning) throws Exception { try { return op.snapshotState( checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions, storageLocation); } catch (Exception ex) { if (isRunning.get()) { LOG.info(ex.getMessage(), ex); } throw ex; } }

该方法又调用AbstractStreamOperator的snapshotState:
AbstractStreamOperator.class@Override public final OperatorSnapshotFutures snapshotState( long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory factory) throws Exception { return stateHandler.snapshotState( this, Optional.ofNullable(timeServiceManager), getOperatorName(), checkpointId, timestamp, checkpointOptions, factory, isUsingCustomRawKeyedState()); }

snapshotState又将checkpoint逻辑委派到StreamOperatorStateHandler。StreamOperatorStateHandler的逻辑下文再介绍。梳理上述snapshot逻辑流程,可视化表现为:
Flink Checkpoint是否支持Kafka 数据消费状态的维护()
文章图片

Flink Checkpoint 状态初始化(initializeState)主要流程
上文中提到的Task初始化启动会调用AbstractInvokable 的invoke方法,
// now load and instantiate the task's invokable code AbstractInvokable invokable = loadAndInstantiateInvokable( userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env); // run the invokable invokable.invoke();

invoke在其父类StreamTask中的invoke方法完成调用前、运行事件循环和调用后的Template 策略动作:
StreamTask.class@Override public final void invoke() throws Exception { beforeInvoke(); // final check to exit early before starting to run if (canceled) { throw new CancelTaskException(); }// let the task do its work runMailboxLoop(); // if this left the run() method cleanly despite the fact that this was canceled, // make sure the "clean shutdown" is not attempted if (canceled) { throw new CancelTaskException(); }afterInvoke(); }

在beforeInvoke方法中通过operatorChain的initializeStateAndOpenOperators进行状态初始化:
StreamTask.classprotected void beforeInvoke() throws Exception { operatorChain = new OperatorChain<>(this, recordWriter); ... operatorChain.initializeStateAndOpenOperators( createStreamTaskStateInitializer()); ... }

在operatorChain中触发当前链中所有StreamOperator:
OperatorChain.classprotected void initializeStateAndOpenOperators( StreamTaskStateInitializer streamTaskStateInitializer) throws Exception { for (StreamOperatorWrapper operatorWrapper : getAllOperators(true)) { StreamOperator operator = operatorWrapper.getStreamOperator(); operator.initializeState(streamTaskStateInitializer); operator.open(); } }

继续跟进AbstractStreamOperator调用initializeState:
AbstractStreamOperator.class@Override public final void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception { final StreamOperatorStateContext context = streamTaskStateManager.streamOperatorStateContext( getOperatorID(), getClass().getSimpleName(), getProcessingTimeService(), this, keySerializer, streamTaskCloseableRegistry, metrics, config.getManagedMemoryFractionOperatorUseCaseOfSlot( ManagedMemoryUseCase.STATE_BACKEND, runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(), runtimeContext.getUserCodeClassLoader()), isUsingCustomRawKeyedState()); stateHandler = new StreamOperatorStateHandler( context, getExecutionConfig(), streamTaskCloseableRegistry); timeServiceManager = context.internalTimerServiceManager(); stateHandler.initializeOperatorState(this); }

其中stateHandler.initializeOperatorState又将initializeOperatorState委派到了StreamOperatorStateHandler类,在其中完成具体StreamOperator子类的状态初始化。梳理初始化状态的逻辑,可视化表现为:
Flink Checkpoint是否支持Kafka 数据消费状态的维护()
文章图片

Kafka Source Operator 对Flink Checkpoint的支持
现在将Checkpoint的状态快照过程和状态初始化过程画在一起,会看到两者都汇总委派到StreamOperatorStateHandler来执行:
Flink Checkpoint是否支持Kafka 数据消费状态的维护()
文章图片

StreamOperatorStateHandler类中initializeOperatorState和snapshotState方法实现如下,主要完成的是参数的构建:
StreamOperatorStateHandler.classpublic void initializeOperatorState(CheckpointedStreamOperator streamOperator) throws Exception { CloseableIterable keyedStateInputs = context.rawKeyedStateInputs(); CloseableIterable operatorStateInputs = context.rawOperatorStateInputs(); StateInitializationContext initializationContext = new StateInitializationContextImpl( context.isRestored(), // information whether we restore or start for // the first time operatorStateBackend, // access to operator state backend keyedStateStore, // access to keyed state backend keyedStateInputs, // access to keyed state stream operatorStateInputs); // access to operator state streamstreamOperator.initializeState(initializationContext); }public OperatorSnapshotFutures snapshotState( CheckpointedStreamOperator streamOperator, Optional> timeServiceManager, String operatorName, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory factory, boolean isUsingCustomRawKeyedState) throws CheckpointException { KeyGroupRange keyGroupRange = null != keyedStateBackend ? keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE; OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures(); StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl( checkpointId, timestamp, factory, keyGroupRange, closeableRegistry); snapshotState( streamOperator, timeServiceManager, operatorName, checkpointId, timestamp, checkpointOptions, factory, snapshotInProgress, snapshotContext, isUsingCustomRawKeyedState); return snapshotInProgress; }void snapshotState( CheckpointedStreamOperator streamOperator, Optional> timeServiceManager, String operatorName, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory factory, OperatorSnapshotFutures snapshotInProgress, StateSnapshotContextSynchronousImpl snapshotContext, boolean isUsingCustomRawKeyedState) throws CheckpointException { if (timeServiceManager.isPresent()) { checkState( keyedStateBackend != null, "keyedStateBackend should be available with timeServiceManager"); final InternalTimeServiceManager manager = timeServiceManager.get(); if (manager.isUsingLegacyRawKeyedStateSnapshots()) { checkState( !isUsingCustomRawKeyedState, "Attempting to snapshot timers to raw keyed state, but this operator has custom raw keyed state to write."); manager.snapshotToRawKeyedState( snapshotContext.getRawKeyedOperatorStateOutput(), operatorName); } } streamOperator.snapshotState(snapshotContext); snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture()); snapshotInProgress.setOperatorStateRawFuture( snapshotContext.getOperatorStateStreamFuture()); if (null != operatorStateBackend) { snapshotInProgress.setOperatorStateManagedFuture( operatorStateBackend.snapshot( checkpointId, timestamp, factory, checkpointOptions)); }if (null != keyedStateBackend) { snapshotInProgress.setKeyedStateManagedFuture( keyedStateBackend.snapshot( checkpointId, timestamp, factory, checkpointOptions)); } }

值得一提的是两个方法中的StreamOperator参数要求是CheckpointedStreamOperator 类型:
public interface CheckpointedStreamOperator { void initializeState(StateInitializationContext context) throws Exception; void snapshotState(StateSnapshotContext context) throws Exception; }

比较下StreamOperator,其跟Checkpoint相关的三个方法定义如下:
Flink Checkpoint是否支持Kafka 数据消费状态的维护()
文章图片

虽然方法名字一样,参数不同,其实不用管这些,只需要知道StreamOperator将快照的相关逻辑委派到了StreamOperatorStateHandler,真正的快照逻辑都在CheckpointedStreamOperator中完成即可,于是,要想实现自定义快照逻辑,只需要实现CheckpointedStreamOperato接口,以SourceOperator为例,类定义:
public class SourceOperator extends AbstractStreamOperator implements OperatorEventHandler, PushingAsyncDataInput

而AbstractStreamOperator的类定义为:
public abstract class AbstractStreamOperator implements StreamOperator, SetupableStreamOperator, CheckpointedStreamOperator, Serializable

AbstractStreamOperator已经帮我们实现了相关方法,只需要extend AbstractStreamOperator,仍然以SourceOperator为例来看它的实现:
SourceOperator.classprivate ListState readerState; @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); final ListState rawState = context.getOperatorStateStore().getListState(SPLITS_STATE_DESC); readerState = new SimpleVersionedListState<>(rawState, splitSerializer); }@Override public void snapshotState(StateSnapshotContext context) throws Exception { long checkpointId = context.getCheckpointId(); LOG.debug("Taking a snapshot for checkpoint {}", checkpointId); readerState.update(sourceReader.snapshotState(checkpointId)); }

可见SourceOperator将快照状态存储在内存中的SimpleVersionedListState中,snapshotState的具体操作转给了SourceReader,来看Flink Kafka Connector提供的KafkaSourceReader 如何实现snapshotState:
KafkaSourceReader.classKafkaSourceReader extends SourceReaderBase implements SourceReader@Override public List snapshotState(long checkpointId) { List splits = super.snapshotState(checkpointId); if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) { offsetsToCommit.put(checkpointId, Collections.emptyMap()); } else { Map offsetsMap = offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); // Put the offsets of the active splits. for (KafkaPartitionSplit split : splits) { // If the checkpoint is triggered before the partition starting offsets // is retrieved, do not commit the offsets for those partitions. if (split.getStartingOffset() >= 0) { offsetsMap.put( split.getTopicPartition(), new OffsetAndMetadata(split.getStartingOffset())); } } // Put offsets of all the finished splits. offsetsMap.putAll(offsetsOfFinishedSplits); } return splits; }

上面是基于内存的状态存储,而持久化还需要外部系统的支持,继续探究StreamOperatorStateHandler的snapshot方法逻辑,其中有这么一段:
if (null != operatorStateBackend) { snapshotInProgress.setOperatorStateManagedFuture( operatorStateBackend.snapshot( checkpointId, timestamp, factory, checkpointOptions)); }

当配置了持久化后端存储,才会将状态数据持久化,以默认的OperatorStateBackend为例:
DefaultOperatorStateBackend.class@Override public RunnableFuture> snapshot( long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {long syncStartTime = System.currentTimeMillis(); RunnableFuture> snapshotRunner = snapshotStrategy.snapshot( checkpointId, timestamp, streamFactory, checkpointOptions); return snapshotRunner; }

snapshotStrategy.snapshot执行逻辑实现在DefaultOperatorStateBackendSnapshotStrategy中:
DefaultOperatorStateBackendSnapshotStrategy.class@Override public RunnableFuture> snapshot( final long checkpointId, final long timestamp, @Nonnull final CheckpointStreamFactory streamFactory, @Nonnull final CheckpointOptions checkpointOptions) throws IOException {... for (Map.Entry> entry : registeredOperatorStatesDeepCopies.entrySet()) { operatorMetaInfoSnapshots.add( entry.getValue().getStateMetaInfo().snapshot()); } ... // ... write them all in the checkpoint stream ... DataOutputView dov = new DataOutputViewStreamWrapper(localOut); OperatorBackendSerializationProxy backendSerializationProxy = new OperatorBackendSerializationProxy( operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots); backendSerializationProxy.write(dov); ... for (Map.Entry> entry : registeredOperatorStatesDeepCopies.entrySet()) {PartitionableListState value = https://www.it610.com/article/entry.getValue(); long[] partitionOffsets = value.write(localOut); } }

状态数据有元数据信息和状态本身的数据,状态数据通过PartitionableListState的write方法写入文件系统:
PartitionableListState.classpublic long[] write(FSDataOutputStream out) throws IOException {long[] partitionOffsets = new long[internalList.size()]; DataOutputView dov = new DataOutputViewStreamWrapper(out); for (int i = 0; i < internalList.size(); ++i) { S element = internalList.get(i); partitionOffsets[i] = out.getPos(); getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov); }return partitionOffsets; }

Kafka Source Operator状态恢复
上面一部分介绍了Kafka Source Operator对Flink Checkpoint的支持,也是涉及到snapshot和initialState两个部分,但主要介绍了snapshot的逻辑,再来看SourceOperator的如何初始化状态的:
SourceOperator.class@Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); final ListState rawState = context.getOperatorStateStore().getListState(SPLITS_STATE_DESC); readerState = new SimpleVersionedListState<>(rawState, splitSerializer); }

context.getOperatorStateStore使用了DefaultOperatorStateBackend的getListState方法:
DefaultOperatorStateBackend.classprivate final Map> registeredOperatorStates; @Override publicListState getListState(ListStateDescriptor stateDescriptor) throws Exception { return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE); }privateListState getListState( ListStateDescriptor stateDescriptor, OperatorStateHandle.Mode mode) throws StateMigrationException { ... PartitionableListState partitionableListState = (PartitionableListState) registeredOperatorStates.get(name); ... return partitionableListState; }

而getListState仅仅是从名叫registeredOperatorStates的Map>中获取,那问题来了,registeredOperatorStates从哪里来?为了找到答案,这部分通过一个Kafka消费示例来演示和说明,首先创建KafkaSource:
KafkaSource kafkaSource = KafkaSource.builder() .setBootstrapServers(Constants.kafkaServers) .setGroupId(KafkaSinkIcebergExample.class.getName()) .setTopics(topic) .setDeserializer(recordDeserializer) .setStartingOffsets(OffsetsInitializer.earliest()) .setBounded(OffsetsInitializer.latest()) .setProperties(properties) .build();

并且设置重启策略:StreamOperator失败后立即重启一次好快照间隔:100毫秒1次:
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.seconds(0))); env.getCheckpointConfig().setCheckpointInterval(1 * 100L);

然后在Kafka反序列时候,设置解析100条记录后抛出异常:
public static class TestingKafkaRecordDeserializer implements KafkaRecordDeserializer { private static final long serialVersionUID = -3765473065594331694L; private transient Deserializer deserializer = new StringDeserializer(); int parseNum=0; @Override public void deserialize( ConsumerRecord record, Collector collector) { if (deserializer == null) deserializer = new StringDeserializer(); MetaAndValue metaAndValue=https://www.it610.com/article/new MetaAndValue( new TopicPartition(record.topic(), record.partition()), deserializer.deserialize(record.topic(), record.value()), record.offset()); if(parseNum++>100) { Map metaData=https://www.it610.com/article/metaAndValue.getMetaData(); throw new RuntimeException("for test"); } collector.collect(metaAndValue); } }

JobMaster初始化并创建Scheduler时候从Checkpoint进行状态初始化,如果从Checkpoint初始化失败,则试图从Savepoint恢复。
SchedulerBase.classprivate ExecutionGraph createAndRestoreExecutionGraph( JobManagerJobMetricGroup currentJobManagerJobMetricGroup, ShuffleMaster shuffleMaster, JobMasterPartitionTracker partitionTracker, ExecutionDeploymentTracker executionDeploymentTracker, long initializationTimestamp) throws Exception {ExecutionGraph newExecutionGraph = createExecutionGraph( currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker, executionDeploymentTracker, initializationTimestamp); final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator(); if (checkpointCoordinator != null) { // check whether we find a valid checkpoint if (!checkpointCoordinator.restoreInitialCheckpointIfPresent( new HashSet<>(newExecutionGraph.getAllVertices().values()))) {// check whether we can restore from a savepoint tryRestoreExecutionGraphFromSavepoint( newExecutionGraph, jobGraph.getSavepointRestoreSettings()); } }return newExecutionGraph; }

最后回到熟悉的CheckpointCoordinator,在其方法restoreLatestCheckpointedStateInternal中从Checkpoint目录加载最新快照状态:
CheckpointCoordinator.classpublic boolean restoreInitialCheckpointIfPresent(final Set tasks) throws Exception { final OptionalLong restoredCheckpointId = restoreLatestCheckpointedStateInternal( tasks, OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT, false, // initial checkpoints exist only on JobManager failover. ok if not // present. false); // JobManager failover means JobGraphs match exactly.return restoredCheckpointId.isPresent(); }private OptionalLong restoreLatestCheckpointedStateInternal( final Set tasks, final OperatorCoordinatorRestoreBehavior operatorCoordinatorRestoreBehavior, final boolean errorIfNoCheckpoint, final boolean allowNonRestoredState) throws Exception { ... // Recover the checkpoints, TODO this could be done only when there is a new leader, not // on each recovery completedCheckpointStore.recover(); // Restore from the latest checkpoint CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery); ... // re-assign the task states final Map operatorStates = latest.getOperatorStates(); StateAssignmentOperation stateAssignmentOperation = new StateAssignmentOperation( latest.getCheckpointID(), tasks, operatorStates, allowNonRestoredState); stateAssignmentOperation.assignStates(); ... }

上面是应用初始启动的状态恢复逻辑,那在应用运行期间的Operator失败重启的逻辑又是什么样的呢?实际上JobMaster会监听任务运行状态,并做相应处理,比如下面一个失败处理链路逻辑:
UpdateSchedulerNgOnInternalFailuresListener.class@Override public void notifyTaskFailure( final ExecutionAttemptID attemptId, final Throwable t, final boolean cancelTask, final boolean releasePartitions) {final TaskExecutionState state = new TaskExecutionState(jobId, attemptId, ExecutionState.FAILED, t); schedulerNg.updateTaskExecutionState( new TaskExecutionStateTransition(state, cancelTask, releasePartitions)); } SchedulerBase.class @Override public final boolean updateTaskExecutionState( final TaskExecutionStateTransition taskExecutionState) { final Optional executionVertexId = getExecutionVertexId(taskExecutionState.getID()); boolean updateSuccess = executionGraph.updateState(taskExecutionState); if (updateSuccess) { checkState(executionVertexId.isPresent()); if (isNotifiable(executionVertexId.get(), taskExecutionState)) { updateTaskExecutionStateInternal(executionVertexId.get(), taskExecutionState); } return true; } else { return false; } } DefaultScheduler.class@Override protected void updateTaskExecutionStateInternal( final ExecutionVertexID executionVertexId, final TaskExecutionStateTransition taskExecutionState) {schedulingStrategy.onExecutionStateChange( executionVertexId, taskExecutionState.getExecutionState()); maybeHandleTaskFailure(taskExecutionState, executionVertexId); }private void maybeHandleTaskFailure( final TaskExecutionStateTransition taskExecutionState, final ExecutionVertexID executionVertexId) {if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) { final Throwable error = taskExecutionState.getError(userCodeLoader); handleTaskFailure(executionVertexId, error); } }private void handleTaskFailure( final ExecutionVertexID executionVertexId, @Nullable final Throwable error) { setGlobalFailureCause(error); notifyCoordinatorsAboutTaskFailure(executionVertexId, error); final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error); maybeRestartTasks(failureHandlingResult); } private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) { if (failureHandlingResult.canRestart()) { //调用restartTasks restartTasksWithDelay(failureHandlingResult); } else { failJob(failureHandlingResult.getError()); } }private Runnable restartTasks( final Set executionVertexVersions, final boolean isGlobalRecovery) { return () -> { final Set verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices( executionVertexVersions); removeVerticesFromRestartPending(verticesToRestart); resetForNewExecutions(verticesToRestart); try { restoreState(verticesToRestart, isGlobalRecovery); } catch (Throwable t) { handleGlobalFailure(t); return; }schedulingStrategy.restartTasks(verticesToRestart); }; } SchedulerBase.classprotected void restoreState( final Set vertices, final boolean isGlobalRecovery) throws Exception { ... if (isGlobalRecovery) { final Set jobVerticesToRestore = getInvolvedExecutionJobVertices(vertices); checkpointCoordinator.restoreLatestCheckpointedStateToAll(jobVerticesToRestore, true); } else { final Map subtasksToRestore = getInvolvedExecutionJobVerticesAndSubtasks(vertices); final OptionalLong restoredCheckpointId = checkpointCoordinator.restoreLatestCheckpointedStateToSubtasks( subtasksToRestore.keySet()); // Ideally, the Checkpoint Coordinator would call OperatorCoordinator.resetSubtask, but // the Checkpoint Coordinator is not aware of subtasks in a local failover. It always // assigns state to all subtasks, and for the subtask execution attempts that are still // running (or not waiting to be deployed) the state assignment has simply no effect. // Because of that, we need to do the "subtask restored" notification here. // Once the Checkpoint Coordinator is properly aware of partial (region) recovery, // this code should move into the Checkpoint Coordinator. final long checkpointId = restoredCheckpointId.orElse(OperatorCoordinator.NO_CHECKPOINT); notifyCoordinatorsOfSubtaskRestore(subtasksToRestore, checkpointId); } ... }

上述整个链路涉及到DefaultScheduler和SchedulerBase,实际上还是在一个运行对象实例中进行的,两者关系为:
public abstract class SchedulerBase implements SchedulerNG public class DefaultScheduler extends SchedulerBase implements SchedulerOperations

最后又回到了熟悉的CheckpointCoordinator:
CheckpointCoordinator.classpublic OptionalLong restoreLatestCheckpointedStateToSubtasks( final Set tasks) throws Exception { // when restoring subtasks only we accept potentially unmatched state for the // following reasons //- the set frequently does not include all Job Vertices (only the ones that are part //of the restarted region), meaning there will be unmatched state by design. //- because what we might end up restoring from an original savepoint with unmatched //state, if there is was no checkpoint yet. return restoreLatestCheckpointedStateInternal( tasks, OperatorCoordinatorRestoreBehavior .SKIP, // local/regional recovery does not reset coordinators false, // recovery might come before first successful checkpoint true); // see explanation above }

在schedulingStrategy.restartTasks中,每个Task 分配的状态被封装在JobManagerTaskRestore 中,jobManagerTaskRestore 会作为TaskDeploymentDescriptor 的一个属性下发到TaskEXecutor 中。当TaskDeploymentDescriptor被提交给TaskExecutor 之后,TaskExcutor 会使用TaskStateManager 用于管理当前Task的状态,TaskStateManager 对象会基于分配的JobManagerTaskRestore 和本地状态存储TaskLocalStateStore进行创建:
TaskEXecutor.class@Override public CompletableFuture submitTask( TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) { ... final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask( jobId, tdd.getAllocationId(), taskInformation.getJobVertexId(), tdd.getSubtaskIndex()); final JobManagerTaskRestore taskRestore = tdd.getTaskRestore(); final TaskStateManager taskStateManager = new TaskStateManagerImpl( jobId, tdd.getExecutionAttemptId(), localStateStore, taskRestore, checkpointResponder); ... //启动 Task }

启动Task会调用StreamTask的invoke方法,并在beforeInvoke中进行如下初始化:
StreamTask.class protected void beforeInvoke() throws Exception { ... operatorChain.initializeStateAndOpenOperators( createStreamTaskStateInitializer()); ... }public StreamTaskStateInitializer createStreamTaskStateInitializer() { InternalTimeServiceManager.Provider timerServiceProvider = configuration.getTimerServiceProvider(getUserCodeClassLoader()); return new StreamTaskStateInitializerImpl( getEnvironment(), stateBackend, TtlTimeProvider.DEFAULT, timerServiceProvider != null ? timerServiceProvider : InternalTimeServiceManagerImpl::create); }

再回到operatorChain的initializeStateAndOpenOperators方法:
OperatorChain.classprotected void initializeStateAndOpenOperators( StreamTaskStateInitializer streamTaskStateInitializer) throws Exception { for (StreamOperatorWrapper operatorWrapper : getAllOperators(true)) { StreamOperator operator = operatorWrapper.getStreamOperator(); operator.initializeState(streamTaskStateInitializer); operator.open(); } }

其中StreamOperator的initializeState调用了子类AbstractStreamOperator的initializeState,并在其中创建StreamOperatorStateContext:
AbstractStreamOperator.class@Override public final void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception {final TypeSerializer keySerializer = config.getStateKeySerializer(getUserCodeClassloader()); final StreamTask containingTask = Preconditions.checkNotNull(getContainingTask()); final CloseableRegistry streamTaskCloseableRegistry = Preconditions.checkNotNull(containingTask.getCancelables()); final StreamOperatorStateContext context = streamTaskStateManager.streamOperatorStateContext( getOperatorID(), getClass().getSimpleName(), getProcessingTimeService(), this, keySerializer, streamTaskCloseableRegistry, metrics, config.getManagedMemoryFractionOperatorUseCaseOfSlot( ManagedMemoryUseCase.STATE_BACKEND, runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(), runtimeContext.getUserCodeClassLoader()), isUsingCustomRawKeyedState()); stateHandler = new StreamOperatorStateHandler( context, getExecutionConfig(), streamTaskCloseableRegistry); timeServiceManager = context.internalTimerServiceManager(); stateHandler.initializeOperatorState(this); runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null)); }

从快照中读取状态数据并恢复的实际动作就隐藏在streamOperatorStateContext的创建过程中:
StreamTaskStateInitializerImpl.class@Override public StreamOperatorStateContext streamOperatorStateContext( // -------------- Keyed State Backend -------------- keyedStatedBackend = keyedStatedBackend( keySerializer, operatorIdentifierText, prioritizedOperatorSubtaskStates, streamTaskCloseableRegistry, metricGroup, managedMemoryFraction); // -------------- Operator State Backend -------------- operatorStateBackend = operatorStateBackend( operatorIdentifierText, prioritizedOperatorSubtaskStates, streamTaskCloseableRegistry); // -------------- Raw State Streams -------------- rawKeyedStateInputs = rawKeyedStateInputs( prioritizedOperatorSubtaskStates .getPrioritizedRawKeyedState() .iterator()); streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs); rawOperatorStateInputs = rawOperatorStateInputs( prioritizedOperatorSubtaskStates .getPrioritizedRawOperatorState() .iterator()); streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs); // -------------- Internal Timer Service Manager -------------- if (keyedStatedBackend != null) {// if the operator indicates that it is using custom raw keyed state, // then whatever was written in the raw keyed state snapshot was NOT written // by the internal timer services (because there is only ever one user of raw keyed // state); // in this case, timers should not attempt to restore timers from the raw keyed // state. final Iterable restoredRawKeyedStateTimers = (prioritizedOperatorSubtaskStates.isRestored() && !isUsingCustomRawKeyedState) ? rawKeyedStateInputs : Collections.emptyList(); timeServiceManager = timeServiceManagerProvider.create( keyedStatedBackend, environment.getUserCodeClassLoader().asClassLoader(), keyContext, processingTimeService, restoredRawKeyedStateTimers); } else { timeServiceManager = null; }// -------------- Preparing return value --------------return new StreamOperatorStateContextImpl( prioritizedOperatorSubtaskStates.isRestored(), operatorStateBackend, keyedStatedBackend, timeServiceManager, rawOperatorStateInputs, rawKeyedStateInputs); }

到此为止,我们完成了梳理Flink Kafka Source Operator Checkpoint的状态恢复流程,这部分逻辑大致可以划分为两大部分:基于StateInitializationContext的状态初始化和从Checkpoint恢复状态并生成StateInitializationContext。后者的恢复过程远比文章中介绍的复杂,实际上JobMaster监听到任务失败后,会从Checkpoint持久化数据中装载最近一个快照的状态元数据,然后再将状态重新分配各子任务,特别是应用重启级别的恢复,还涉及到算子拓扑结构和并行度的改变,JobMaster状态恢复之后再提交任务重启请求,在TaskManager端还可能再从本地快照(如果启用的话)恢复状态数据。TaskManager端的状态恢复以创建完成StreamOperatorStateContext为标志,它包装了快照恢复后的完整数据,接下来就回到了正常的StreamOperator的InitialState方法调用流程。
总结
本文从Flink Checkpoint的处理流程(包括snapshot的创建和初始化两部分)和Kafka对Flink Checkpoint的支持几个部分,从Flink的代码实现角度来确定Flink的Checkpoint是支持Kafka的数据消费状态维护的,但是这个状态只是从StateInitializationContext对象中获取的,为了进一步验证StateInitializationContext的状态是否从Checkpoint持久化中获取,本文第四部分结合实验,从Flink应用重启和运行时Operator失败重试来梳理Flink的状态恢复逻辑,确定Flink是支持从Checkpoint或Savepoint恢复状态。
最后,根据前文的分析,开发者在开发Flink应用时需要注意的是:虽然Flink能够将Kafka消费状态恢复到最近一个Checkpoint快照状态,但是无法避免在两个快照之间的重复消费。一个典型情景是Sink端不支持幂等时,有可能造成数据的重复,例如PrintSink就无法撤回快照之间输出的数据。另外,在未开启Flink Checkpoint时需要依赖Kafka Client自身的commit的状态来实现状态维护。

    推荐阅读