1 MapReduce是什么
MapReduce是一种编程模型,是面向大数据并行处理的计算模型、框架和平台。
1)MapReduce是一个基于集群的高性能并行计算平台。
2)2MapReduce是一个并行计算与运行的软件框架。
3)MapReduce是一个并行程序设计模型与方法。
2基本特点
1)分布可靠,对数据集的操作分发给集群中的多个节点实现可靠性,每个节点周期性返回它完成的任务和最新的状态
2)封装了实现细节,基于框架API编程,面向业务展开分布式编码
3)提供跨语言编程的能力
3MapReduce主要功能
1.1 数据划分和计算任务调度
1.2数据/代码互相定位
1.3系统优化
1.4出错检测和恢复
4 MapReduce 的运行流程
文章图片
主要包含的步骤:
1) 首先正式提交作业代码,并对输入数据源进行切片
2) master调度worker执行map任务
3) worker当中的map任务读取输入源切片
4) worker执行map任务,将任务输出保存在本地
5) master调度worker执行reduce任务,reduce worker读取map任务的输出文件
6) 执行reduce任务,将任务输出保存到HDFS
5运行流程详解
文章图片
MapReduce将作业的整个运行过程分为两个阶段:Map阶段Reduce阶段。
Map阶段由一定数量的Map Task组成,流程如下:
- 输入数据格式解析:InputFormat
- 输入数据处理:Mapper
- 数据分区:Partitioner
- 数据按照key排序
- 本地规约:Combiner(相当于local reducer,可选)
- 将任务输出保存在本地
- 数据远程拷贝
- 数据按照key排序和文件合并merge
- 数据处理:Reducer
- 数据输出格式:OutputFormat
6MapReduce 开发流程
- 搭建开发环境,参考HDFS环境搭建,基本一致
- 基于MapReduce框架编写代码,Map、Reduce、Driver三部分组成。
- 编译打包,将源代码打成的包和依赖jar包打成一个包
- 上传至运行环境
- 运行hadoop jar命令,现已由yarn jar替代,建议使用新命令提交执行
1Map类编写
package com.tledu.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordCountMapper extends Mapper
2Reduce代码实现
package com.tledu.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer {@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
//key 聚合过来的key
//values 是key的多有的值
//求频率,计数
int sum = 0;
for (IntWritable val : values){
sum +=val.get();
}
//求出了综合
context.write(key,new IntWritable(sum));
}
}
3Driver类编写
package com.tledu.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
publicstaticvoidmain(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//dirver是驱动,将整个流程打通
//创建配置
Configuration conf = new Configuration();
//创建任务
Job job = Job.getInstance(conf,"job021-tianbiao-wordcount");
//运行的主类 mapper, combine, reducer
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setCombinerClass(WordCountReducer.class);
job.setReducerClass(WordCountReducer.class);
//配置输出的 kv形式
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定输入和输出目录
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//开启任务。等待执行
System.exit(job.waitForCompletion(true) ? 0 :1);
}
}
1注意: 需要将winutils.ext放到已搭建好的maven hadoop开发项目的根目录下的null/bin/下
2
- 鉴于本地运行的固有bug,需要重写存在bug的文件,步骤如下:
- 找到Bug文件路径:org.apache.hadoop.io.nativeio. NativeIO
- 该文件所在的jar包为:hadoop-common包下
- 复制到JAVA下 复制类 NativeIO
- 找 access 方法改为return true;(
public static boolean access(String path, NativeIO.Windows.AccessRight desiredAccess) throws IOException { return access0(path, desiredAccess.accessRight()); }
)
- 该文件所在的jar包为:hadoop-common包下
- 找到Bug文件路径:org.apache.hadoop.io.nativeio. NativeIO
2.4本地模拟分布式计算环境运行mapreduce
鉴于远程运行进行代码测试的复杂性,以及其它新框架均开始支持本地local环境模拟分布式计算运行, 故mapreduce从2.x开始也已经支持本地环境
具体做法请参见辅助资料集” 06-本地local环境模拟mapreduce并行计算的操作步骤”。
2.5 Maven打包
使用Maven命令,基于配置的Maven插件实现代码打包。
2.6 上传到运行环境
使用rz命令将打好的运行包上传到集群环境中。
2.7 运行WordCount程序
具体提交命令为:
【分布式计算框架Map/Reduce】yarn jar testhdfs-jar-with-dependencies.jar com.tianliangedu.driver.WordCount /tmp/tianliangedu/input /tmp/tianliangedu/output3
推荐阅读
- 大数据|【大数据开发笔记1】什么是Hadoop?
- 大数据|Yarn 资源调度器概述
- Hadoop教程目录
- 什么是yarn
- 什么是Sqoop
- 什么是Apache Pig
- 什么是HIVE
- 什么是HBase
- 什么是Hadoop