Spark - Task的执行过程(三)- SortShuffleWriter

前面已经介绍了BypassMergeSortShuffleWriter和UnsafeShuffleWriter两种ShuffleWriter实现,这里开始SortShuffleWriter的讲解。
SortShuffleWriter使用ExternalSorter作为排序器,ExternalSorter又包含了具有聚合功能的PartitionedAppendOnlyMap和没有聚合功能的PartitionedPairBuffer这两种缓存,所以SortShuffleWriter既有支持排序的功能,也支持聚合的功能。
聚合 聚合是通过PartitionedAppendOnlyMap来处理的,所以记录会迭代输入给PartitionedAppendOnlyMap,下面聚合的例子均为reduce为算法。
Spark - Task的执行过程(三)- SortShuffleWriter
文章图片

PartitionedAppendOnlyMap支持key是null值的,haveNullValue是用来判断是否已经有了key为null的值,nullValue是用来存储key为null的值。所以没有传入key为null的时候,haveNullValue为false,nullValue为null。
Spark - Task的执行过程(三)- SortShuffleWriter
文章图片

此时records迭代的记录为(null,1),由于haveNullValue为false,直接赋值nullValue为1,并把haveNullValue就改为true。
Spark - Task的执行过程(三)- SortShuffleWriter
文章图片

records新迭代的记录为(null,10),由于haveNullValue为true,则把nullValue的值1拿出来,并跟新值10进行相加,得到的11重新赋值给nullValue。
Spark - Task的执行过程(三)- SortShuffleWriter
文章图片

如果key不为null,PartitionedAppendOnlyMap是有一个数组data来存储的,格式就是key0,value0,key1,value1的形式。key的格式是(partitionId, key)。
Spark - Task的执行过程(三)- SortShuffleWriter
文章图片

records新迭代的记录为(a,1),我们假设a对应的分区为0,且a在hash及与mask进行与运算后,得到的pos为1,所以数组的2*1=2的位置就是key,由于此时key是空的,key的值为(0,a),2+1的位置,就是(0,a)对应的值1。
2*pos的2,是因为每个值都占有2个位置。
Spark - Task的执行过程(三)- SortShuffleWriter
文章图片

records新迭代的记录为(a,10),此时这个key对应的位置是有值的,所以把旧值1拿出来,和新值10进行相加,得到的11存入key后面的位置。
Spark - Task的执行过程(三)- SortShuffleWriter
文章图片

records新迭代的记录为(b,2),我们假设b对应的分区为1,且b在hash及与mask进行与运算后,得到的pos为2,所以数组的2*2=4的位置就是key,由于此时key是空的,key的值为(1,b),4+1的位置,就是(1,b)对应的值2。每次往空的key插入数据的时候,都会检验是否扩容。
Spark - Task的执行过程(三)- SortShuffleWriter
文章图片

records新迭代的记录为(b,10),此时这个key对应的位置是有值的,所以把旧值2拿出来,和新值10进行相加,得到的12存入key后面的位置。
Spark - Task的执行过程(三)- SortShuffleWriter
文章图片

records新迭代的记录为(c,3),我们假设c对应的分区为0,且b在hash及与mask进行与运算后,得到的pos为1,所以数组的2*1=2的位置就是key,由于此时key不为空,且key是(0,a),并不是(0,c),所以pos会进行加1并且与mask进行与运算后,重新获取pos,如果拿到的pos位置还是有其他的key,则pos再加1重新计算,直至pos位置并无其他key或者key为(0,c)。我们假设最后的位置是6。
Spark - Task的执行过程(三)- SortShuffleWriter
文章图片

每次迭代records后,会看当前的内存是否超过了内存阈值,如果超过了,就会根据公式2 * currentMemory - myMemoryThreshold尝试获取内存,如果获取内存失败,说明已经没有多余的内存可以分配了,这个时候就会进行溢出。
溢出之前,先对data中的数据向前整理排列,就是往左边的空值进行迁移,按照分区ID的顺序进行重新排序。此时顺序就是(0,a),(0,c),(1,b)。
Spark - Task的执行过程(三)- SortShuffleWriter
文章图片

然后再根据整理后的data以及haveNullValue、nullValue创建迭代器。这个迭代器会先访问nullValue,然后再迭代data。
Spark - Task的执行过程(三)- SortShuffleWriter
文章图片

每写入1万次,就会有一个flush操作,把输出流中的数据真正写入到磁盘,并记录每个分区的元素个数以及每次写入磁盘的数据大小。
当迭代器迭代结束,就会释放占用的内存,并重新创建一个PartitionedAppendOnlyMap,每次溢出都会记录在spills数组中。
Spark - Task的执行过程(三)- SortShuffleWriter
文章图片

当records的数据迭代完毕,就会根据内存的数据和溢出到磁盘的文件进行归并排序,最终合并到一个文件中,并记录每个分区的长度。最后根据分区的长度,生成索引文件。
Spark - Task的执行过程(三)- SortShuffleWriter
文章图片

非聚合 【Spark - Task的执行过程(三)- SortShuffleWriter】聚合是通过PartitionedPairBuffer来处理的,所以记录会迭代输入给PartitionedPairBuffer,下面聚合的例子均为reduce为算法。
PartitionedPairBuffer里的结构就类似于一个集合,数据按照顺序进行插入,插入的时候也是每次插入2个,一个是kye一个是value,这个和PartitionedAppendOnlyMap一样。
Spark - Task的执行过程(三)- SortShuffleWriter
文章图片

records新迭代的记录为(a,1),我们假设a对应的分区为0,所以直接在数组里插入(0,a)和1。
Spark - Task的执行过程(三)- SortShuffleWriter
文章图片

records新迭代的记录为(b,2),我们假设b对应的分区为1,所以直接在数组里插入(1,b)和2。
Spark - Task的执行过程(三)- SortShuffleWriter
文章图片

records新迭代的记录为(c,3),我们假设c对应的分区为0,所以直接在数组里插入(0,c)和3。
Spark - Task的执行过程(三)- SortShuffleWriter
文章图片

records新迭代的记录为(a,4),我们假设a对应的分区为0,所以直接在数组里插入(0,a)和4,这里并没有聚合。
Spark - Task的执行过程(三)- SortShuffleWriter
文章图片

溢出、写文件、写索引后面的流程同聚合。

    推荐阅读