Flink-OperatorChain源码详解
- 前言
- 逻辑计划中的算子链
- StreamingJobGraphGenerator(优化逻辑)
- createChain核心方法(创建chain)
- isChainable方法(判断operator是否可以加入chain):
- 划分chain的依据
- flink中chain的3种链接策略:
- 物理逻辑中的算子链(ExecutionGraph)
- OperatorChain小总结
前言 参考博客:LittleMagics的深入分析Flink的operator chain(算子链)机制
我们来看下flink-web页面中的JobGraph图
这里说明几点:
- 每一个框框代表一个Task,一个Task在一个线程当中运行。
- 每个Task中可以包含一个或者多个subTask。
- 每个Task中的并行度代表着subTask的个数。
- 多个subTask连接在一起就形成了一个OperatorChain
![Flink-OperatorChain源码详解](https://img.it610.com/image/info8/d28e74e2929d4fa4801078b536e71292.jpg)
文章图片
学过spark的同学可能知道:
spark的RDD主要有两种:transformation与action
一个action可以把之前的trans划分为一个stage。
而flink的chain可以形似与spark的stage。
问题来了:flink如何把operator去放到一个chain里面呢?
flink-web页面中流程图也就是JobGraph,底层是通过StreamingJobGraphGenerator这个类去实现的。
逻辑计划中的算子链 flink的任务作业分为三层结构:
- StreamGraph——原始逻辑执行计划
- JobGraph——优化的逻辑执行计划(Flink-Web中能看到的图,也就执行一个个dataStream,如map等)
- ExecutionGraph——物理执行计划(执行了sink操作)
private JobGraph createJobGraph() {
this.jobGraph.setScheduleMode(this.streamGraph.getScheduleMode());
// 1.通过traverseStreamGraphAndGenerateHashes方法计算出每个节点的哈希码作为唯一标识
//然后把这个唯一标识存储到Map当中,这里返回的是Map分别代表节点Id以及节点哈希码
//源码中是hashResult.put(streamNode.getId(), StringUtils.hexStringToByte(userHash));
Map hashes = this.defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(this.streamGraph);
// 2.扑朔图当中,一个节点的数据包含了节点Id以及他的哈希码,这个数据是保存在Map当中的
//再通过迭代器把所有的节点数据保存到List当中
List
重点来看一下setChaining方法
this.setChaining(hashes, legacyHashes, chainedOperatorHashes);
源码:
private void setChaining(Map hashes, List
createChain核心方法(创建chain) 这里再看一下createChain的方法:
private List> createChain(Integer startNodeId, Integer currentNodeId, Map hashes, List
isChainable方法(判断operator是否可以加入chain): 用于判断当前算子是否能加入到一个chain里面
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
StreamOperatorFactory> headOperator = upStreamVertex.getOperatorFactory();
StreamOperatorFactory> outOperator = downStreamVertex.getOperatorFactory();
// 判断
return downStreamVertex.getInEdges().size() == 1
// 上下游算子不能为空
&& outOperator != null && headOperator != null
// 上下游算子实例处于同一个SlotSharingGroup中
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
// 下游算子的链接策略必须是always
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
// 上游算子的链接策略是always或者head
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
// 两个算子间的物理分区逻辑是ForwardPartitioner
&& edge.getPartitioner() instanceof ForwardPartitioner
// 两个算子间的shuffle方式不等于批处理模式
&& edge.getShuffleMode() != ShuffleMode.BATCH
// 上下游算子实例的并行度相同
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
// 没有禁用算子链
&& streamGraph.isChainingEnabled();
}
划分chain的依据 总结下:上下游算子,也就是operator能放到一个chain的条件有:
- 上下游的并行度一致
- 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
- 上下游节点都在同一个 slot group 中(下面会解释 slot group)
- 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
- 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
- 两个节点间物理分区逻辑是 ForwardPartitioner
- 用户没有禁用 chain
- 前后算子不为空
// 指定一个operator不参与算子链
@PublicEvolving
public SingleOutputStreamOperator disableChaining() {
return setChainingStrategy(ChainingStrategy.NEVER);
}
// 手动start一个新的chain
@PublicEvolving
public SingleOutputStreamOperator startNewChain() {
// 这里可以发现改变的是chain的一个策略。从头部开始,也就是重新创建一个chain
return setChainingStrategy(ChainingStrategy.HEAD);
}
flink中chain的3种链接策略: 因为我们平常是不会去更改这个链接策略的,而基本上flink-chain的链接策略默认为always,因此这里简单介绍下。
如果是always代表着什么?
这意味着,只要它们共享相同的插槽并与前向通道连接,就将跳过网络/本地通道,并将记录直接交给下一个转换。
@PublicEvolving
public enum ChainingStrategy {
ALWAYS,
NEVER,
HEAD;
private ChainingStrategy() {
}
}
物理逻辑中的算子链(ExecutionGraph) 前两种StreamGraph、JobGraph是在客户端生成。
ExecutionGraph在jobMaster中生成,最后一种物理执行图是一种虚拟的图,不存在的数据结构,运行在每一个TaskExecutor中。在JobGraph转换成ExecutionGraph并交由TaskManager执行之后,会生成调度执行的基本任务单元——StreamTask,负责执行具体的StreamOperator逻辑。而在启动的过程中,会生成一个OperatorChain对象,他是算子链被具体执行时候的一个状态实例。
OperatorChain类:
@Internal
public class OperatorChain> implements StreamStatusMaintainer {
private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
// 算子链中的所有算子,倒序排列,即headOperator位于该数组的末尾;
private final StreamOperator>[] allOperators;
// 算子链的输出,可以有多个
private final RecordWriterOutput>[] streamOutputs;
// 算子链的入口点
private final OperatorChain.WatermarkGaugeExposingOutput> chainEntryPoint;
// 算子链的第一个算子,对应JobGraph中的算子链起始节点;
private final OP headOperator;
private StreamStatus streamStatus;
private InputSelection finishedInputs;
这里说明几点:
- 一个OperatorChain可以包括:HeadOpeartor(第一个算子)和StreamOpeartor(其他算子的统称)
- 如果一个算子无法进入OperatorChain,那么他会单独形成一个OperatorChain(但是只包含了HeadOpeartor)
try {
var20 = true;
// 1.遍历整个算子链的所有输出边
//不断的调用createStreamOutput()方法创建对应的下游输出RecordWriterOutput
for(int i = 0;
i < outEdgesInOrder.size();
++i) {
StreamEdge outEdge = (StreamEdge)outEdgesInOrder.get(i);
RecordWriterOutput> streamOutput = this.createStreamOutput((RecordWriter)recordWriters.get(i), outEdge, (StreamConfig)chainedConfigs.get(outEdge.getSourceId()), containingTask.getEnvironment());
this.streamOutputs[i] = streamOutput;
streamOutputMap.put(outEdge, streamOutput);
}
// 2.调用createOutputCollector创建物理的算子链,返回一个chainEntryPoint
// 第一部分优化逻辑当中,有个StreamConfig,是用来存储数据的。
// 这个createOutputCollector方法主要做这么几件事:从StreamConfig中取出链边以及出边的一个数据(意思是取出所有的数据)
// 根据取出的数据各自创建Output,将数据写入第一步生成的RecordWriterOutput
List>> allOps = new ArrayList(chainedConfigs.size());
this.chainEntryPoint = this.createOutputCollector(containingTask, configuration, chainedConfigs, userCodeClassloader, streamOutputMap, allOps);
if (operatorFactory != null) {
OperatorChain.WatermarkGaugeExposingOutput> output = this.getChainEntryPoint();
this.headOperator = operatorFactory.createStreamOperator(containingTask, configuration, output);
this.headOperator.getMetricGroup().gauge("currentOutputWatermark", output.getWatermarkGauge());
} else {
this.headOperator = null;
}allOps.add(this.headOperator);
this.allOperators = (StreamOperator[])allOps.toArray(new StreamOperator[allOps.size()]);
success = true;
var20 = false;
} finally {
if (var20) {
if (!success) {
RecordWriterOutput[] var15 = this.streamOutputs;
int var16 = var15.length;
for(int var17 = 0;
var17 < var16;
++var17) {
RecordWriterOutput> output = var15[var17];
if (output != null) {
output.close();
}
}
}}
}
而createOutputCollector方法里面又有一个很重要的方法:getChainedOutputs()
主要用于输出结果,通过不断延伸Output来产生chainedOperator,并逆序返回。
private OperatorChain.WatermarkGaugeExposingOutput> createOutputCollector(StreamTask, ?> containingTask, StreamConfig operatorConfig, Map chainedConfigs, ClassLoader userCodeClassloader, Map, RecordWriterOutput>> streamOutputs, List>> allOperators) {
List>, StreamEdge>> allOutputs = new ArrayList(4);
Iterator var8 = operatorConfig.getNonChainedOutputs(userCodeClassloader).iterator();
StreamEdge outputEdge;
while(var8.hasNext()) {
outputEdge = (StreamEdge)var8.next();
RecordWriterOutput output = (RecordWriterOutput)streamOutputs.get(outputEdge);
allOutputs.add(new Tuple2(output, outputEdge));
}var8 = operatorConfig.getChainedOutputs(userCodeClassloader).iterator();
int i;
while(var8.hasNext()) {
outputEdge = (StreamEdge)var8.next();
i = outputEdge.getTargetId();
StreamConfig chainedOpConfig = (StreamConfig)chainedConfigs.get(i);
OperatorChain.WatermarkGaugeExposingOutput> output = this.createChainedOperator(containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators, outputEdge.getOutputTag());
allOutputs.add(new Tuple2(output, outputEdge));
}List> selectors = operatorConfig.getOutputSelectors(userCodeClassloader);
if (selectors != null && !selectors.isEmpty()) {
return (OperatorChain.WatermarkGaugeExposingOutput)(containingTask.getExecutionConfig().isObjectReuseEnabled() ? new CopyingDirectedOutput(selectors, allOutputs) : new DirectedOutput(selectors, allOutputs));
} else if (allOutputs.size() == 1) {
return (OperatorChain.WatermarkGaugeExposingOutput)((Tuple2)allOutputs.get(0)).f0;
} else {
Output>[] asArray = new Output[allOutputs.size()];
for(i = 0;
i < allOutputs.size();
++i) {
asArray[i] = (Output)((Tuple2)allOutputs.get(i)).f0;
}return (OperatorChain.WatermarkGaugeExposingOutput)(containingTask.getExecutionConfig().isObjectReuseEnabled() ? new OperatorChain.CopyingBroadcastingOutputCollector(asArray, this) : new OperatorChain.BroadcastingOutputCollector(asArray, this));
}
}
OperatorChain小总结 小总结就是:
- 在JobGraph转换成ExecutionGraph会生成一个OperatorChain对象。
- OperatorChain的构造方法中:主要做了3件事情
- 调用createStreamOutput()创建对应的下游输出RecordWriterOutput:
- 调用createOutputCollector()将优化逻辑计划当中Chain中的StreamConfig(也就是数据)写入到第三步创建的RecordWriterOutput中
- 通过调用getChainedOutputs()输出结果RecordWriterOutput(包含于第四步)
![Flink-OperatorChain源码详解](https://img.it610.com/image/info8/e1527c24697d45dc88c4c904d7332117.jpg)
文章图片
推荐阅读
- 读Flink源码谈设计(图的抽象与分层)
- 一种基于Flink Window的实时指标统计方法
- Flink-使用checkpoint和savepoint进行快照恢复
- Flink-State/Checkpoint和Savepoint的详解
- Flink-sink的种类和基本使用
- Flink-dataStream的种类和基本使用
- flink|Flink的State与Rescale
- flink|flink on yarn启动流程分析
- flink|Flink heartbeat逻辑梳理