Spark | DAGScheduler: Splitting a Submitted Job into Stages

Overall flow diagram
[figure: overall flow of Stage creation for a submitted Job]

Source code analysis (Spark 2.3)
getOrCreateParentStages: create all ancestor Stages

```scala
/**
 * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
 * the provided firstJobId.
 */
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  // getShuffleDependencies returns the RDD's first-level direct shuffle dependencies
  getShuffleDependencies(rdd).map { shuffleDep =>
    // getOrCreateShuffleMapStage creates the stage for this dependency,
    // plus any missing ancestor Stages of the RDD
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
```

getShuffleDependencies: get the RDD's first-level direct shuffle dependencies
```scala
/**
 * Returns shuffle dependencies that are immediate parents of the given RDD.
 *
 * This function will not return more distant ancestors.  For example, if C has a shuffle
 * dependency on B which has a shuffle dependency on A:
 *
 * A <-- B <-- C
 *
 * calling this function with rdd C will only return the B <-- C dependency.
 *
 * This function is scheduler-visible for the purpose of unit testing.
 */
private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new ArrayStack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        // collect every first-level (direct) shuffle dependency
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  parents
}
```
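The `A <-- B <-- C` case from the scaladoc is easy to reproduce from the driver side. `getShuffleDependencies` is `private[scheduler]`, but `rdd.dependencies` exposes exactly the first level it walks. The sketch below is illustrative only (the local master, app name and the RDD names `a`/`b`/`c` are made up for the demo): `c.dependencies` contains the `B <-- C` `ShuffleDependency`, while the `A <-- B` shuffle never shows up on `c`.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}

object FirstLevelShuffleDeps {
  def main(args: Array[String]): Unit = {
    // Minimal local setup; names mirror the A <-- B <-- C example from the scaladoc.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("deps-demo"))

    val a = sc.parallelize(1 to 100).map(i => (i % 10, i))
    val b = a.reduceByKey(_ + _)                                    // shuffle A <-- B
    val c = b.map { case (k, v) => (v % 3, k) }.reduceByKey(_ + _)  // shuffle B <-- C

    // c.dependencies is the first level that getShuffleDependencies walks:
    // the B <-- C ShuffleDependency is visible here, the A <-- B one is not.
    c.dependencies.foreach {
      case dep: ShuffleDependency[_, _, _] => println(s"shuffle dep, shuffleId=${dep.shuffleId}")
      case dep                             => println(s"narrow dep on ${dep.rdd}")
    }

    sc.stop()
  }
}
```

This is why getOrCreateParentStages only has to deal with the first level: anything further up the lineage is handled when the ancestor ShuffleMapStages are created.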

getOrCreateShuffleMapStage: create all ancestor Stages for the RDD
```scala
/**
 * Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
 * shuffle map stage doesn't already exist, this method will create the shuffle map stage in
 * addition to any missing ancestor shuffle map stages.
 */
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage

    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      // Depth-first traversal collects all ancestor shuffle dependencies; they are
      // processed in ancestor -> descendant order.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          // create the ShuffleMapStage for this ancestor shuffle dependency
          createShuffleMapStage(dep, firstJobId)
        }
      }

      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}
```
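The `shuffleIdToMapStage` lookup is also what makes a `ShuffleMapStage` shared across jobs that reuse the same shuffle. A minimal way to see this, assuming an existing `SparkContext` named `sc` (e.g. started by `spark-shell`):

```scala
// A minimal sketch, assuming an existing SparkContext `sc`.
val words  = sc.parallelize(Seq("a", "b", "a", "c", "b", "a")).map(w => (w, 1))
val counts = words.reduceByKey(_ + _)   // introduces one ShuffleDependency

counts.count()  // job 1: creates and runs the ShuffleMapStage and the ResultStage
counts.count()  // job 2: the same shuffleId is found in shuffleIdToMapStage, so the
                // existing map stage is reused (listed as "skipped" in the Spark UI)
```

The second `count()` hits the `Some(stage)` branch for the already-registered shuffleId, and because the map output is still available the stage is not re-run.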

getMissingAncestorShuffleDependencies: depth-first traversal to collect all ancestor shuffle dependencies
```scala
/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
private def getMissingAncestorShuffleDependencies(
    rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
  val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new ArrayStack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      getShuffleDependencies(toVisit).foreach { shuffleDep =>
        if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
          // descendant shuffle dependencies are pushed first, so ancestors end up on top
          ancestors.push(shuffleDep)
          waitingForVisit.push(shuffleDep.rdd)
        }
        // Otherwise, the dependency and its ancestors have already been registered.
      }
    }
  }
  // return the stack of ancestor shuffle dependencies
  ancestors
}
```
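To make the resulting order concrete, here is a toy, Spark-free sketch of the same traversal (the `Node` case class, the `registered` set and all names are invented for illustration and are not Spark types): descendant shuffle edges are pushed first, so iterating the returned stack yields the oldest ancestors first, which is exactly the order in which `getOrCreateShuffleMapStage` then creates the missing stages.

```scala
import scala.collection.mutable

// Toy model (invented for illustration): a node depends on each parent either
// through a shuffle (true) or through a narrow dependency (false).
final case class Node(name: String, parents: List[(Node, Boolean)] = Nil)

// Collect all not-yet-registered ancestor shuffle edges, descendants pushed first.
def missingAncestorShuffles(start: Node, registered: Set[String]): List[(Node, Node)] = {
  val ancestors = mutable.Stack[(Node, Node)]()   // (child, parent) shuffle edges
  val visited   = mutable.Set[Node]()
  val waiting   = mutable.Stack[Node](start)
  while (waiting.nonEmpty) {
    val node = waiting.pop()
    if (visited.add(node)) {
      node.parents.foreach {
        case (parent, true) if !registered.contains(parent.name) =>
          ancestors.push((node, parent))   // descendant edge goes in first...
          waiting.push(parent)             // ...then keep walking toward the ancestors
        case (parent, false) =>
          waiting.push(parent)
        case _ => // already registered: its ancestors were handled earlier
      }
    }
  }
  // iterating the stack yields ancestor edges before descendant edges
  ancestors.toList
}

// A <-(shuffle)- B <-(shuffle)- C: iterating prints the A <- B edge before the B <- C edge.
val a = Node("A")
val b = Node("B", List((a, true)))
val c = Node("C", List((b, true)))
missingAncestorShuffles(c, Set.empty).foreach { case (child, parent) =>
  println(s"create stage for shuffle ${parent.name} <- ${child.name}")
}
```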

Example
Original RDD dependency graph
[figure]

getShuffleDependencies
[figure]

getMissingAncestorShuffleDependencies
[figure]

Final Stage-splitting result
[figure]
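For any concrete job, the resulting stage boundaries can also be checked without reading scheduler internals: `RDD.toDebugString` prints the lineage, and each shuffle dependency starts a new indented block, which is exactly where the DAGScheduler begins a new Stage. A minimal sketch, assuming an existing `SparkContext` named `sc` (the data and field meanings are made up):

```scala
// A minimal sketch, assuming an existing SparkContext `sc` (e.g. started by spark-shell).
val clicks = sc.parallelize(Seq(("u1", 1), ("u2", 1), ("u1", 1)))
val users  = sc.parallelize(Seq(("u1", "DE"), ("u2", "FR")))

val perCountry = clicks
  .reduceByKey(_ + _)                               // shuffle: clicks per user
  .join(users)                                      // shuffle: attach each user's country
  .map { case (_, (n, country)) => (country, n) }
  .reduceByKey(_ + _)                               // shuffle: clicks per country

// Lines prefixed with "+-" start a new indented block at a shuffle dependency,
// i.e. exactly where a new Stage is cut.
println(perCountry.toDebugString)
```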
