分布式计算框架Map/Reduce

1 MapReduce是什么
MapReduce是一种编程模型,是面向大数据并行处理的计算模型、框架和平台。
1)MapReduce是一个基于集群的高性能并行计算平台。
2)2MapReduce是一个并行计算与运行的软件框架。
3)MapReduce是一个并行程序设计模型与方法。
2基本特点
1)分布可靠,对数据集的操作分发给集群中的多个节点实现可靠性,每个节点周期性返回它完成的任务和最新的状态
2)封装了实现细节,基于框架API编程,面向业务展开分布式编码
3)提供跨语言编程的能力
3MapReduce主要功能
1.1 数据划分和计算任务调度
1.2数据/代码互相定位
1.3系统优化
1.4出错检测和恢复
4 MapReduce 的运行流程
分布式计算框架Map/Reduce
文章图片

主要包含的步骤:
1) 首先正式提交作业代码,并对输入数据源进行切片
2) master调度worker执行map任务
3) worker当中的map任务读取输入源切片
4) worker执行map任务,将任务输出保存在本地
5) master调度worker执行reduce任务,reduce worker读取map任务的输出文件
6) 执行reduce任务,将任务输出保存到HDFS
5运行流程详解
分布式计算框架Map/Reduce
文章图片


MapReduce将作业的整个运行过程分为两个阶段:Map阶段Reduce阶段。
Map阶段由一定数量的Map Task组成,流程如下:

  1. 输入数据格式解析:InputFormat
  2. 输入数据处理:Mapper
  3. 数据分区:Partitioner
  4. 数据按照key排序
  5. 本地规约:Combiner(相当于local reducer,可选)
  6. 将任务输出保存在本地
Reduce阶段由一定数量的Reduce Task组成,流程如下:
  1. 数据远程拷贝
  2. 数据按照key排序和文件合并merge
  3. 数据处理:Reducer
  4. 数据输出格式:OutputFormat
通常我们把从Mapper阶段输出数据到Reduce阶段的reduce计算之间的过程称之为shuffle

6MapReduce 开发流程
  • 搭建开发环境,参考HDFS环境搭建,基本一致
  • 基于MapReduce框架编写代码,Map、Reduce、Driver三部分组成。
  • 编译打包,将源代码打成的包和依赖jar包打成一个包
  • 上传至运行环境
  • 运行hadoop jar命令,现已由yarn jar替代,建议使用新命令提交执行


1Map类编写
package com.tledu.mr; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; import java.util.StringTokenizer; public class WordCountMapper extends Mapper {@Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { //map 数据处理,将输入的kv 转换成输出的kv //词频统计,输出的kv 是word freq的类型数据, //输入的value是不是一行数据,多个单词用空格分割 //1将单词提取出来 StringTokenizer str = new StringTokenizer(value.toString()); while (str.hasMoreTokens()){ //获取到了每个单词 String wordStr =str.nextToken(); //输出kv形式,k就是这个单词,value是个数 Text word = new Text(wordStr); IntWritable one = new IntWritable(1); //将数据输出 context.write(word,one); } } }



2Reduce代码实现
package com.tledu.mr; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReducer extends Reducer {@Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { //key 聚合过来的key //values 是key的多有的值 //求频率,计数 int sum = 0; for (IntWritable val : values){ sum +=val.get(); } //求出了综合 context.write(key,new IntWritable(sum)); } }

3Driver类编写
package com.tledu.mr; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCountDriver { publicstaticvoidmain(String[] args) throws IOException, InterruptedException, ClassNotFoundException { //dirver是驱动,将整个流程打通 //创建配置 Configuration conf = new Configuration(); //创建任务 Job job = Job.getInstance(conf,"job021-tianbiao-wordcount"); //运行的主类 mapper, combine, reducer job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setCombinerClass(WordCountReducer.class); job.setReducerClass(WordCountReducer.class); //配置输出的 kv形式 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定输入和输出目录 FileInputFormat.addInputPath(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); //开启任务。等待执行 System.exit(job.waitForCompletion(true) ? 0 :1); } }

1注意: 需要将winutils.ext放到已搭建好的maven hadoop开发项目的根目录下的null/bin/下
2
  • 鉴于本地运行的固有bug,需要重写存在bug的文件,步骤如下:
    • 找到Bug文件路径:org.apache.hadoop.io.nativeio. NativeIO
      • 该文件所在的jar包为:hadoop-common包下
      • 复制到JAVA下 复制类 NativeIO
      • 找 access 方法改为return true;(
        public static boolean access(String path, NativeIO.Windows.AccessRight desiredAccess) throws IOException { return access0(path, desiredAccess.accessRight()); }







2.4本地模拟分布式计算环境运行mapreduce
鉴于远程运行进行代码测试的复杂性,以及其它新框架均开始支持本地local环境模拟分布式计算运行, 故mapreduce从2.x开始也已经支持本地环境
具体做法请参见辅助资料集” 06-本地local环境模拟mapreduce并行计算的操作步骤”。

2.5 Maven打包
使用Maven命令,基于配置的Maven插件实现代码打包。

2.6 上传到运行环境
使用rz命令将打好的运行包上传到集群环境中。

2.7 运行WordCount程序
具体提交命令为:
【分布式计算框架Map/Reduce】yarn jar testhdfs-jar-with-dependencies.jar com.tianliangedu.driver.WordCount /tmp/tianliangedu/input /tmp/tianliangedu/output3

    推荐阅读