本文首发于 泊浮目的语雀: https://www.yuque.com/17sing
版本 | 日期 | 备注 |
---|---|---|
1.0 | 2022.2.2 | 文章首发 |
在这里,我们首先要搞清楚两种Exactly-Once的区别:
- Exactly Once:在计算引擎内部,数据不丢失不重复。本质是通过Flink开启检查点进行Barrier对齐,即可做到。
- End to End Exactly Once:这意味着从数据读取、引擎处理到写入外部存储的整个过程中,数据都是不丢失不重复的。这要求数据源可重放,写入端支持事务的恢复和回滚或幂等。
文章图片
当两条边下发barrier时,barrier1比barrier2先到达了算子,那么算子会将一条边输入的元素缓存起来,直到barrier2到了做Checkpoint以后才会下发元素。
每个算子对齐barrier后,会进行异步状态存储,然后下发barrier。每个算子做完Checkpoint时,会通知
CheckpointCoordinator
。当CheckpointCoordinator
得知所有算子的Checkpoint都做完时,认为本次Checkpoint完成。而在我们的应用程序中,有一个map算子接受了大量数据,导致barrier一直没有下发,最终整个Checkpoint超时。
2. Checkpoint的原理 其具体原理可以参考Flink团队的论文:Lightweight Asynchronous Snapshots for Distributed Dataflow。简单来说,早期流计算的容错方案都是周期性做全局状态的快照,但这有两个缺点:
- 阻塞计算——做快照时是同步阻塞的。
- 会将当前算子未处理以及正在处理的record一起做进快照,因此快照会变得特别大。
在Flink中(无环有向图),会周期性的插入Barrier这个标记,告知下游算子开始做快照。这个算法基于以下前提:
- 网络传输可靠,可以做到FIFO。这里会对算子进行
blocked
和unblocked
操作,如果一个算子是blocked
,它会把从上游通道接收到的所有数据缓存起来,直接收到unblocked
的信号才发送。 - Task可以对它们的通道进行以下操作:
block
,unblock
,send messages
,broading messages
。 - 对于Source节点来说,会被抽象成
Nil
输入通道。
- 可行性检查
- JobMaster通知Task触发检查点
- TaskExecutor执行检查点
- JobMaster确认检查点
3.1 可行性检查 参考代码:
CheckpointingCoordinator#startTriggeringCheckpoint
。- 确保作业不是处于关闭中或未启动的状态(见
CheckpointPlanCalculator#calculateCheckpointPlan
)。 - 生成新的CheckpointingID,并创建一个PendingCheckpoint——当所有Task都完成了Checkpoint,则会转换成一个CompletedCheckpoint。同时也会注册一个线程去关注是否有超时的情况,如果超时则会Abort当前的Checkpoint(见
CheckpointPlanCalculator#createPendingCheckpoint
)。 - 触发MasterHook。部分外部系统在触发检查点之前,需要做一些扩展逻辑,通过该实现MasterHook可以实现通知机制(见
CheckpointPlanCalculator#snapshotMasterState
)。 - 重复步骤1,没问题的话通知SourceStreamTask开始触发检查点(见
CheckpointPlanCalculator#triggerCheckpointRequest
)。
CheckpointPlanCalculator#triggerCheckpointRequest
中,会通过triggerTasks
方法调用到Execution#triggerCheckpoint
方法。Execution对应了一个Task实例,因此JobMaster可以通过里面的Slot引用找到其TaskManagerGateway
,发送远程请求触发Checkpoint。3.3 TaskManager执行检查点 TaskManager在代码中的体现为
TaskExecutor
。当JobMaster触发远程请求至TaskExecutor时,handle的方法为TaskExecutor#triggerCheckpoint
,之后便会调用Task#triggerCheckpointBarrier
来做:- 做一些检查,比如Task是否是Running状态
- 触发Checkpoint:调用
CheckpointableTask#triggerCheckpointAsync
- 执行检查点:
CheckpointableTask#triggerCheckpointAsync
。以StreamTask
实现为例,这里会考虑上游已经Finish时如何触发下游Checkpoint的情况——通过塞入CheckpointBarrier
来触发;如果任务没有结束,则调用StreamTask#triggerCheckpointAsyncInMailbox
。最终都会走入SubtaskCheckpointCoordinator#checkpointState
来触发Checkpoint。 - 算子保存快照:调用
OperatorChain#broadcastEvent
:保存OperatorState与KeyedState。 - 调用
SubtaskCheckpointCoordinatorImpl#finishAndReportAsync
,:异步的汇报当前快照已完成。
|-- RpcCheckpointResponder
\-- acknowledgeCheckpoint
|-- JobMaster
\-- acknowledgeCheckpoint
|-- SchedulerBase
\-- acknowledgeCheckpoint
|-- ExecutionGraphHandler
\-- acknowledgeCheckpoint
|-- CheckpointCoordinator
\-- receiveAcknowledgeMessage
在3.1中,我们提到过PendingCheckpoint。这里面维护了一些状来确保Task全部Ack、Master全部Ack。当确认完成后,
CheckpointCoordinator
将会通知所有的Checkpoint已经完成。|-- CheckpointCoordinator
\-- receiveAcknowledgeMessage
\-- sendAcknowledgeMessages
3.5 检查点恢复 该部分代码较为简单,有兴趣的同学可以根据相关调用栈自行阅读代码。
|-- Task
\-- run
\-- doRun
|-- StreamTask
\-- invoke
\-- restoreInternal
\-- restoreGates
|-- OperatorChain
\-- initializeStateAndOpenOperators
|-- StreamOperator
\-- initializeState
|-- StreamOperatorStateHandler
\-- initializeOperatorState
|-- AbstractStreamOperator
\-- initializeState
|-- StreamOperatorStateHandler
\-- initializeOperatorState
|-- CheckpointedStreamOperator
\-- initializeState #调用用户代码
3.6 End to End Exactly Once 端到端的精准一次实现其实是比较困难的——考虑一个Source对N个Sink的场景。故此Flink设计了相应的接口来保障端到端的精准一次,分别是:
- TwoPhaseCommitSinkFunction:想做精准一次的Sink必须实现此接口。
- CheckpointedFunction:Checkpoint被调用时的钩子。
- CheckpointListener:顾名思义,当Checkpoint完成或失败时会通知此接口的实现者。
可能有同学会好奇为什么JDBC Sink没有实现ExactlyOnce。本质和这个接口的执行方式无法兼容JDBC的事务使用方式——当一个算子意味退出时,是无法再对之前的事务进行操作的。因此
TwoPhaseCommitSinkFunction
中的retryCommit以及retryRollback是无法进行的——见https://github.com/apache/fli...。JDBC的Sink是基于XA实现的,尽可能保证一致性。这里可能又有同学会问了为什么不用Upset类的语句,因为这个方式并不通用——对于Upset需要一个唯一键,不然性能极差。4. 小结 【读Flink源码谈设计(Exactly Once)】本文以问题视角切入Checkpoint的原理与实现,并对相关源码做了简单的跟踪。其实代码的线路是比较清晰的,但涉及大量的类——有心的同学可能已经发现,这是单一职责原则的体现。
TwoPhaseCommitSinkFunction
中的实现也是典型的模版方法设计模式。