MapReduce: Summing Upstream Traffic, Downstream Traffic, and Total Traffic per Phone Number, and Running the Job on a Cluster

This article walks through a MapReduce job that sums the upstream traffic, downstream traffic, and total traffic for each phone number, and then shows how to package the job as a jar and run it on a Hadoop cluster.

MapReduce: summing upstream traffic, downstream traffic, and total traffic

  • Dataset and requirements analysis
    • Data
    • Requirements
    • Analysis
  • Implementation
    • Custom data type
    • Map phase
    • Custom partitioner
    • Reduce phase
    • Driver phase
  • Packaging the program into a jar
    • Building the jar in IDEA (screenshots)
  • Running on the cluster

Dataset and requirements analysis

Data

Fields are tab-separated: a timestamp, the phone number, the MAC address, the IP address, the visited domain and site category (both optional), upstream packets, downstream packets, upstream bytes, downstream bytes, and the HTTP status code. Because some records omit the domain and category fields, the mapper later indexes the traffic columns from the end of each record.
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985079 13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 6 3 360 180 200
1363157985069 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 18 138 1080 186852 200

Requirements
  • For each phone number, compute the total upstream traffic, total downstream traffic, and total traffic.
  • Partition the output by the first three digits of the phone number.
Analysis
  • Use the phone number as the key, so that records are grouped by phone number.
  • Sum the values for each key in the reduce phase (see the worked example below).
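
As a quick sanity check against the sample data above: phone number 13560439658 appears in two records, with upstream bytes 1116 and 918 and downstream bytes 954 and 4938, so the reducer should emit 1116 + 918 = 2034 upstream, 954 + 4938 = 5892 downstream, and 2034 + 5892 = 7926 total for that key.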
Implementation

Custom data type
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

// Custom value type: holds the upstream traffic, downstream traffic, and their total for one phone number.
public class Bean implements WritableComparable<Bean> {
    private int sum_low;
    private int sum_up;
    private int sum_bean;

    @Override
    public int compareTo(Bean o) {
        // Order by total traffic.
        return this.sum_bean - o.sum_bean;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(sum_low);
        dataOutput.writeInt(sum_up);
        dataOutput.writeInt(sum_bean);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // Must read the fields in the same order they were written.
        sum_low = dataInput.readInt();
        sum_up = dataInput.readInt();
        sum_bean = dataInput.readInt();
    }

    public void set(int sum_low, int sum_up, int sum_bean) {
        this.sum_low = sum_low;
        this.sum_up = sum_up;
        this.sum_bean = sum_bean;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Bean bean = (Bean) o;
        return sum_low == bean.sum_low && sum_up == bean.sum_up && sum_bean == bean.sum_bean;
    }

    @Override
    public int hashCode() {
        return Objects.hash(sum_low, sum_up, sum_bean);
    }

    @Override
    public String toString() {
        return sum_low + "\t" + sum_up + "\t" + sum_bean;
    }

    public int getSum_low() { return sum_low; }
    public void setSum_low(int sum_low) { this.sum_low = sum_low; }
    public int getSum_up() { return sum_up; }
    public void setSum_up(int sum_up) { this.sum_up = sum_up; }
    public int getSum_bean() { return sum_bean; }
    public void setSum_bean(int sum_bean) { this.sum_bean = sum_bean; }
}
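
To check that write() and readFields() stay in sync, a minimal round-trip sketch can be run locally. The class name BeanRoundTrip and the sample values are only for illustration:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class BeanRoundTrip {
    public static void main(String[] args) throws IOException {
        Bean out = new Bean();
        out.set(2481, 24681, 2481 + 24681);

        // Serialize the bean the same way Hadoop does when shuffling map output.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        out.write(new DataOutputStream(buffer));

        // Deserialize into a fresh instance and compare.
        Bean in = new Bean();
        in.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(in);             // expected: 2481  24681  27162
        System.out.println(out.equals(in)); // expected: true
    }
}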

Map phase
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Sample record (tab-separated in the input file):
 * 1363157986072  18320173382  84-25-DB-4F-10-1A:CMCC-EASY  120.196.100.99  input.shouji.sogou.com  搜索引擎  21  18  9531  2412  200
 * Goal: for each phone number, sum the upstream traffic, the downstream traffic, and their total.
 */
public class MapTest extends Mapper<LongWritable, Text, Text, Bean> {
    Bean v = new Bean();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] datas = value.toString().split("\t");
        // The phone number is the second field and becomes the output key.
        k.set(datas[1]);
        // Store the per-record traffic for now; the totals are computed in the reducer.
        // Counting from the end of the record handles lines that omit the domain/category fields.
        v.set(Integer.parseInt(datas[datas.length - 3]), Integer.parseInt(datas[datas.length - 2]), 0);
        context.write(k, v);
    }
}
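
A quick way to confirm the field indexes is to split one sample line by hand; the class name IndexCheck below is only for illustration:

public class IndexCheck {
    public static void main(String[] args) {
        // The sample record from the mapper's comment, written here with explicit tabs.
        String line = "1363157986072\t18320173382\t84-25-DB-4F-10-1A:CMCC-EASY\t120.196.100.99"
                + "\tinput.shouji.sogou.com\t搜索引擎\t21\t18\t9531\t2412\t200";
        String[] datas = line.split("\t");
        System.out.println(datas[1]);                // 18320173382 (phone number, the key)
        System.out.println(datas[datas.length - 3]); // 9531 (upstream bytes)
        System.out.println(datas[datas.length - 2]); // 2412 (downstream bytes)
    }
}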

Custom partitioner
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ParTest extends Partitioner<Text, Bean> {
    @Override
    public int getPartition(Text text, Bean bean, int i) {
        // substring(0, 3): start index inclusive, end index exclusive.
        String prePhone = text.toString().substring(0, 3);
        // Five partitions, numbered from 0; anything other than 136-139 goes to partition 4.
        int partition = 4;
        if ("136".equals(prePhone)) {
            partition = 0;
        } else if ("137".equals(prePhone)) {
            partition = 1;
        } else if ("138".equals(prePhone)) {
            partition = 2;
        } else if ("139".equals(prePhone)) {
            partition = 3;
        } else {
            partition = 4;
        }
        return partition;
    }
}
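
The partitioner can be exercised outside of a job. The sketch below (PartitionCheck is an illustrative name) assumes five reduce tasks, matching job.setNumReduceTasks(5) in the driver:

import org.apache.hadoop.io.Text;

public class PartitionCheck {
    public static void main(String[] args) {
        ParTest partitioner = new ParTest();
        Bean dummy = new Bean(); // the value is not used by getPartition
        System.out.println(partitioner.getPartition(new Text("13726230503"), dummy, 5)); // 1 (prefix 137)
        System.out.println(partitioner.getPartition(new Text("13926251106"), dummy, 5)); // 3 (prefix 139)
        System.out.println(partitioner.getPartition(new Text("18211575961"), dummy, 5)); // 4 (any other prefix)
    }
}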

Reduce phase
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class RedTest extends Reducer<Text, Bean, Text, Bean> {
    int sum_low = 0;
    int sum_up = 0;
    Bean v = new Bean();

    @Override
    protected void reduce(Text key, Iterable<Bean> values, Context context) throws IOException, InterruptedException {
        // Accumulate the per-record traffic for this phone number.
        for (Bean b : values) {
            sum_up += b.getSum_up();
            sum_low += b.getSum_low();
        }
        v.set(sum_low, sum_up, sum_low + sum_up);
        context.write(key, v);
        // Reset the running sums so the next key starts from zero.
        sum_up = 0;
        sum_low = 0;
    }
}

Driver phase
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;

public class DriTest {
    public static void main(String[] args) throws Exception {
        // Clear a leftover local output directory from a previous run, then submit the job.
        File file = new File("D:\\FlowSum\\output");
        if (file.exists()) {
            delFile(file);
        }
        driver();
    }

    // Recursively delete a directory and its contents.
    public static void delFile(File file) {
        File[] files = file.listFiles();
        if (files != null && files.length != 0) {
            for (int i = 0; i < files.length; i++) {
                delFile(files[i]);
            }
        }
        file.delete();
    }

    public static void driver() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.0.155:9000/");
        Job job = Job.getInstance(conf);

        job.setMapperClass(MapTest.class);
        job.setReducerClass(RedTest.class);
        job.setJarByClass(DriTest.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Bean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Bean.class);

        // Five reduce tasks, one per partition produced by ParTest.
        job.setNumReduceTasks(5);
        job.setPartitionerClass(ParTest.class);

        FileInputFormat.setInputPaths(job, "/MR/input");
        FileOutputFormat.setOutputPath(job, new Path("/MR/output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
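
Note that delFile only clears a local Windows directory, while the job itself writes to /MR/output on HDFS, and MapReduce fails if that output directory already exists. One possible way to clear the HDFS directory programmatically is the FileSystem API; the sketch below assumes the same Configuration as in the driver:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ClearHdfsOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.0.155:9000/");
        FileSystem fs = FileSystem.get(conf);
        Path output = new Path("/MR/output");
        // Recursively delete the output directory left behind by a previous run.
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        fs.close();
    }
}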

Packaging the program into a jar

Building the jar in IDEA (screenshots)
(Screenshots of the jar-packaging steps in IDEA.)

Running on the cluster
  • Command to run the job on the cluster: hadoop jar MPTEST.jar FlovwBean.DriTest
  • Here FlovwBean.DriTest is the fully qualified name (package plus class) of the driver class. The results can then be checked as shown below.
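
Once the job finishes, the output can be inspected with standard HDFS commands. The paths below assume the /MR/output directory configured in the driver, with one part file per reduce task:

hdfs dfs -ls /MR/output
hdfs dfs -cat /MR/output/part-r-00000    # partition 0: phone numbers starting with 136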
