Spark - Task执行结果的处理
前面已经讲了每个action算子,都会进行stage切分,然后把每个stage根据分区创建TaskSet,根据资源运行Task,那这个stage就运行结束了。
那stage怎么知道自己各个分区已经运行结束了,stage与stage直接数据是怎么传递的,每个具有依赖的stage是怎么执行的,是本章的重点。
这里用之前的例子继续讲解。
文章图片
MapOutputTrackerMaster
【Spark - Task执行结果的处理】MapOutputTracker用于跟踪map任务的输出状态,在Drvier中的MapOutputTracker叫做MapOutputTrackerMaster,在Worker中的MapOutputTracker叫做MapOutputTrackerWorker。
在stage进行切分的时候,每个ShuffleMapStage都会把他的shuffleId以及自己的分区数量,注册到MapOutputTrackerMaster。MapOutputTrackerMaster中维护着一个map,这个map的key就是shuffleId,他的value是ShuffleStatus。所以我们之前的例子,就会有0到3的shuffleId注册到MapOutputTrackerMaster的map中。
文章图片
每个ShuffleStatus根据分区数,都会生成同等长度的类型为MapStatus数组mapStatuses,MapStatus表示ShuffleMapTask返回给TaskScheduler的执行结果。由于现在还没运行,所以数组mapStatuses是空的。这里只画了ShuffleStatus0,其他的ShuffleStatus也是一样的结构。
文章图片
waitingStages
这里补充一下TaskSet提交关于waitingStages的部分,因为waitingStages的作用是在这个篇幅中。
stage切分结束后,开始提交stage。此时stage4发现他有父类stage0和stage3,于是就先提交父类。
文章图片
stage0并没有其他父类,于是他就提交给TaskSchedulerImpl。
文章图片
stage3有两个父类stage1和stage2,于是就先提交父类。
文章图片
stage1并没有其他父类,于是他就提交给TaskSchedulerImpl,这个时候,stage0和stage1都已经提交了。
文章图片
stage2并没有其他父类,于是他就提交给TaskSchedulerImpl,这个时候,stage0和stage1、stage2都已经提交了。
文章图片
stage3的两个父类stage1和stage2都处理了,于是他就加入到加入到waitingStages。
文章图片
stage4的两个父类stage0和stage3都处理了,于是他就加入到加入到waitingStages。waitingStages里的stage并没有实际提交,所以此时是stage0和stage1、stage2开始运行,stage3、stage4等待运行。
文章图片
执行流程
Executor执行完task后,通过DriverEndpointRfe把发消息给Drvier的DriverEndpointRef,告知当前的task运行结束。
文章图片
Drvier收到消息后,发现状态是FINISHED,根据shuffleId和分区,更新MapOutputTrackerMaster的shuffleStatuses里的mapStatuses数组。我们假设是分区1完成了,此时mapStatuses[1]就有结果数据了,然后_numAvailableOutputs就加1。
文章图片
如果_numAvailableOutputs和分区数量numPartitions相等,即都等于4,说明这个stage就完成了。如果执行失败,就会把还没有完成的分区重新提交。
文章图片
当这个stage0成功后,发现waitingStages中有自己的子stage4,就把stage4从waitingStages拿出来,开始尝试提交,结果他发现stage4另外一个父stage3还没执行,于是就继续加入到waitingStages。
继续上面的流程,当stage1和stage2都执行完后,就会从waitingStages中拿出stage3继续执行,当stage3执行完后,stage4的两个父类都执行完了,所以stage4此时才开始执行。
从这里可以看出,具有依赖关系的会等依赖的先执行,是串行的,如果没有依赖关系的,会并发执行,这个是并行的。
MapOutputTrackerWorker
上面的stage3依赖着stage1和stage2,那stage3开始执行的时候,是需要stage1和stage2的执行结果的。
Executor的MapOutputTrackerWorker维护着map,key是shuffleId,value是MapStatus数组,对应着上面的mapStatuses。
文章图片
假设stage3拿stage1的时候,发现他并没有存储stage1的数据,于是就持有MapOutputTrackerMasterEndpoint引用的trackerEndpoint给MapOutputTrackerMasterEndpoint发送GetMapOutputStatuses消息。
文章图片
MapOutputTrackerMasterEndpoint收到消息后,就把消息发送给MapOutputTrackerMaster的阻塞队列mapOutputRequests。MapOutputTrackerMaster里有一个线程池,维护着线程来处理这个阻塞队列。当线程发送阻塞队列有消息后,就会开始处理这个消息,并把结果返回给请求方。
文章图片
Executor拿到数据后,就开始对数据进行计算。
推荐阅读
- CVE-2020-16898|CVE-2020-16898 TCP/IP远程代码执行漏洞
- Spark|Spark 数据倾斜及其解决方案
- 字符串拼接成段落,换行符(\n)如何只执行n-1次
- R语言|R语言 函数
- AsyncTask的个人小结
- 高效执行力第六课-小结
- 成功通航(用宜搭提升数字化管理效能,确保每次飞行任务安全执行)
- @逆战千锋|@逆战千锋 为什么sql语句执行之后表单中没有数据
- Spring|Spring Aop常见注解与执行顺序详解
- c#中task与thread区别及其使用的方法示例