智者不为愚者谋,勇者不为怯者死。这篇文章主要讲述MapReduce统计上行流量下行流量及流量之和,并且到集群上运行相关的知识,希望能为你提供帮助。
MapReduce统计上行流量、下行流量及流量之和
- 数据集需求分析
-
- 数据
- 需求
- 分析
- 具体操作
-
- 自定义一个数据类型
- Map阶段
- 自定义分区
- Reduce阶段
- Driver阶段
- 将程序打成jar包
-
- 在IDEA上打jar包的流程图
- 在集群上运行
数据集需求分析 数据
136315798506613726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com24 27 2481 24681 200
136315799505213826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.44 0 264 0 200
136315799107613926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.992 4 132 1512 200
136315440002213926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.44 0 240 0 200
136315799304418211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
136315799305513560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.9918 15 1116 954 200
136315799503315920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
136315798301913719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.824 0 240 0 200
136315798404113660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
136315797309815013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
136315798602915989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
136315799209313560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.9915 9 918 4938 200
136315798604113480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.43 3 180 180 200
136315798404013602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
136315799509313922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn12 12 3008 3720 200
136315798204013502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
136315798607218320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
136315799004313925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
136315798807213760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.822 2 120 120 200
136315798507913823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.996 3 360 180 200
136315798506913600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.5518 138 1080 186852 200
需求
- 统计每个电话号码的总的上行流量、下行流量及总流量
- 按照号码的前三位进行分区操作
- 将电话作为key,这样就可以按照key进行分组
- 在reduce阶段进行求和汇总
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
public class Bean implements WritableComparable<
Bean>
{
private int sum_low;
private int sum_up;
private int sum_bean;
@Override
public int compareTo(Bean o) {
return this.sum_bean - o.sum_bean;
}@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(sum_low);
dataOutput.writeInt(sum_up);
dataOutput.writeInt(sum_bean);
}@Override
public void readFields(DataInput dataInput) throws IOException {
sum_low = dataInput.readInt();
sum_up = dataInput.readInt();
sum_bean = dataInput.readInt();
}public void set(int sum_low, int sum_up, int sum_bean) {
this.sum_low = sum_low;
this.sum_up = sum_up;
this.sum_bean = sum_bean;
}@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Bean bean = (Bean) o;
return sum_low == bean.sum_low &
&
sum_up == bean.sum_up &
&
sum_bean == bean.sum_bean;
}@Override
public int hashCode() {
return Objects.hash(sum_low, sum_up, sum_bean);
}@Override
public String toString() {
return sum_low + "\\t" + sum_up + "\\t" + sum_bean;
}public int getSum_low() {
return sum_low;
}public void setSum_low(int sum_low) {
this.sum_low = sum_low;
}public int getSum_up() {
return sum_up;
}public void setSum_up(int sum_up) {
this.sum_up = sum_up;
}public int getSum_bean() {
return sum_bean;
}public void setSum_bean(int sum_bean) {
this.sum_bean = sum_bean;
}
}
Map阶段
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 136315798607218320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
* 求每个手机号的上行流量之和、下行流量之和、上下行流量之和
*/
/**
 * Map phase: parses one tab-separated log line, e.g.
 * 13631579860... 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
 * and emits (phone number, Bean(upFlow, downFlow, 0)). The total is left as 0
 * here; it is computed in the reduce phase.
 */
public class MapTest extends Mapper<LongWritable, Text, Text, Bean> {
    // Reused across map() calls to avoid allocating one object per record.
    Bean outValue = new Bean();
    Text outKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\\t");
        // Field 1 is the phone number; it becomes the grouping key.
        outKey.set(fields[1]);
        // Counting from the end is robust against the optional URL/category
        // columns in the middle: length-3 = up flow, length-2 = down flow.
        int upFlow = Integer.parseInt(fields[fields.length - 3]);
        int downFlow = Integer.parseInt(fields[fields.length - 2]);
        outValue.set(upFlow, downFlow, 0);
        context.write(outKey, outValue);
    }
}
自定义分区
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Custom partitioner: routes records into five partitions (0-4) by the first
 * three digits of the phone-number key. Prefixes 136-139 each get a dedicated
 * partition; all other prefixes share partition 4.
 *
 * <p>Must be paired with {@code job.setNumReduceTasks(5)} in the driver.
 */
public class ParTest extends Partitioner<Text, Bean> {
    @Override
    public int getPartition(Text text, Bean bean, int i) {
        // substring is inclusive-exclusive, so this grabs characters 0..2.
        String prefix = text.toString().substring(0, 3);
        if ("136".equals(prefix)) {
            return 0;
        }
        if ("137".equals(prefix)) {
            return 1;
        }
        if ("138".equals(prefix)) {
            return 2;
        }
        if ("139".equals(prefix)) {
            return 3;
        }
        // Any other prefix falls into the last of the five partitions.
        return 4;
    }
}
Reduce阶段
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reduce phase: for each phone number, sums the per-record upstream and
 * downstream traffic and emits (phone, Bean(totalDown, totalUp, grandTotal)).
 *
 * <p>IMPROVEMENT: the original accumulated into mutable instance fields and
 * reset them after each write; if reduce() ever exited early the stale state
 * would leak into the next key. Local accumulators make each call independent.
 */
public class RedTest extends Reducer<Text, Bean, Text, Bean> {
    // Reused output value to avoid allocating one Bean per key.
    Bean v = new Bean();

    @Override
    protected void reduce(Text key, Iterable<Bean> values, Context context)
            throws IOException, InterruptedException {
        int totalLow = 0;
        int totalUp = 0;
        for (Bean b : values) {
            totalUp += b.getSum_up();
            totalLow += b.getSum_low();
        }
        v.set(totalLow, totalUp, totalLow + totalUp);
        context.write(key, v);
    }
}
Driver阶段
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.File;
public class DriTest {
public static void main(String[] args) throws Exception{
File file = new File("D:\\\\FlowSum\\\\output");
if (file.exists()){
delFile(file);
driver();
}else {
driver();
}
}
public static void delFile(File file) {
File[] files = file.listFiles();
if (files != null &
&
files.length != 0) {
for (int i = 0;
i<
files.length;
i++) {
delFile(files[i]);
}
}
file.delete();
}
publicstatic void driver() throws Exception{
Configuration conf = new Configuration();
conf.set("fs.default","hdfs://192.168.0.155:9000/");
Job job = Job.getInstance(conf);
job.setMapperClass(MapTest.class);
job.setReducerClass(RedTest.class);
job.setJarByClass(DriTest.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Bean.class);
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(Bean.class);
job.setNumReduceTasks(5);
job.setPartitionerClass(ParTest.class);
FileInputFormat.setInputPaths(job, "/MR/input");
FileOutputFormat.setOutputPath(job, new Path("/MR/output"));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
将程序打成jar包 在IDEA上打jar包的流程图
文章图片
文章图片
文章图片
文章图片
文章图片
在集群上运行
- 在集群上运行的命令:
hadoop jar MPTEST.jar FlovwBean.DriTest
- 其中 FlovwBean.DriTest 是 Driver 类的全限定类名(包名.类名),而不是文件路径
文章图片
推荐阅读
- 优达学城深度学习之三(下)——卷积神经网络
- 20210602 TensorFlow 实现多点线性回归问题
- 打造一个window桌面应用(在线聊天对话机器人)
- #导入MD文档图片#Flask结合ECharts实现在线可视化效果,超级详细!
- Pandas高级教程之:自定义选项
- 20210607 TensorFlow 实现 Logistic 回归
- python基础篇(二十一)——文件和异常(上)
- 20210608 TensorFlow 实现数字图片分类
- 保姆级利用Github搭建自己的个人博客,看完就会