Hadoop Beginner Notes 23: MapReduce Performance Optimization - Data Compression

一. Compression Optimization Design

When a MapReduce program runs, disk I/O, network data transfer, shuffle, and merge consume a large amount of time, especially with large data volumes and heavy workloads. Since disk I/O and network bandwidth are scarce resources in Hadoop, data compression is very helpful for saving resources and minimizing disk I/O and network traffic. If disk I/O and network bandwidth are limiting MapReduce job performance, enabling compression at any MapReduce stage can improve end-to-end processing time and reduce I/O and network traffic.

Compression is an optimization strategy for MapReduce: the output of the mapper or reducer is compressed with an encoding codec to reduce disk I/O and speed up the MR program. Its pros and cons are as follows:
Advantages of compression:

  • Files take up less storage space
  • Files transfer faster, which improves overall processing speed
  • Fewer I/O reads and writes
Disadvantages of compression:
  • Data must be decompressed before use, which adds CPU load; the more complex the compression algorithm, the longer decompression takes
二. Compression Support

1. Check the compression codecs supported by Hadoop: hadoop checknative
(figure: output of hadoop checknative, showing which native libraries and codecs are available)
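Besides the command-line check, the codecs registered in the current configuration can also be listed from Java. A minimal sketch, assuming only a standard Hadoop client classpath (the class name ListCodecs is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

/**
 * Prints the compression codec classes registered via io.compression.codecs
 * plus the built-in defaults.
 */
public class ListCodecs {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        for (Class<? extends CompressionCodec> codecClass
                : CompressionCodecFactory.getCodecClasses(conf)) {
            System.out.println(codecClass.getName());
        }
    }
}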

2. Compression formats supported by Hadoop
(figure: table of the compression formats supported by Hadoop)

3. Comparison of the compression formats
Gzip
  Pros: relatively high compression ratio among the four formats; supported by Hadoop out of the box, so gzip files can be processed just like plain text; has a Hadoop native library; most Linux systems ship with the gzip command, so it is easy to use.
  Cons: does not support splitting.

Lzo
  Pros: fairly fast compression/decompression with a reasonable compression ratio; supports splitting and is the most popular compression format in Hadoop; has a Hadoop native library; only requires installing the lzop command on Linux, easy to use.
  Cons: lower compression ratio than gzip; not supported by Hadoop itself and must be installed separately; although LZO supports splitting, the .lzo files must be indexed, otherwise Hadoop treats them as ordinary (non-splittable) files (to get splitting you must build an index and use the LZO input format).

Bzip2
  Pros: supports splitting; very high compression ratio, higher than gzip; supported by Hadoop itself; Linux systems ship with the bzip2 command, easy to use.
  Cons: slow compression/decompression; no native library support.

Snappy
  Pros: fast compression; has a Hadoop native library.
  Cons: does not support splitting; low compression ratio; not supported by Hadoop out of the box and must be installed; no corresponding command on Linux.
4. Compressed size for the same input data
(figure: compressed size of the same input data under each format)

5. Compression and decompression time
(figure: compression and decompression time of each format)

From the comparison above: the higher the compression ratio, the longer the compression takes. You should choose a codec that strikes a balance between compression ratio and compression time.
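The figures above are indicative only; real numbers depend heavily on the data. A rough sketch for measuring compression ratio and time on your own data is below. It is a single-JVM micro-benchmark, not a cluster test; the class name, the generated sample data and the codec list are illustrative, and only codecs that work without extra native libraries are included (add SnappyCodec or the LZO codecs once they are installed).

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Compresses the same byte[] with several codecs and prints size and elapsed time.
 */
public class CodecCompare {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Build some repetitive sample text; replace with your own data for a real comparison
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 200000; i++) {
            sb.append("hadoop mapreduce compression test line ").append(i).append("\n");
        }
        byte[] data = sb.toString().getBytes(StandardCharsets.UTF_8);

        String[] codecClasses = {
                "org.apache.hadoop.io.compress.DefaultCodec",
                "org.apache.hadoop.io.compress.GzipCodec",
                "org.apache.hadoop.io.compress.BZip2Codec"
        };
        for (String className : codecClasses) {
            CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(
                    conf.getClassByName(className), conf);
            ByteArrayOutputStream compressed = new ByteArrayOutputStream();
            long start = System.currentTimeMillis();
            CompressionOutputStream out = codec.createOutputStream(compressed);
            out.write(data);
            out.close();   // flushes and finishes the compressed stream
            long elapsed = System.currentTimeMillis() - start;
            System.out.printf("%-50s raw=%d bytes, compressed=%d bytes, time=%d ms%n",
                    className, data.length, compressed.size(), elapsed);
        }
    }
}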
三. Gzip Compression

1. Generating a Gzip-compressed file
1. Requirement: read a plain text file and compress it into Gzip format
2. Approach:
  1. Input reads the plain text file
  2. Map and Reduce pass the data straight through
  3. Configure Output to compress the result as Gzip
3. Implementation
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * MRWriteGzip: read plain text input and write the result compressed in Gzip format.
 */
public class MRWriteGzip extends Configured implements Tool {

    // Build, configure and submit the MapReduce job
    public int run(String[] args) throws Exception {
        // Build the job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(MRWriteGzip.class);

        // input: configure the input path
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job, inputPath);

        // map: configure the Map phase
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // reduce: configure the Reduce phase
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // output: configure the output path
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    // Program entry point: run the job through ToolRunner
    public static void main(String[] args) throws Exception {
        // Holds all configuration for this program
        Configuration conf = new Configuration();
        // Compress the job output in Gzip format
        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec",
                "org.apache.hadoop.io.compress.GzipCodec");
        // Submit the job
        int status = ToolRunner.run(conf, new MRWriteGzip(), args);
        System.exit(status);
    }

    /**
     * Mapper: emit every input record unchanged.
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        private NullWritable outputKey = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(this.outputKey, value);
        }
    }

    /**
     * Reducer: emit every record unchanged.
     */
    public static class MrReduce extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }
}
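As an aside, the same output compression can be requested through the FileOutputFormat helper methods instead of the raw property strings used in main() above. A small sketch, assuming the Job object built in run() (the helper class and method name are illustrative):

import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GzipOutputConfig {
    /**
     * Equivalent to the two conf.set(...) calls in main() above, but using the typed
     * FileOutputFormat setters; call this from run() after the Job has been created.
     */
    public static void enableGzipOutput(Job job) {
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    }
}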

2. Reading a Gzip-compressed file
1. Requirement: read a Gzip-compressed file and restore it to plain text
2. Approach:
  1. Input reads the compressed result of the previous step directly
  2. Map and Reduce pass the data straight through
  3. Output saves the result as a plain text file
3. Implementation
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * MRReadGzip: read Gzip-compressed input and write it back out as plain text.
 */
public class MRReadGzip extends Configured implements Tool {

    // Build, configure and submit the MapReduce job
    public int run(String[] args) throws Exception {
        // Build the job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(MRReadGzip.class);

        // input: configure the input path
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job, inputPath);

        // map: configure the Map phase
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // reduce: configure the Reduce phase
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // output: configure the output path
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    // Program entry point: run the job through ToolRunner
    public static void main(String[] args) throws Exception {
        // Holds all configuration for this program
        Configuration conf = new Configuration();
        // No output compression is configured: the result is written as plain text
        int status = ToolRunner.run(conf, new MRReadGzip(), args);
        System.exit(status);
    }

    /**
     * Mapper: emit every input record unchanged.
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        private NullWritable outputKey = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(this.outputKey, value);
        }
    }

    /**
     * Reducer: emit every record unchanged.
     */
    public static class MrReduce extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }
}
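For reference, the reason no special input configuration is needed is that TextInputFormat resolves the codec from the file suffix (.gz maps to GzipCodec) and wraps the raw stream with a decompressing stream. A minimal standalone sketch of that mechanism, with an illustrative class name and argument layout (args[0] = compressed file, args[1] = decompressed output):

import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

/**
 * Decompress a single file by looking up its codec from the file extension.
 */
public class DecompressFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path inputPath = new Path(args[0]);
        FileSystem fs = inputPath.getFileSystem(conf);

        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(inputPath);   // resolved by file extension
        if (codec == null) {
            throw new IllegalArgumentException("No codec found for " + inputPath);
        }

        try (InputStream in = codec.createInputStream(fs.open(inputPath));
             OutputStream out = fs.create(new Path(args[1]))) {
            IOUtils.copyBytes(in, out, conf);
        }
    }
}

Keep in mind that a single .gz file is not splittable, so in MapReduce it is read by a single map task.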

四. Snappy Compression

1. Configuring Hadoop to support Snappy
Hadoop supports the Snappy compression codec, and it is one of the most commonly used ones. However, the officially pre-built Hadoop packages do not ship with Snappy support, so to use Snappy compression you have to download the Hadoop source code and compile it yourself with Snappy support enabled. For the detailed build steps, see the《Hadoop3编译安装》(Hadoop 3 build and installation) manual.
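Once the rebuilt Hadoop is in place, a quick sanity check from Java is sketched below. It only relies on org.apache.hadoop.util.NativeCodeLoader; note that buildSupportsSnappy() exists in the Hadoop 3.1.x line used here, but the native Snappy integration changed in later releases, so treat this as an illustrative check rather than a portable API.

import org.apache.hadoop.util.NativeCodeLoader;

/**
 * Sanity check: was the native hadoop library loaded, and was it built with Snappy?
 * (Illustrative class name; run it with the Hadoop jars and native libraries on the classpath.)
 */
public class CheckSnappy {
    public static void main(String[] args) {
        boolean nativeLoaded = NativeCodeLoader.isNativeCodeLoaded();
        System.out.println("native hadoop loaded: " + nativeLoaded);
        if (nativeLoaded) {
            // buildSupportsSnappy() is a native method, so only call it once the library is loaded
            System.out.println("built with snappy:    " + NativeCodeLoader.buildSupportsSnappy());
        }
    }
}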
2. Generating a Snappy-compressed file (Map output not compressed)
1. Requirement: read a plain text file and convert it into a Snappy-compressed file
2. Approach:
  1. Input reads the plain text file
  2. Map and Reduce pass the data straight through
  3. Configure Output to compress the result as Snappy
3. Implementation
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * MRWriteSnappy: read plain text input and write the result compressed in Snappy format.
 */
public class MRWriteSnappy extends Configured implements Tool {

    // Build, configure and submit the MapReduce job
    public int run(String[] args) throws Exception {
        // Build the job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(MRWriteSnappy.class);

        // input: configure the input path
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job, inputPath);

        // map: configure the Map phase
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // reduce: configure the Reduce phase
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // output: configure the output path
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    // Program entry point: run the job through ToolRunner
    public static void main(String[] args) throws Exception {
        // Holds all configuration for this program
        Configuration conf = new Configuration();
        // Compress the job output in Snappy format
        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec",
                "org.apache.hadoop.io.compress.SnappyCodec");
        // Submit the job
        int status = ToolRunner.run(conf, new MRWriteSnappy(), args);
        System.exit(status);
    }

    /**
     * Mapper: emit every input record unchanged.
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        private NullWritable outputKey = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(this.outputKey, value);
        }
    }

    /**
     * Reducer: emit every record unchanged.
     */
    public static class MrReduce extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }
}

3. Generating a Snappy-compressed file (Map output compressed)
1. Requirement: read a plain text file, convert it into a Snappy-compressed file, and also compress the Map output with Snappy
2. Approach: add the Map output compression settings to the code from the previous step
3. Implementation
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * MRMapOutputSnappy: read plain text input and compress both the Map output
 * and the job output in Snappy format.
 */
public class MRMapOutputSnappy extends Configured implements Tool {

    // Build, configure and submit the MapReduce job
    public int run(String[] args) throws Exception {
        // Build the job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(MRMapOutputSnappy.class);

        // input: configure the input path
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job, inputPath);

        // map: configure the Map phase
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // reduce: configure the Reduce phase
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // output: configure the output path
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    // Program entry point: run the job through ToolRunner
    public static void main(String[] args) throws Exception {
        // Holds all configuration for this program
        Configuration conf = new Configuration();
        // Compress the Map output in Snappy format
        conf.set("mapreduce.map.output.compress", "true");
        conf.set("mapreduce.map.output.compress.codec",
                "org.apache.hadoop.io.compress.SnappyCodec");
        // Compress the Reduce (job) output in Snappy format
        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec",
                "org.apache.hadoop.io.compress.SnappyCodec");
        // Submit the job
        int status = ToolRunner.run(conf, new MRMapOutputSnappy(), args);
        System.exit(status);
    }

    /**
     * Mapper: emit every input record unchanged.
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        private NullWritable outputKey = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(this.outputKey, value);
        }
    }

    /**
     * Reducer: emit every record unchanged.
     */
    public static class MrReduce extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }
}
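The string-based conf.set(...) calls above work, but a typo in the codec class name only fails at runtime. The typed setters catch that at compile time; a small sketch with an illustrative helper class:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;

public class MapOutputCompressionConfig {
    /**
     * Same effect as the map-output conf.set(...) calls in main() above,
     * expressed with the typed Configuration setters.
     */
    public static void enableSnappyMapOutput(Configuration conf) {
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec",
                SnappyCodec.class, CompressionCodec.class);
    }
}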

4. Reading a Snappy-compressed file
1. Requirement: read the Snappy file generated in the previous step and restore it to plain text
2. Approach:
  1. Input reads the Snappy file
  2. Map and Reduce pass the data straight through
  3. Output writes the result as plain text
3. Code:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * MRReadSnappy: read Snappy-compressed input and write it back out as plain text.
 */
public class MRReadSnappy extends Configured implements Tool {

    // Build, configure and submit the MapReduce job
    public int run(String[] args) throws Exception {
        // Build the job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(MRReadSnappy.class);

        // input: configure the input path
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job, inputPath);

        // map: configure the Map phase
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // reduce: configure the Reduce phase
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // output: configure the output path
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    // Program entry point: run the job through ToolRunner
    public static void main(String[] args) throws Exception {
        // Holds all configuration for this program
        Configuration conf = new Configuration();
        // No compression settings: the Snappy input is decompressed automatically
        // and the result is written as plain text
        int status = ToolRunner.run(conf, new MRReadSnappy(), args);
        System.exit(status);
    }

    /**
     * Mapper: emit every input record unchanged.
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        private NullWritable outputKey = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(this.outputKey, value);
        }
    }

    /**
     * Reducer: emit every record unchanged.
     */
    public static class MrReduce extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }
}

五. LZO Compression

1. Configuring Hadoop to support LZO
Hadoop itself does not support LZO compression; it has to be installed separately and LZO support has to be added when compiling Hadoop. For the build process, see the manual《Apache Hadoop3-1-3编译安装部署lzo压缩指南》(Apache Hadoop 3.1.3 build, installation and LZO deployment guide).
After the build is complete, apply the following configuration so that the current Hadoop installation supports LZO compression:
  • Add the hadoop-lzo support jar
cp hadoop-lzo-0.4.21-SNAPSHOT.jar /export/server/hadoop-3.1.4/share/hadoop/common/


  • Distribute the jar to all nodes
cd /export/server/hadoop-3.1.4/share/hadoop/common/
scp hadoop-lzo-0.4.21-SNAPSHOT.jar node2:$PWD
scp hadoop-lzo-0.4.21-SNAPSHOT.jar node3:$PWD

  • Modify core-site.xml
<property>
    <name>io.compression.codecs</name>
    <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec</value>
</property>
<property>
    <name>io.compression.codec.lzo.class</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

  • Distribute core-site.xml to all the other nodes
cd /export/server/hadoop-3.1.4/etc/hadoop
scp core-site.xml node2:$PWD
scp core-site.xml node3:$PWD

  • Restart the Hadoop cluster
2. Generating an LZO-compressed file
1. Requirement: read a plain text file and produce an LZO-compressed output file
2. Approach:
  1. Input reads the plain text file
  2. Map and Reduce pass the data straight through
  3. Configure Output to compress the result as LZO
3. Implementation
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * MRWriteLzo: read plain text input and write the result compressed in LZO (lzop) format.
 */
public class MRWriteLzo extends Configured implements Tool {

    // Build, configure and submit the MapReduce job
    public int run(String[] args) throws Exception {
        // Build the job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(MRWriteLzo.class);

        // input: configure the input path
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job, inputPath);

        // map: configure the Map phase
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // reduce: configure the Reduce phase
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // output: configure the output path
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    // Program entry point: run the job through ToolRunner
    public static void main(String[] args) throws Exception {
        // Holds all configuration for this program
        Configuration conf = new Configuration();
        // Compress the job output in LZO (lzop) format
        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec",
                "com.hadoop.compression.lzo.LzopCodec");
        // Submit the job
        int status = ToolRunner.run(conf, new MRWriteLzo(), args);
        System.exit(status);
    }

    /**
     * Mapper: emit every input record unchanged.
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        private NullWritable outputKey = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(this.outputKey, value);
        }
    }

    /**
     * Reducer: emit every record unchanged.
     */
    public static class MrReduce extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }
}

3. Reading an LZO-compressed file
1. Requirement: read an LZO-compressed file and restore it to plain text
2. Implementation
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * MRReadLzo: read LZO-compressed input and write it back out as plain text.
 */
public class MRReadLzo extends Configured implements Tool {

    // Build, configure and submit the MapReduce job
    public int run(String[] args) throws Exception {
        // Build the job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(MRReadLzo.class);

        // input: configure the input path
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job, inputPath);

        // map: configure the Map phase
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // reduce: configure the Reduce phase
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // output: configure the output path
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    // Program entry point: run the job through ToolRunner
    public static void main(String[] args) throws Exception {
        // Holds all configuration for this program
        Configuration conf = new Configuration();
        // No output compression is configured: the result is written as plain text
        int status = ToolRunner.run(conf, new MRReadLzo(), args);
        System.exit(status);
    }

    /**
     * Mapper: emit every input record unchanged.
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        private NullWritable outputKey = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(this.outputKey, value);
        }
    }

    /**
     * Reducer: emit every record unchanged.
     */
    public static class MrReduce extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }
}
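As noted in the comparison table, .lzo files are only splittable after an index file has been built next to them, and a job that wants splittable input must then use the LZO-aware input format from the hadoop-lzo jar (for example com.hadoop.mapreduce.LzoTextInputFormat) instead of TextInputFormat. Below is a hedged sketch of building those indexes with the DistributedLzoIndexer tool shipped in that jar; the class comes from the third-party hadoop-lzo project, so the exact name and behaviour may differ between versions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

// From the hadoop-lzo jar installed earlier (third-party, version-dependent)
import com.hadoop.compression.lzo.DistributedLzoIndexer;

/**
 * Builds .index files for the .lzo files under the given HDFS paths so that
 * MapReduce can split them; without an index, each .lzo file is read by one map task.
 * args: one or more HDFS paths containing .lzo files (illustrative usage).
 */
public class BuildLzoIndex {
    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new Configuration(), new DistributedLzoIndexer(), args);
        System.exit(status);
    }
}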
