Flink-OperatorChain源码详解


Flink-OperatorChain源码详解

  • 前言
  • 逻辑计划中的算子链
  • StreamingJobGraphGenerator(优化逻辑)
    • createChain核心方法(创建chain)
    • isChainable方法(判断operator是否可以加入chain):
    • 划分chain的依据
    • flink中chain的3种链接策略:
  • 物理逻辑中的算子链(ExecutionGraph)
    • OperatorChain小总结

前言 参考博客:LittleMagics的深入分析Flink的operator chain(算子链)机制
我们来看下flink-web页面中的JobGraph图
这里说明几点:
  1. 每一个框框代表一个Task,一个Task在一个线程当中运行。
  2. 每个Task中可以包含一个或者多个subTask。
  3. 每个Task中的并行度代表着subTask的个数。
  4. 多个subTask连接在一起就形成了一个OperatorChain
Flink-OperatorChain源码详解
文章图片

学过spark的同学可能知道:
spark的RDD主要有两种:transformation与action
一个action可以把之前的trans划分为一个stage。
而flink的chain可以形似与spark的stage。
问题来了:flink如何把operator去放到一个chain里面呢?
flink-web页面中流程图也就是JobGraph,底层是通过StreamingJobGraphGenerator这个类去实现的。
逻辑计划中的算子链 flink的任务作业分为三层结构:
  1. StreamGraph——原始逻辑执行计划
  2. JobGraph——优化的逻辑执行计划(Flink-Web中能看到的图,也就执行一个个dataStream,如map等)
  3. ExecutionGraph——物理执行计划(执行了sink操作)
StreamingJobGraphGenerator(优化逻辑) 该类的主要作用是生成JobGraph,以下的createJobGraph为主要的方法。
private JobGraph createJobGraph() { this.jobGraph.setScheduleMode(this.streamGraph.getScheduleMode()); // 1.通过traverseStreamGraphAndGenerateHashes方法计算出每个节点的哈希码作为唯一标识 //然后把这个唯一标识存储到Map当中,这里返回的是Map分别代表节点Id以及节点哈希码 //源码中是hashResult.put(streamNode.getId(), StringUtils.hexStringToByte(userHash)); Map hashes = this.defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(this.streamGraph); // 2.扑朔图当中,一个节点的数据包含了节点Id以及他的哈希码,这个数据是保存在Map当中的 //再通过迭代器把所有的节点数据保存到List当中 List> legacyHashes = new ArrayList(this.legacyStreamGraphHashers.size()); Iterator var3 = this.legacyStreamGraphHashers.iterator(); while(var3.hasNext()) { StreamGraphHasher hasher = (StreamGraphHasher)var3.next(); legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(this.streamGraph)); } // 3.创建一个空Map集合,用于存储被chain在一起的subTask。 Map>> chainedOperatorHashes = new HashMap(); // 4.创建Chaining,开始把单个的节点组装成一个Chain了(核心) // 参数1:保存Id,哈希码的Map(第一步) // 参数2:保存了各个节点Map的List集合(第二步) // 参数3:空Map集合(第三步) this.setChaining(hashes, legacyHashes, chainedOperatorHashes); this.setPhysicalEdges(); this.setSlotSharingAndCoLocation(); this.configureCheckpointing(); JobGraphGenerator.addUserArtifactEntries(this.streamGraph.getUserArtifacts(), this.jobGraph); try { this.jobGraph.setExecutionConfig(this.streamGraph.getExecutionConfig()); } catch (IOException var5) { throw new IllegalConfigurationException("Could not serialize the ExecutionConfig.This indicates that non-serializable types (like custom serializers) were registered"); }return this.jobGraph; }

重点来看一下setChaining方法
this.setChaining(hashes, legacyHashes, chainedOperatorHashes);

源码:
private void setChaining(Map hashes, List> legacyHashes, Map>> chainedOperatorHashes) { Iterator var4 = this.streamGraph.getSourceIDs().iterator(); while(var4.hasNext()) { Integer sourceNodeId = (Integer)var4.next(); // createChain最重要,其他的不用看,也就是个遍历方法 this.createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes); }}

createChain核心方法(创建chain) 这里再看一下createChain的方法:
private List> createChain(Integer startNodeId, Integer currentNodeId, Map hashes, List> legacyHashes, int chainIndex, Map>> chainedOperatorHashes) { if (this.builtVertices.contains(startNodeId)) { return new ArrayList(); } else { // 最终的返回结果(可以方法的最后return),也是当前算子链在JobGraph中的出边列表 List> transitiveOutEdges = new ArrayList(); // 当前能够放到一个chain的节点列表 List> chainableOutputs = new ArrayList(); // 当前不能够放到一个chain的节点列表 List> nonChainableOutputs = new ArrayList(); StreamNode currentNode = this.streamGraph.getStreamNode(currentNodeId); Iterator var11 = currentNode.getOutEdges().iterator(); // 递归调用1:分离节点,分成两个组: //组1:chainableOutputs:可以组成chain的节点列表 //组2:nonChainableOutputs:不可以组成chain的节点列表 StreamEdge nonChainable; while(var11.hasNext()) { nonChainable = (StreamEdge)var11.next(); // 判断当前的节点是否能放到一个chain里面 if (isChainable(nonChainable, this.streamGraph)) { // 如果能放到chain里面就放入chainableOutputs,否则放入nonChainableOutputs chainableOutputs.add(nonChainable); } else { nonChainableOutputs.add(nonChainable); } }// 递归调用2:对于可以放到chain中的边,继续调用createChain方法,用于延伸算子链 var11 = chainableOutputs.iterator(); while(var11.hasNext()) { nonChainable = (StreamEdge)var11.next(); transitiveOutEdges.addAll(this.createChain(startNodeId, nonChainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes)); } // 递归调用3:对于不可以放到chain中的边,到这里chain就被划分成一块了 // 然后调用createChain创建一个新的算子链。 var11 = nonChainableOutputs.iterator(); while(var11.hasNext()) { nonChainable = (StreamEdge)var11.next(); transitiveOutEdges.add(nonChainable); this.createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes); }List> operatorHashes = (List)chainedOperatorHashes.computeIfAbsent(startNodeId, (k) -> { return new ArrayList(); }); byte[] primaryHashBytes = (byte[])hashes.get(currentNodeId); OperatorID currentOperatorId = new OperatorID(primaryHashBytes); Iterator var14 = legacyHashes.iterator(); while(var14.hasNext()) { Map legacyHash = (Map)var14.next(); operatorHashes.add(new Tuple2(primaryHashBytes, legacyHash.get(currentNodeId))); } // 这里就是一个chain的一些信息的加载 this.chainedNames.put(currentNodeId, this.createChainedName(currentNodeId, chainableOutputs)); this.chainedMinResources.put(currentNodeId, this.createChainedMinResources(currentNodeId, chainableOutputs)); this.chainedPreferredResources.put(currentNodeId, this.createChainedPreferredResources(currentNodeId, chainableOutputs)); // 判断当前节点是不是算子链的一个起始节点 if (currentNode.getInputFormat() != null) { this.getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat()); }if (currentNode.getOutputFormat() != null) { this.getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat()); } // 如果是起始节点,那么把数据写入到StreamConfig中 // createJobVertex方法用来创建一个JobGraph中的节点,也就是我们在Flink Web上看到的一个节点 StreamConfig config = currentNodeId.equals(startNodeId) ? this.createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes) : new StreamConfig(new Configuration()); this.setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs); if (currentNodeId.equals(startNodeId)) { config.setChainStart(); config.setChainIndex(0); config.setOperatorName(this.streamGraph.getStreamNode(currentNodeId).getOperatorName()); config.setOutEdgesInOrder(transitiveOutEdges); config.setOutEdges(this.streamGraph.getStreamNode(currentNodeId).getOutEdges()); Iterator var20 = transitiveOutEdges.iterator(); while(var20.hasNext()) { StreamEdge edge = (StreamEdge)var20.next(); this.connect(startNodeId, edge); }config.setTransitiveChainedTaskConfigs((Map)this.chainedConfigs.get(startNodeId)); } else { this.chainedConfigs.computeIfAbsent(startNodeId, (k) -> { return new HashMap(); }); config.setChainIndex(chainIndex); StreamNode node = this.streamGraph.getStreamNode(currentNodeId); config.setOperatorName(node.getOperatorName()); ((Map)this.chainedConfigs.get(startNodeId)).put(currentNodeId, config); }config.setOperatorID(currentOperatorId); if (chainableOutputs.isEmpty()) { config.setChainEnd(); }return transitiveOutEdges; } }

isChainable方法(判断operator是否可以加入chain): 用于判断当前算子是否能加入到一个chain里面
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) { StreamNode upStreamVertex = streamGraph.getSourceVertex(edge); StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); StreamOperatorFactory headOperator = upStreamVertex.getOperatorFactory(); StreamOperatorFactory outOperator = downStreamVertex.getOperatorFactory(); // 判断 return downStreamVertex.getInEdges().size() == 1 // 上下游算子不能为空 && outOperator != null && headOperator != null // 上下游算子实例处于同一个SlotSharingGroup中 && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) // 下游算子的链接策略必须是always && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS // 上游算子的链接策略是always或者head && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS) // 两个算子间的物理分区逻辑是ForwardPartitioner && edge.getPartitioner() instanceof ForwardPartitioner // 两个算子间的shuffle方式不等于批处理模式 && edge.getShuffleMode() != ShuffleMode.BATCH // 上下游算子实例的并行度相同 && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() // 没有禁用算子链 && streamGraph.isChainingEnabled(); }

划分chain的依据 总结下:上下游算子,也就是operator能放到一个chain的条件有:
  1. 上下游的并行度一致
  2. 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
  3. 上下游节点都在同一个 slot group 中(下面会解释 slot group)
  4. 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
  5. 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
  6. 两个节点间物理分区逻辑是 ForwardPartitioner
  7. 用户没有禁用 chain
  8. 前后算子不为空
第七点中提到了禁用chain,这里延伸一下相关方法:
// 指定一个operator不参与算子链 @PublicEvolving public SingleOutputStreamOperator disableChaining() { return setChainingStrategy(ChainingStrategy.NEVER); } // 手动start一个新的chain @PublicEvolving public SingleOutputStreamOperator startNewChain() { // 这里可以发现改变的是chain的一个策略。从头部开始,也就是重新创建一个chain return setChainingStrategy(ChainingStrategy.HEAD); }

flink中chain的3种链接策略: 因为我们平常是不会去更改这个链接策略的,而基本上flink-chain的链接策略默认为always,因此这里简单介绍下。
如果是always代表着什么?
这意味着,只要它们共享相同的插槽并与前向通道连接,就将跳过网络/本地通道,并将记录直接交给下一个转换。
@PublicEvolving public enum ChainingStrategy { ALWAYS, NEVER, HEAD; private ChainingStrategy() { } }

物理逻辑中的算子链(ExecutionGraph) 前两种StreamGraph、JobGraph是在客户端生成。
ExecutionGraph在jobMaster中生成,最后一种物理执行图是一种虚拟的图,不存在的数据结构,运行在每一个TaskExecutor中。在JobGraph转换成ExecutionGraph并交由TaskManager执行之后,会生成调度执行的基本任务单元——StreamTask,负责执行具体的StreamOperator逻辑。而在启动的过程中,会生成一个OperatorChain对象,他是算子链被具体执行时候的一个状态实例。
OperatorChain类:
@Internal public class OperatorChain> implements StreamStatusMaintainer { private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class); // 算子链中的所有算子,倒序排列,即headOperator位于该数组的末尾; private final StreamOperator[] allOperators; // 算子链的输出,可以有多个 private final RecordWriterOutput[] streamOutputs; // 算子链的入口点 private final OperatorChain.WatermarkGaugeExposingOutput> chainEntryPoint; // 算子链的第一个算子,对应JobGraph中的算子链起始节点; private final OP headOperator; private StreamStatus streamStatus; private InputSelection finishedInputs;

这里说明几点:
  1. 一个OperatorChain可以包括:HeadOpeartor(第一个算子)和StreamOpeartor(其他算子的统称)
  2. 如果一个算子无法进入OperatorChain,那么他会单独形成一个OperatorChain(但是只包含了HeadOpeartor)
这里放出OperatorChain构造方法中的代码核心部分
try { var20 = true; // 1.遍历整个算子链的所有输出边 //不断的调用createStreamOutput()方法创建对应的下游输出RecordWriterOutput for(int i = 0; i < outEdgesInOrder.size(); ++i) { StreamEdge outEdge = (StreamEdge)outEdgesInOrder.get(i); RecordWriterOutput streamOutput = this.createStreamOutput((RecordWriter)recordWriters.get(i), outEdge, (StreamConfig)chainedConfigs.get(outEdge.getSourceId()), containingTask.getEnvironment()); this.streamOutputs[i] = streamOutput; streamOutputMap.put(outEdge, streamOutput); } // 2.调用createOutputCollector创建物理的算子链,返回一个chainEntryPoint // 第一部分优化逻辑当中,有个StreamConfig,是用来存储数据的。 // 这个createOutputCollector方法主要做这么几件事:从StreamConfig中取出链边以及出边的一个数据(意思是取出所有的数据) // 根据取出的数据各自创建Output,将数据写入第一步生成的RecordWriterOutput List> allOps = new ArrayList(chainedConfigs.size()); this.chainEntryPoint = this.createOutputCollector(containingTask, configuration, chainedConfigs, userCodeClassloader, streamOutputMap, allOps); if (operatorFactory != null) { OperatorChain.WatermarkGaugeExposingOutput> output = this.getChainEntryPoint(); this.headOperator = operatorFactory.createStreamOperator(containingTask, configuration, output); this.headOperator.getMetricGroup().gauge("currentOutputWatermark", output.getWatermarkGauge()); } else { this.headOperator = null; }allOps.add(this.headOperator); this.allOperators = (StreamOperator[])allOps.toArray(new StreamOperator[allOps.size()]); success = true; var20 = false; } finally { if (var20) { if (!success) { RecordWriterOutput[] var15 = this.streamOutputs; int var16 = var15.length; for(int var17 = 0; var17 < var16; ++var17) { RecordWriterOutput output = var15[var17]; if (output != null) { output.close(); } } }} }

而createOutputCollector方法里面又有一个很重要的方法:getChainedOutputs()
主要用于输出结果,通过不断延伸Output来产生chainedOperator,并逆序返回。
private OperatorChain.WatermarkGaugeExposingOutput> createOutputCollector(StreamTask containingTask, StreamConfig operatorConfig, Map chainedConfigs, ClassLoader userCodeClassloader, Map, RecordWriterOutput> streamOutputs, List> allOperators) { List>, StreamEdge>> allOutputs = new ArrayList(4); Iterator var8 = operatorConfig.getNonChainedOutputs(userCodeClassloader).iterator(); StreamEdge outputEdge; while(var8.hasNext()) { outputEdge = (StreamEdge)var8.next(); RecordWriterOutput output = (RecordWriterOutput)streamOutputs.get(outputEdge); allOutputs.add(new Tuple2(output, outputEdge)); }var8 = operatorConfig.getChainedOutputs(userCodeClassloader).iterator(); int i; while(var8.hasNext()) { outputEdge = (StreamEdge)var8.next(); i = outputEdge.getTargetId(); StreamConfig chainedOpConfig = (StreamConfig)chainedConfigs.get(i); OperatorChain.WatermarkGaugeExposingOutput> output = this.createChainedOperator(containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators, outputEdge.getOutputTag()); allOutputs.add(new Tuple2(output, outputEdge)); }List> selectors = operatorConfig.getOutputSelectors(userCodeClassloader); if (selectors != null && !selectors.isEmpty()) { return (OperatorChain.WatermarkGaugeExposingOutput)(containingTask.getExecutionConfig().isObjectReuseEnabled() ? new CopyingDirectedOutput(selectors, allOutputs) : new DirectedOutput(selectors, allOutputs)); } else if (allOutputs.size() == 1) { return (OperatorChain.WatermarkGaugeExposingOutput)((Tuple2)allOutputs.get(0)).f0; } else { Output>[] asArray = new Output[allOutputs.size()]; for(i = 0; i < allOutputs.size(); ++i) { asArray[i] = (Output)((Tuple2)allOutputs.get(i)).f0; }return (OperatorChain.WatermarkGaugeExposingOutput)(containingTask.getExecutionConfig().isObjectReuseEnabled() ? new OperatorChain.CopyingBroadcastingOutputCollector(asArray, this) : new OperatorChain.BroadcastingOutputCollector(asArray, this)); } }

OperatorChain小总结 小总结就是:
  1. 在JobGraph转换成ExecutionGraph会生成一个OperatorChain对象。
  2. OperatorChain的构造方法中:主要做了3件事情
  3. 调用createStreamOutput()创建对应的下游输出RecordWriterOutput:
  4. 调用createOutputCollector()将优化逻辑计划当中Chain中的StreamConfig(也就是数据)写入到第三步创建的RecordWriterOutput中
  5. 通过调用getChainedOutputs()输出结果RecordWriterOutput(包含于第四步)
【Flink-OperatorChain源码详解】最终形成以下的结果:
Flink-OperatorChain源码详解
文章图片

    推荐阅读