Spark - TaskSet提交

stage切分后,就会创建一个Job,所以在一个Application中,如果有多个action算子,那就有多少个Job,每个Job根据shuffle依赖,切分成多个stage。最后生成的这个job会把引用给finalStage,也就是上篇的stage4里。Job生成后,就开始提交stage,我们以stage切分的流程往下讲。主要是先找到未提交的stage,然后根据stage的分区数生成对应的task个数,封装到TaskSet进行提交。
Spark - TaskSet提交
文章图片

这个流程和切分有点类似,都是从右边往左边寻找。
stage4 首先拿到的是最右边的stage4,会把stage4.rddz(这里就是RDD11)压入waitingForVisit栈中,这个waitingForVisit栈就是存放窄依赖的,然后通过窄依赖的依赖去查找shuffle依赖。此外还有两个数据结构,missing集合,用来存放未提交的stage,visited集合,用来存放已经遍历过的RDD。
Spark - TaskSet提交
文章图片

此时waitingForVisit栈中已经有RDD,就会把RDD[11]拿出来,RDD[11]是没有遍历过的,所以会放入visited集合,另外RDD[11]的依赖RDD[10]是窄依赖,所以RDD[10]就会压入waitingForVisit栈中。
Spark - TaskSet提交
文章图片

waitingForVisit栈发现还有RDD,就会把RDD[10]拿出来,RDD[10]是没有遍历过的,所以会放入visited集合,另外RDD[10]的依赖RDD[9]是窄依赖,所以RDD[9]就会压入waitingForVisit栈中。
Spark - TaskSet提交
文章图片

waitingForVisit栈发现还有RDD,就会把RDD[9]拿出来,RDD[9]是没有遍历过的,所以会放入visited集合,另外RDD[9]的依赖RDD[4]、RDD[8]是shuffle依赖,所以stage3、state0会放入missing集合。
Spark - TaskSet提交
文章图片

waitingForVisit栈已经为空了,missing集合不为空,所以继续从stage3、state0继续上面stage4的操作。
stage0 stage0的rdd,即RDD8压入waitingForVisit栈中。
Spark - TaskSet提交
文章图片

此时waitingForVisit栈中已经有RDD,就会把RDD[8]拿出来,RDD[8]是没有遍历过的,所以会放入visited集合,另外RDD[8]的依赖RDD[6]、RDD[7]是窄依赖,所以RDD[6]、RDD[7]就会压入waitingForVisit栈中。
Spark - TaskSet提交
文章图片

waitingForVisit栈发现还有RDD,就会把RDD[7]拿出来,RDD[7]是没有遍历过的,所以会放入visited集合,另外RDD[7]没有任何依赖,于是不做处理。
Spark - TaskSet提交
文章图片

waitingForVisit栈发现还有RDD,就会把RDD[6]拿出来,RDD[6]是没有遍历过的,所以会放入visited集合,另外RDD[6]的依赖RDD[5]是窄依赖,所以RDD[5]就会压入waitingForVisit栈中。
Spark - TaskSet提交
文章图片

waitingForVisit栈发现还有RDD,就会把RDD[5]拿出来,RDD[5]是没有遍历过的,所以会放入visited集合,另外RDD[5]没有任何依赖,于是不做处理。
Spark - TaskSet提交
文章图片

waitingForVisit栈已经为空了,这次RDD[8]并没有任何shuffle依赖,于是开始创建TaskSet。
stage0的分区数是4,所以会把数据存入4个Task,stage0是ShuffleMapStage,所以这个Task就是ShuffleMapTask,最后把Task集合封装在TaskSet中,交给TaskSchedulerImpl处理。
stage3 【Spark - TaskSet提交】stage0处理完后,就开始处理stage3,把RDD4压入waitingForVisit栈中。
Spark - TaskSet提交
文章图片

此时waitingForVisit栈中已经有RDD,就会把RDD[4]拿出来,RDD[4]是没有遍历过的,所以会放入visited集合,另外RDD[8]的依赖RDD[3]是窄依赖,所以RDD[3]就会压入waitingForVisit栈中。
Spark - TaskSet提交
文章图片

waitingForVisit栈发现还有RDD,就会把RDD[3]拿出来,RDD[3]是没有遍历过的,所以会放入visited集合,另外RDD[3]的依赖RDD[2]是窄依赖,所以RDD[2]就会压入waitingForVisit栈中。
Spark - TaskSet提交
文章图片

waitingForVisit栈发现还有RDD,就会把RDD[2]拿出来,RDD[2]是没有遍历过的,所以会放入visited集合,另外RDD[2]的依赖RDD[1]、RDD[0]是shuffle依赖,所以stage1、state2会放入missing集合。
Spark - TaskSet提交
文章图片

waitingForVisit栈已经为空了,并且stage1、state2并没有任何shuffle依赖,于是开始创建TaskSet。
stage1、state2的分区数是2,所以会把数据存入2个Task,stage1、state2是ShuffleMapStage,所以这两个stage的Task都是ShuffleMapTask,最后把Task集合封装在TaskSet中,交给TaskSchedulerImpl处理。
此时最左边的三个stage都已经封装好TaskSet,TaskSet的提交任务结束。

    推荐阅读