(四)Hadoop之MapReduce实战小例子

输入数据文件 AvgTemperature.txt

DATE,HOUR,COND,PRES,HUM,TMP,AQI,PM2.5,PM10 20160602,00,霾,1984,130,9,390,348,300 20160802,01,霾,1163,81,8,393,368,302 20160706,02,霾,1079,108,17,360,394,306 20160706,03,霾,1116,79,6,339,387,303 20160502,04,霾,1198,98,16,357,325,307 20160602,05,霾,1762,126,9,324,316,301 20160408,06,霾,1996,131,3,349,344,301 20160604,07,霾,1952,119,26,347,300,309 20160105,08,霾,1410,81,8,350,395,307 20160104,09,霾,1718,130,4,352,335,308 20160501,10,霾,1714,119,27,310,336,307 20160601,11,霾,1660,130,23,311,364,302 20160606,12,霾,1598,96,12,369,346,309 20160602,13,霾,1673,127,2,343,346,303 20160706,14,霾,1578,122,8,360,323,307 20160707,15,霾,1237,118,12,384,384,301 20160205,16,霾,1231,78,9,361,357,302 20160605,17,霾,1166,86,30,350,388,307 20160506,18,霾,1426,94,2,378,372,305 20160805,19,霾,1874,144,20,376,327,302 20160405,20,霾,1778,94,22,360,335,304 20160104,21,霾,1055,64,22,376,361,305 20160304,22,霾,1349,78,15,367,384,308 20160203,23,霾,2004,110,2,359,371,304 20160603,24,霾,1375,115,19,308,301,308 20160402,25,霾,1201,69,5,387,342,305 20160707,26,霾,1272,112,23,348,333,307 20160702,27,霾,1738,60,12,393,300,303 20160301,28,霾,1752,107,12,364,331,301 20160704,29,霾,1442,65,9,332,369,308

第一题:编写月平均气温统计程序
(四)Hadoop之MapReduce实战小例子
文章图片

import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; public class AvgTemperature {public static class StatMapper extends Mapper { private IntWritable intValue = https://www.it610.com/article/new IntWritable(); private Text dateKey = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] items = value.toString().split(","); String date = items[0]; String tmp = items[5]; if(!"DATE".equals(date) && !"N/A".equals(tmp)){//排除第一行说明以及未取到数据的行 dateKey.set(date.substring(0, 6)); intValue.set(Integer.parseInt(tmp)); context.write(dateKey, intValue); } } }public static class StatReducer extends Reducer { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int tmp_sum = 0; int count = 0; for(IntWritable val : values){ tmp_sum += val.get(); count++; } int tmp_avg = tmp_sum/count; result.set(tmp_avg); context.write(key, result); } }public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration(); Job job = new Job(conf, "AvgTemperature"); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setJarByClass(AvgTemperature.class); job.setMapperClass(StatMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setReducerClass(StatReducer.class); job.setPartitionerClass(HashPartitioner.class); 
job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); TextOutputFormat.setOutputPath(job, new Path(args[1])); TextInputFormat.setInputPaths(job, args[0]); job.setNumReduceTasks(Integer.parseInt(args[2])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

(四)Hadoop之MapReduce实战小例子
文章图片

运行结果:
201601 11
201602 5
201603 13
201604 10
201605 15
201606 16
201607 12
201608 14

第二题:编写月平均空气质量(AQI)统计程序
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; import java.io.IOException; /** * @Author Natasha * @Description * @Date 2020/10/30 20:37 **/ public class AirQuality { public static class AirQualityMapprer extends Mapper{private Text text = new Text(); private IntWritable intWritable = new IntWritable(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] item = value.toString().split(","); String date = item[0]; String kongqi = item[6]; if(!"DATE".equals(date) && !"N/A".equals(kongqi)){//排除第一行说明以及未取到数据的行 text.set(date.substring(0, 6)); intWritable.set(Integer.parseInt(kongqi)); context.write(text, intWritable); } } }public static class AirQualityReducer extends Reducer{ private IntWritable res = new IntWritable(); public void reduce(Text key, Iterable value, Context context) throws IOException, InterruptedException { int aqi = 0; int cnt = 0; for(IntWritable iw : value){ aqi += iw.get(); cnt++; } int aqi_avg = aqi/cnt; res.set(aqi_avg); context.write(key, res); } }public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = new Job(conf, "AirQuality"); job.setJarByClass(AirQuality.class); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(AirQualityMapprer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setPartitionerClass(HashPartitioner.class); job.setReducerClass(AirQualityReducer.class); job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(IntWritable.class); job.setNumReduceTasks(Integer.parseInt(args[2])); TextInputFormat.setInputPaths(job, args[0]); TextOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

运行结果:
201601 359
201602 360
201603 365
201604 365
201605 348
201606 342
201607 359
201608 384

    推荐阅读