Shuffle
MapReduce确保每个Reducer的输入都是按键排序的。
系统执行排序的过程,即,将map输出作为输入传给Reducer的过程称为Shuffle。
了解Shuffle的过程,有助于我们理解MapReduce的工作机制。
文章图片
Map端
map函数开始产生输出时,并不是简单的将它写到磁盘,而是利用缓冲的方式写到内存,并出于效率考虑,进行排序。
【MapReduce的Shuffle和排序】1)每个输入分片由一个Map任务处理。(HDFS一个块的大小默认为128M,可以设置块的大小)
2)map输出的结果会暂存在一个环形内存缓冲区中。(缓冲区默认大小为100M,由io.sort.mb属性控制)
3)当缓冲区快要溢出时(默认为缓冲区大小的80%,由io.sort.spill.percent属性控制),由一个后台线程将该缓冲区中的数据写到磁盘新建的溢出文件中。在溢出写到磁盘的过程中,map输出继续写到缓冲区,但是如果在此期间缓冲区被填满,map会被阻塞直到写磁盘过程完成。
4)在写入磁盘之前,线程首先根据Reduce任务的数目将数据划分为相同数目的分区,也就是一个Reduce任务对应一个分区的数据,这样避免Reduce任务分配到的数据不均匀。(分区就是对数据进行Hash的过程);
5)然后对每个分区中的数据进行排序(第一次排序);
6)如果此时设置了Combiner,将排序后的结果进行Combia操作,使得Map输出结果更紧凑,使得让尽可能少的数据写入到磁盘和传递给Reducer;
7)当Map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并,合并的过程中会不断地进行排序和Combia操作。(属性io.sort.factor控制一次最多合并多少流,默认10)。这样做的目的1,尽量减少每次写入磁盘的数据量,目的2,尽量减少下一复制阶段网络传输的数据量。最后合并成一个已分区且已排序的文件(第二次排序)。
8)为了减少网络传输数据量,节约磁盘空间,可以在这里将数据压缩。(mapred.compress.map.out设置为ture,mapred.map.output.compression.codec指定使用的压缩库)
9)将分区中的数据拷贝给相对应的Reduce任务。Reducer通过HTTP方式得到输出文件的分区。
Reduce端
1)Reduce会接收到不同Map任务传来的数据,并且每个Map传来的数据都是有序的。
2)如果Reduce端接收的数据量少,则直接存在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制);如果数据量超过了缓冲区大小的一定比例(由mapred.job.shuffle.merge.percent决定)则对数据合并后溢写到磁盘中。
3)随着溢写文件的增多,后台线程会将这些文件合并成一个更大的有序的文件,这样做是为了给后面的合并节省时间;
4)复制完所有Map输出后,Reduce任务进入排序阶段,这个阶段将合并Map输出,维持其顺序排序(第三次排序),这是循环进行的。例如,有50个Map输出,而合并因子默认是10,合并会进行5次,每次将10个文件合并成一个文件,过程中产生5个中间文件。
5)合并的过程中会产生许多的中间文件写入磁盘,但MapReduce会让写入磁盘的数据尽可能少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到Reduce函数。
6)在Reduce阶段,对已排序输出中的每个键调用Reduce函数,此阶段的输出直接写入到输出文件系统HDFS。
Shuffle过程配置调优
Sort排序
作业的进度组成
推荐阅读
- 大数据|hadoop安装
- 大数据|hbase安装
- Gank Spark
- hadoop|Import/Export实现hbase集群间迁移
- 解决(Some projects cannot be imported because they already exist in the workspace)
- centos中修改时区及时间的方法
- 利用Hadoop平台进行大规模(百万以上)中文网页聚类
- 大数据|HBase导出CSV格式数据的方法
- Hadoop 技术生态体系
- spark|Spark,SparkSql wordCount,java wordcount