Hadoop MapReduce: IP Deduplication, MaxScore, and TotalScore Examples

This article walks through three introductory Hadoop MapReduce examples: deduplicating a list of IP addresses, finding each student's maximum score, and totaling scores per student.
1. IP Deduplication Example
Data file (one IP per line):

192.168.10.111
192.168.10.111
10.32.100.111
192.168.21.111
192.168.10.112
192.168.10.111
192.168.11.111
192.168.12.112
192.168.11.111

IPMapper:
package com.blb.ip;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class IPMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each line of input is a single IP; emit it as the key so the shuffle groups duplicates
        context.write(value, NullWritable.get());
    }
}

IPReducer:
package com.blb.ip;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class IPReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    // key: a distinct IP; values: null, null, null...
    // The shuffle has already grouped identical IPs under one key,
    // so writing each key exactly once deduplicates the input.
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

IPDriver:
package com.blb.ip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class IPDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(IPDriver.class);
        job.setMapperClass(IPMapper.class);
        job.setReducerClass(IPReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.10.131:9000/txt/ip.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.10.131:9000/result/ip"));
        job.waitForCompletion(true);
    }
}
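One practical note: FileOutputFormat fails if the output directory already exists, so rerunning the driver as written requires deleting the old result first. A minimal sketch of a pre-flight cleanup, hypothetical and not part of the original driver, which would slot into main before setOutputPath and needs one extra import (org.apache.hadoop.fs.FileSystem):

// Hypothetical pre-flight cleanup, not in the original driver.
// FileOutputFormat refuses to run if the output directory exists,
// so remove any leftover results from a previous run first.
Path out = new Path("hdfs://192.168.10.131:9000/result/ip");
FileSystem fs = FileSystem.get(out.toUri(), conf); // requires org.apache.hadoop.fs.FileSystem
if (fs.exists(out)) {
    fs.delete(out, true); // true = delete recursively
}
FileOutputFormat.setOutputPath(job, out);

With the sample data above, the deduplicated output should contain six distinct IPs, one per line.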

2. MaxScore Example
Data file:
张三 684
李四 312
王五 721
赵六 548
田七 470
王八 668
陈九 340
MaxScoreMapper:
package com.blb.maxscore;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class MaxScoreMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split each line into a name and a score
        String[] arr = value.toString().split(" ");
        context.write(new Text(arr[0]), new IntWritable(Integer.parseInt(arr[1])));
    }
}
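Note that split(" ") assumes exactly one space between name and score; a tab or a doubled space would produce a wrong token or a runtime exception (ArrayIndexOutOfBoundsException or NumberFormatException). A hypothetical, more defensive variant of the map body, not in the original:

// Hypothetical defensive variant of the map body (not in the original):
// tolerate tabs and runs of whitespace, and skip malformed lines.
String[] arr = value.toString().trim().split("\\s+");
if (arr.length == 2) {
    context.write(new Text(arr[0]), new IntWritable(Integer.parseInt(arr[1])));
}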
MaxScoreReducer:
package com.blb.maxscore;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class MaxScoreReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // To save memory and avoid constant object creation and destruction,
        // MapReduce reuses the same object while iterating over values:
        // the IntWritable handed out by the iterator is created once and
        // its contents are overwritten on every step.
        IntWritable max = new IntWritable(0);
        // Why `max = value;` would be a bug:
        //   values: 684, 312, 721, 548...
        //   value.set(684);  value.get() > max.get() --- 684 > 0 --- true
        //   max = value;     both are references, so max and value now share one address
        //   value.set(312);  max's value silently becomes 312 as well
        //   value.get() > max.get() --- 312 > 312 --- false
        //   The two references stay identical for the rest of the iteration,
        //   so max would end up holding the last value seen, not the maximum.
        for (IntWritable value : values) {
            if (value.get() > max.get()) {
                // max = value;  // wrong, for the reason above
                max.set(value.get()); // copy the primitive, not the reference
            }
        }
        context.write(key, max);
    }
}
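The aliasing bug described in the comments is easy to reproduce outside of Hadoop. Below is a minimal, self-contained simulation in plain Java (no cluster needed); Holder and ReusePitfallDemo are made-up names standing in for IntWritable and the reduce-side values iterator:

// Self-contained simulation of Hadoop's reduce-side object reuse.
// One Holder instance is reused across the loop, just as the values
// iterator reuses a single Writable.
public class ReusePitfallDemo {
    static class Holder {
        private int v;
        int get() { return v; }
        void set(int v) { this.v = v; }
    }

    public static void main(String[] args) {
        int[] scores = {684, 312, 721, 548};
        Holder value = new Holder();    // the single reused instance
        Holder buggyMax = new Holder(); // will end up aliased to value: wrong
        Holder goodMax = new Holder();  // copies the primitive: correct
        for (int s : scores) {
            value.set(s); // the framework overwrites the same object each step
            if (value.get() > buggyMax.get()) {
                buggyMax = value; // reference assignment: buggyMax now tracks value
            }
            if (value.get() > goodMax.get()) {
                goodMax.set(value.get());
            }
        }
        System.out.println("buggyMax = " + buggyMax.get()); // 548, the last value seen
        System.out.println("goodMax  = " + goodMax.get());  // 721, the true maximum
    }
}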
MaxScoreDriver:
package com.blb.maxscore;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class MaxScoreDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MaxScoreDriver.class);
        job.setMapperClass(MaxScoreMapper.class);
        job.setReducerClass(MaxScoreReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.10.131:9000/txt/score2.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.10.131:9000/result/maxscore2"));
        job.waitForCompletion(true);
    }
}
3. TotalScore Example
Data file:
张三 78
李四 66
王五 73
张三 88
田七 75
张三 65
陈九 90
李四 67
王五 78
TotalScoreMapper:
package com.blb.totalscore;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class TotalScoreMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split each line into a name and a score
        String[] arr = value.toString().split(" ");
        context.write(new Text(arr[0]), new IntWritable(Integer.parseInt(arr[1])));
    }
}
TotalScoreReducer:
package com.blb.totalscore;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class TotalScoreReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum every score that arrived for this name
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
TotalScoreDriver:
package com.blb.totalscore;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class TotalScoreDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TotalScoreDriver.class);
        job.setMapperClass(TotalScoreMapper.class);
        job.setReducerClass(TotalScoreReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.10.131:9000/txt/score2/"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.10.131:9000/result/totalscore"));
        job.waitForCompletion(true);
    }
}
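Because addition is associative and commutative, the per-name sums can also be pre-aggregated on the map side before the shuffle. A minimal sketch of this optional optimization, not present in the original driver, reusing TotalScoreReducer as the combiner (the line would go before waitForCompletion):

// Optional, not in the original driver: pre-aggregate scores map-side.
// Summing is associative and commutative, so the reducer is a valid combiner.
job.setCombinerClass(TotalScoreReducer.class);

The same trick would work in MaxScoreDriver with MaxScoreReducer, since taking a maximum is likewise associative and commutative.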