MapReduce (基于 FileInputFormat 的 mapper 数量控制)

非淡泊无以明志,非宁静无以致远。这篇文章主要讲述MapReduce :基于 FileInputFormat 的 mapper 数量控制相关的知识,希望能为你提供帮助。
【MapReduce (基于 FileInputFormat 的 mapper 数量控制)】本篇分两部分,第一部分分析使用 java 提交 mapreduce 任务时对 mapper 数量的控制,第二部分分析使用 streaming 形式提交 mapreduce 任务时对 mapper 数量的控制。
 
环境:hadoop-3.0.2
前言:熟悉 hadoop mapreduce 的人可能已经知道,即使在程序里对 conf 显式地设置了 mapred.map.tasks 或  mapreduce.job.maps,程序也并没有运行期望数量的 mapper。
这是因为,mapper 的数量由输入的大小、HDFS 当前设置的 BlockSize、以及当前配置中的 split min size 和 split max size 等参数共同确定,并不会受到简单的人工设置 mapper num 的影响。
因此,对于 mapper num 的控制,需要我们理解 hadoop 中对于 FileInputFormat 类中 getSplit() 方法的实现,针对性地配置  BlockSize、split min size、split max size 等参数,才能达到目的。
重点:值得一提并且容易忽略的是,要区分 org.apache.hadoop.mapred.FileInputFormat类和 org.apache.hadoop.mapreduce.lib.input.FileInputFormat类,两者虽然相似,但在getSplit()上的实现是有区别的。
重要区别是,hadoop streaming 中使用的 InputFormat 类,使用的是  org.apache.hadoop.mapred.FileInputFormat,仅仅需要指定 mapreduce.job.maps ,就能够设置 mapper num了(具体源码分析在第二部分)。而使用JAVA设计的 mapreduce 任务中使用的 InputFormat 类,使用的是  org.apache.hadoop.mapreduce.lib.input.FileInputFormat,则需要通过配置BlockSize、split min size、split max size 等参数来间接性地控制 mapper num。
 
一、Java 本地提交 mapreduce 任务, org.apache.hadoop.mapreduce.lib.input.FileInputFormat 的 mapper num 控制
1. 在java本地编辑 mapreduce 任务,(默认)使用 FileInputFormat 类的子类 TextInputFormat

job.setInputFormatClass(TextInputFormat.class);


 
2. mapper 的切分逻辑在 FileInputFormat 类中的 getSplits()实现:
public List< InputSplit> getSplits(JobContext job) throws IOException { StopWatch sw = (new StopWatch()).start(); long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); List< InputSplit> splits = new ArrayList(); List< FileStatus> files = this.listStatus(job); Iterator var9 = files.iterator(); while(true) { while(true) { while(var9.hasNext()) { FileStatus file = (FileStatus)var9.next(); Path path = file.getPath(); long length = file.getLen(); if (length != 0L) { BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus)file).getBlockLocations(); } else { FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0L, length); }if (this.isSplitable(job, path)) { long blockSize = file.getBlockSize(); long splitSize = this.computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining; int blkIndex; for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) { blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); }if (bytesRemaining != 0L) { blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } } else { if (LOG.isDebugEnabled() & & length > Math.min(file.getBlockSize(), minSize)) { LOG.debug("File is not splittable so no parallelization is possible: " + file.getPath()); }splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } else { splits.add(this.makeSplit(path, 0L, length, new String[0])); } }job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size()); sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); }return splits; } } }

 
3. 最后确定 mapper 数量在这里:
1if (this.isSplitable(job, path)) { 2long blockSize = file.getBlockSize(); 3long splitSize = this.computeSplitSize(blockSize, minSize, maxSize); 4 5long bytesRemaining; 6int blkIndex; 7for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) { 8blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); 9splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); 10} 11 12if (bytesRemaining != 0L) { 13blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); 14splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); 15}

含义:
a. 当 this.isSplitable 开启时,只要当前未分配的大小 bytesRemaining 大于 splitSize 的 1.1 倍,就添加一个 inputSplit, 即一个mapper 被生成。 
b. 最后,不足 1.1 倍splitSize 的残余,补充为一个 mapper。因此,经常发现实际分配的 mapper 数比自己定义的会多 1 个。
c. 为什么设置1.1倍?避免将不足 0.1 倍 splitSize 的量分配为一个 mapper, 避免浪费。
 
4.  重要的两个量:BlockSize 和 splitSize
long blockSize = file.getBlockSize(); long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);

其中,blockSize 是 hdfs 设置的,一般是 64MB 或 128MB,我的 hdfs 中为 128 MB = 132217728L。这个量可认为静态,我们不宜修改。
观察 splitSize 的获得:
1protected long computeSplitSize(long blockSize, long minSize, long maxSize) { 2return Math.max(minSize, Math.min(maxSize, blockSize)); 3}

在 getSplits()中找到 minSize, maxSize, blockSize 的赋值:
long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job);

找到这些量的赋值、默认值:
maxSize 的 setter/getter:    除非用户重新设置,否则 maxSize 的默认值为 Long 的最大值 
1 public static void setMaxInputSplitSize(Job job, long size) { 2job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", size); 3} 4 5 public static long getMaxSplitSize(JobContext context) { 6return context.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 9223372036854775807L); 7}

minSize 的 setter/getter:  除非用户重新设置,否则 minSize 的默认值为 1L
protected long getFormatMinSplitSize() { return 1L; }public static void setMinInputSplitSize(Job job, long size) { job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", size); }public static long getMinSplitSize(JobContext job) { return job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.minsize", 1L); }

 
因此容易算出,默认情况下,
long splitSize = this.computeSplitSize(blockSize, minSize, maxSize) = Math.max(Math.max(1L,1L), Math.min(9223372036854775807L, 128M=132217728L)) = 132217728L = 128M


5. 控制 mapper 数量
知道了上面的计算过程,我们要控制 mapper,在 BlockSize 不能动的情况下,就必须控制 minSize 和 maxSize 了。这里主要控制 maxSize。 
 
TextInputFormat.setMinInputSplitSize(job, 1L); //设置minSize TextInputFormat.setMaxInputSplitSize(job, 10 * 1024 * 1024); //设置maxSize

 
测试输入文件大小为 40MB, 很小, 在默认情况下, 被分配为 1 个或 2 个 mapper 执行成功。
现在希望分配 4 个mapper:那么设置 maxSize 为10M ,那么 splitSize 计算为 10M。对于 40MB 的输入文件,理应分配 4 个mapper。
实际运行,运行了 5 个mapper,认为成功摆脱了默认启动 2 个mapper 的限制,额外多出的 1 个 mapper 则猜测是上文提到的,对残余量的补充 mapper。
 
6. 至此,对Java 本地提交 mapreduce 任务, org.apache.hadoop.mapreduce.lib.input.FileInputFormat 的 mapper num 控制方法如上。接下来讨论 streaming 使用的  org.apache.hadoop.mapred.FileInputFormat 的 mapper 控制。
 
二、streaming 提交 mapreduce 任务, org.apache.hadoop.mapred.FileInputFormat 的 mapper num 控制
1. 可通过  mapreduce.job.maps 直接控制,即使不是绝对精确。原因在下面的源码分析中可以看到。
1 hadoop dfs -rm -r -f /output & & 2 3 hadoop jar /opt/hadoop-3.0.2/share/hadoop/tools/lib/hadoop-streaming-3.0.2.jar 4 -D mapreduce.reduce.tasks=0 5 -D mapreduce.job.maps=7 6 -input /input 7 -output /output 8 -mapper "cat" 9 -inputformat TextInputFormat

 
2. 将  org.apache.hadoop.mapreduce.lib.input.FileInputFormat 中的 maxSize,尝试通过 streaming 的 -D 设置,是无效的。因为 streaming 使用的是  org.apache.hadoop.mapred.FileInputFormat,在下面的源码分析中可以看到。
 
3. 查看 FileInputFormat 的 getSplits 源码
1public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { 2StopWatch sw = (new StopWatch()).start(); 3FileStatus[] files = this.listStatus(job); 4job.setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.length); 5long totalSize = 0L; 6FileStatus[] var7 = files; 7int var8 = files.length; 8 9for(int var9 = 0; var9 < var8; ++var9) { 10FileStatus file = var7[var9]; 11if (file.isDirectory()) { 12throw new IOException("Not a file: " + file.getPath()); 13} 14 15totalSize += file.getLen(); 16} 17 18long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits); 19long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize); 20ArrayList< FileSplit> splits = new ArrayList(numSplits); 21NetworkTopology clusterMap = new NetworkTopology(); 22FileStatus[] var13 = files; 23int var14 = files.length; 24 25for(int var15 = 0; var15 < var14; ++var15) { 26FileStatus file = var13[var15]; 27Path path = file.getPath(); 28long length = file.getLen(); 29if (length == 0L) { 30splits.add(this.makeSplit(path, 0L, length, new String[0])); 31} else { 32FileSystem fs = path.getFileSystem(job); 33BlockLocation[] blkLocations; 34if (file instanceof LocatedFileStatus) { 35blkLocations = ((LocatedFileStatus)file).getBlockLocations(); 36} else { 37blkLocations = fs.getFileBlockLocations(file, 0L, length); 38} 39 40if (!this.isSplitable(fs, path)) { 41if (LOG.isDebugEnabled() & & length > Math.min(file.getBlockSize(), minSize)) { 42LOG.debug("File is not splittable so no parallelization is possible: " + file.getPath()); 43} 44 45String[][] splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, 0L, length, clusterMap); 46splits.add(this.makeSplit(path, 0L, length, splitHosts[0], splitHosts[1])); 47} else { 48long blockSize = file.getBlockSize(); 49long splitSize = this.computeSplitSize(goalSize, minSize, blockSize); 50 51long bytesRemaining; 52String[][] splitHosts; 53for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) { 54splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap); 55splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1])); 56} 57 58if (bytesRemaining != 0L) { 59splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); 60splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1])); 61} 62} 63} 64} 65 66sw.stop(); 67if (LOG.isDebugEnabled()) { 68LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); 69} 70 71return (InputSplit[])splits.toArray(new FileSplit[splits.size()]); 72}

与  org.apache.hadoop.mapreduce.lib.input.FileInputFormat 相似,但不同之处还是很重要的。主要在
long blockSize = file.getBlockSize(); long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);

赋值:
long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits); long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);

 
4. 追溯这些量
protected long computeSplitSize(long goalSize, long minSize, long blockSize) { return Math.max(minSize, Math.min(goalSize, blockSize)); }

private long minSplitSize = 1L;

 
5. 分析
minSize 默认为1L,BlockSize 我的集群为 128M,而 splitSize 就是 BlockSize 和 goalSize 的小值。
goalSize 的计算,就是输入文件总大小与  numSplits 的比值。而  numSplits 就是我们在streaming 中设置的  -D mapreduce.job.maps
因此,在streaming中才可以简单地直接设置 mapper 的数量了。
 




    推荐阅读