使用Spark处理二次排序问题
现在有这样一个需求:
有这样一份log日志记录了某时间戳下某个设备访问网站时产生的上行流量、下行流量。
时间戳/设备号/上行流量/下行流量
文章图片
现在想统计出每个设备号的最早访问时间及总的上行流量、下行流量,最后打印出10个按上行流量、下行流量排序的最多的10个记录。
思路:涉及到排序问题,我们可以使用Spark的sortByKey算子,我们可以自定义排序方式,实现Comparable接口即可;另外spark只能根据key进行排序,如果我们要排序的key,value不符合要排序的在前,可以将其调转顺序。
1.生成基础访问数据:
/** * 用于生成需要的模拟数据,并把数据写入到file中 */ public class DataFileGenerator { public static void main(String[] args) { //生成100个设备号 List deviceIds = new ArrayList<>(); for(int i=0; i<100; i++){ deviceIds.add(getUuid()); }//生成10000个时间戳和上下行流量 Random random = new Random(); StringBuffer sb = new StringBuffer(); for(int i=0; i<10000; i++){ //时间戳 long timestamp = System.currentTimeMillis() - random.nextInt(10000); //设备id String deviceId = deviceIds.get(random.nextInt(100)); //上行流量 int upTraffic = random.nextInt(10000); int downTraffic = random.nextInt(10000); sb.append(timestamp).append("\t").append(deviceId).append("\t").append(upTraffic).append("\t").append(downTraffic).append("\n"); }//将数据写出到磁盘 PrintWriter printWriter = null; try { printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream("./app-log.txt"))); printWriter.write(sb.toString()); } catch (FileNotFoundException e) { e.printStackTrace(); }finally { printWriter.close(); } }protected static String getUuid(){ return UUID.randomUUID().toString().replace("-",""); } }
【使用Spark处理二次排序问题】2.万物皆对象,定义访问日志的实体类,并实现Comparable接口,定义比较器方法
3.开发处理业务逻辑代码:/** * 某个设备的访问信息 */ @Data @Accessors(chain = true) @NoArgsConstructor @AllArgsConstructor public class AccessLogInfo implements Serializable,Comparable {/** * 时间戳 */ private Long timestamp; /** * 上行流量 */ private Integer upTraffic; /** * 下行流量 */ private Integer downTraffic; @Override public int compareTo(AccessLogInfo o1) { //先比较上行流量,如果相等,则比较下行流量 if(getUpTraffic()-o1.getUpTraffic()==0){ return getDownTraffic()-o1.getDownTraffic(); }else{ return getUpTraffic()-o1.getUpTraffic(); } } }
最终生成的结果如下:public class AppLogSpark { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setAppName("AppLogSpark").setMaster("local"); JavaSparkContext context = new JavaSparkContext(conf); JavaRDD lines = context.textFile("./app-log.txt"); JavaPairRDD logInfoJavaPairRDD = mapToPair(lines); JavaPairRDD aggregateData = https://www.it610.com/article/aggregateByDeviceId(logInfoJavaPairRDD); JavaPairRDD swapData = swapData(aggregateData); //排序 JavaPairRDD sortData = swapData.sortByKey(false); List
result = sortData.take(10); for(Tuple2 t : result){ System.out.println(t._2 + "\t" + t._1.getTimestamp()+"\t"+t._1.getUpTraffic()+"\t"+t._1.getDownTraffic()+"\n"); }context.close(); }//将读取的每一行数据处理成 deviceId - AccessLogInfo的格式 private static JavaPairRDD mapToPair(JavaRDD lines){ return lines.mapToPair(new PairFunction() { @Override public Tuple2 call(String accessLog) throws Exception { String[] split = accessLog.split("\t"); Long timestamp = Long.parseLong(split[0]); String deviceId = split[1]; Integer upTraffic = Integer.parseInt(split[2]); Integer downTraffic = Integer.parseInt(split[3]); AccessLogInfo logInfo = new AccessLogInfo().setTimestamp(timestamp).setUpTraffic(upTraffic).setDownTraffic(downTraffic); return new Tuple2<>(deviceId,logInfo); } }); }//根据key处理聚合数据 private static JavaPairRDD aggregateByDeviceId(JavaPairRDD logInfoJavaPairRDD){ return logInfoJavaPairRDD.reduceByKey(new Function2() { @Override public AccessLogInfo call(AccessLogInfo v1, AccessLogInfo v2) throws Exception { Long timestamp = v1.getTimestamp() <= v2.getTimestamp() ? v1.getTimestamp() : v2.getTimestamp(); int upTraffic = v1.getUpTraffic() + v2.getUpTraffic(); int downTraffic = v1.getDownTraffic() + v2.getDownTraffic(); return new AccessLogInfo(timestamp, upTraffic, downTraffic); } }); }// 将数据的key value进行转换 private static JavaPairRDD swapData(JavaPairRDD aggregateData){ return aggregateData.mapToPair(new PairFunction () { @Override public Tuple2 call(Tuple2 tuple2) throws Exception { return tuple2.swap(); } }); }}
文章图片
参考文档:https://blog.csdn.net/dream_broken/article/details/78354699
推荐阅读
- 由浅入深理解AOP
- 【译】20个更有效地使用谷歌搜索的技巧
- mybatisplus如何在xml的连表查询中使用queryWrapper
- MybatisPlus|MybatisPlus LambdaQueryWrapper使用int默认值的坑及解决
- MybatisPlus使用queryWrapper如何实现复杂查询
- Java|Java OpenCV图像处理之SIFT角点检测详解
- 事件处理程序
- iOS中的Block
- Linux下面如何查看tomcat已经使用多少线程
- 使用composer自动加载类文件