Explaining Key MapReduce Concepts Through Examples ---------- Finding the Maximum in Each Group with an In-Memory Sort
The data to be processed (the same pairs used in the secondary-sort example) is as follows:
23 321
23 290
23 567
78 650
78 554
78 756
16 18
16 16
16 15
9 8
9 0
9 3
After processing, the output data is as follows:
-----------
9 8
-----------
16 18
-----------
23 567
-----------
78 756
The MapReduce class FindMax. The mapper splits each input line into a group key and a value and emits them as an IntWritable pair; the reducer gathers each group's values into a list, sorts the list in memory, and writes out the largest element.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Finds the maximum value in each group.
 *
 * The input lines are the "group value" pairs shown above; the output
 * lists each group once, paired with its largest value.
 *
 * @author lyd
 */
public class FindMax implements Tool {

    /**
     * Custom mapper: splits each "group value" line and emits the pair
     * as (IntWritable group, IntWritable value).
     * @author lyd
     */
    static class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(" ");
            context.write(new IntWritable(Integer.parseInt(fields[0])),
                    new IntWritable(Integer.parseInt(fields[1])));
        }
    }
    /**
     * Custom reducer: collects each group's values into a list,
     * sorts the list in memory, and emits the group's maximum.
     * @author lyd
     */
    static class MyReducer extends Reducer<IntWritable, IntWritable, Text, Text> {
        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            List<Integer> list = new ArrayList<>();
            for (IntWritable v : values) {
                list.add(v.get());
            }
            // Sort ascending; the last element is the group's maximum.
            Collections.sort(list);
            // Write a separator line before each group, as in the sample output.
            context.write(new Text("-----------"), new Text(""));
            context.write(new Text(key.get() + ""), new Text(list.get(list.size() - 1) + ""));
        }
    }
    /** Stored configuration, populated by ToolRunner via setConf(). */
    private Configuration conf;

    @Override
    public void setConf(Configuration conf) {
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }
    /**
     * Driver method.
     */
    @Override
    public int run(String[] args) throws Exception {
        // 1. Get the Configuration object
        Configuration conf = getConf();
        // 2. Create the job
        Job job = Job.getInstance(conf, "model01");
        // 3. Set the class that runs the job
        job.setJarByClass(FindMax.class);
        // 4. Set the map-side properties
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 5. Set the reduce-side properties
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Delete the output directory if it already exists
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6. Submit the job and wait for it to finish
        return job.waitForCompletion(true) ? 0 : 1;
    }
    /**
     * Main entry point of the job.
     * @param args input path and output path
     */
    public static void main(String[] args) {
        try {
            // Parse generic Hadoop options out of the command-line arguments
            String[] remainingArgs = new GenericOptionsParser(new Configuration(), args).getRemainingArgs();
            System.exit(ToolRunner.run(new FindMax(), remainingArgs));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
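The sort-based reducer above buffers every value of a group in memory before sorting it, which costs O(n log n) time and can exhaust the heap for very large groups. A single pass that only tracks the running maximum avoids both costs. Below is a minimal sketch of such a reducer; the name MaxReducer is our own choice, and it is meant as a drop-in replacement for MyReducer inside FindMax, reusing the imports from the listing above:

    static class MaxReducer extends Reducer<IntWritable, IntWritable, Text, Text> {
        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Track the running maximum instead of buffering and sorting all values.
            int max = Integer.MIN_VALUE;
            for (IntWritable v : values) {
                max = Math.max(max, v.get());
            }
            // Separator line before each group, matching the sample output.
            context.write(new Text("-----------"), new Text(""));
            context.write(new Text(key.get() + ""), new Text(max + ""));
        }
    }

To switch, change job.setReducerClass(MyReducer.class) to job.setReducerClass(MaxReducer.class) in run(); everything else stays the same.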