Explaining Key MapReduce Concepts Through Examples: In-Memory Sort to Find the Maximum of Each Group

Input data to be processed (the sample labeled "secondary sort" in the original; each line holds a group key and a value, separated by a single space):

23 321
23 290
23 567
78 650
78 554
78 756
16 18
16 16
16 15
9 8
9 0
9 3

Output after processing (one line per group: the group key and its maximum value, tab-separated):

9	8
16	18
23	567
78	756

The MapReduce class FindMax. The mapper parses each input line and emits the pair (group key, value) as IntWritable key and value; the reducer collects all values of a group into a list, sorts the list in memory with Collections.sort, and writes the group key together with the last (largest) element.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author lyd
 *
 * Find the maximum value in each group.
 *
 * Input (one "group value" pair per line):
 *   23 321, 23 290, 23 567, 78 650, 78 554, 78 756,
 *   16 18, 16 16, 16 15, 9 8, 9 0, 9 3
 * Output:
 *   9 8, 16 18, 23 567, 78 756
 */
public class FindMax extends ToolRunner implements Tool {

    /**
     * Custom mapper: splits each line on a space and emits
     * (group key, value) as IntWritable pairs.
     */
    static class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] lines = line.split(" ");
            context.write(new IntWritable(Integer.parseInt(lines[0])),
                    new IntWritable(Integer.parseInt(lines[1])));
        }
    }

    /**
     * Custom reducer: buffers all values of a group in a list, sorts the
     * list in memory, and writes the largest element.
     */
    static class MyReducer extends Reducer<IntWritable, IntWritable, Text, Text> {
        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> value, Context context)
                throws IOException, InterruptedException {
            List<Integer> li = new ArrayList<Integer>();
            for (IntWritable i : value) {
                li.add(i.get());
            }
            // Sort ascending; the last element is the maximum of this group.
            Collections.sort(li);
            context.write(new Text(key.get() + ""), new Text(li.get(li.size() - 1) + ""));
        }
    }

    // Configuration stored by setConf() so that run() uses the same object.
    private Configuration conf;

    @Override
    public void setConf(Configuration conf) {
        // Point the client at the HDFS NameNode (value from the original example).
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    /**
     * Driver: configures and submits the job.
     */
    @Override
    public int run(String[] args) throws Exception {
        // 1. Get the Configuration object
        Configuration conf = getConf();
        // 2. Create the job
        Job job = Job.getInstance(conf, "model01");
        // 3. Set the class that carries the job jar
        job.setJarByClass(FindMax.class);
        // 4. Map-side settings
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 5. Reduce-side settings
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Delete the output directory if it already exists
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6. Submit the job and wait for completion
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Main entry point of the job.
     */
    public static void main(String[] args) {
        try {
            // Parse generic Hadoop options and keep the remaining arguments
            String[] argss = new GenericOptionsParser(new Configuration(), args).getRemainingArgs();
            System.exit(ToolRunner.run(new FindMax(), argss));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
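Once packaged into a jar, the job takes two arguments, an input path and an output path; an existing output directory is deleted before the job runs.

As a quick sanity check of the reduce-side logic without a cluster, the same collect/sort/take-last idea can be sketched in plain Java. The VerifyMax class below is an illustrative sketch, not part of the original job; it only reuses the sample data shown above.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Minimal local sketch (hypothetical, not part of the MapReduce job): group the
// sample pairs by key, sort each group's values in memory, and print the last
// (largest) element, mirroring what MyReducer does for each key.
public class VerifyMax {
    public static void main(String[] args) {
        String[] lines = {"23 321", "23 290", "23 567", "78 650", "78 554", "78 756",
                "16 18", "16 16", "16 15", "9 8", "9 0", "9 3"};
        // TreeMap keeps keys in ascending order, like the shuffle does for IntWritable keys.
        Map<Integer, List<Integer>> groups = new TreeMap<Integer, List<Integer>>();
        for (String line : lines) {
            String[] parts = line.split(" ");
            int key = Integer.parseInt(parts[0]);
            int val = Integer.parseInt(parts[1]);
            if (!groups.containsKey(key)) {
                groups.put(key, new ArrayList<Integer>());
            }
            groups.get(key).add(val);
        }
        for (Map.Entry<Integer, List<Integer>> e : groups.entrySet()) {
            List<Integer> values = e.getValue();
            Collections.sort(values);                          // ascending in-memory sort
            System.out.println(e.getKey() + "\t" + values.get(values.size() - 1));
        }
    }
}

Because Collections.sort runs inside the reducer, this approach assumes that each group's value list fits in memory; a secondary-sort solution, which this dataset is also used to illustrate, would instead have the shuffle deliver the values already ordered, so the reducer could pick the extreme value without buffering the whole group.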

