Spark - Task的执行过程(一)
前面讲了Task的创建、提交,以及对Task进行资源调度与分配,对于Task的实现细节一笔带过,所以这篇开始讲解Task的执行过程。Task又分为ShuffleMapTask和ResultTask,我们分开来讲。
ShuffleMapTask
【Spark - Task的执行过程(一)】ShuffleMapTask进行写入的时候,有三种方式,分别是UnsafeShuffleWriter、BypassMergeSortShuffleWriter、SortShuffleWriter。而最终使用哪个ShuffleWriter是取决于RDD依赖的ShuffleHandle。
如果不需要map端进行合并且分区数小于等于200,则ShuffleHandle为BypassMergeSortShuffleHandle,最终的ShuffleWriter是BypassMergeSortShuffleWriter。
如果不需要map端进行合并且分区数<16777216、Serializer支持relocation则ShuffleHandle为SerializedShuffleHandle,最终的ShuffleWriter是UnsafeShuffleWriter。
其他情况下,ShuffleHandle为BaseShuffleHandle,最终的ShuffleWriter是SortShuffleWriter。
文章图片
下面就先介绍BypassMergeSortShuffleWriter的写入过程,其他两个留后面讲。
BypassMergeSortShuffleWriter
我们假设有2个分区,这样最终就有2个Task,假设每个Task发给一个Executor,下面看看Executor是怎么处理每个Task中RDD计算的数据。
文章图片
首先Executor会根据分区的数量,创建同等数量的DiskBlockObjectWriter。由于我们的分区数是2,所以DiskBlockObjectWriter是个数也是2,每一个DiskBlockObjectWriter处理一个分区的数据,最后把map任务的输出写入磁盘。每个DiskBlockObjectWriter都维持着BlockId以及他的文件File。
文章图片
DiskBlockObjectWriter准备好后,开始迭代RDD计算的结果records,假设第一条是(a,1),使用分区计算器算出a这个key的分区,比如左边的分区0,然后让左边的DiskBlockObjectWriter把(a,1)写入到Shuffle文件的输出流中。同理,(b,1),(c,1),(d,1)也假设都在左边,(e,1),(f,1),(g,1),(h,1)根据key的分区计算,都在右边。
文章图片
records结果都处理完后,DiskBlockObjectWriter就会把输出流中的数据写入到磁盘,文件的信息、索引的位置,都交给FileSegment来处理。
文章图片
此时已经生成了两个临时的Shuffle文件,为了减少网络的IO,这些临时文件需要进行合并。所以会把这两个临时的Shuffle文件内容,写入到Shuffle数据文件,并记录每个文件的长度(比如4,4)。
文章图片
写入到Shuffle数据文件后,临时文件就没有用了,就把他们都删除。
文章图片
此时还要一个索引文件,因为我们把两个文件的内容合并了,需要知道原先每个文件里有哪些东西,上面我们不是保存了文件的长度(4,4),所以根据这个得到了索引的值(0,4,8)。这样我们从文件里拿到对应的数据,就可以通过索引了。
文章图片
ResultTask
如果一个RDD的数据,最终发到3个Executor,每个Executor都只处理一个Task,那最后会有3个dataFile文件。
文章图片
最后执行ResultTask的时候,就会从这些dataFile文件拿出数据,调用函数进行最终的处理,得到最后的结果。
文章图片
推荐阅读
- 热闹中的孤独
- JAVA(抽象类与接口的区别&重载与重写&内存泄漏)
- 放屁有这三个特征的,请注意啦!这说明你的身体毒素太多
- 一个人的旅行,三亚
- 布丽吉特,人生绝对的赢家
- 慢慢的美丽
- 尽力
- 一个小故事,我的思考。
- 家乡的那条小河
- 《真与假的困惑》???|《真与假的困惑》??? ——致良知是一种伟大的力量