使用Spark处理二次排序问题

现在有这样一个需求:
有这样一份log日志记录了某时间戳下某个设备访问网站时产生的上行流量、下行流量。
时间戳/设备号/上行流量/下行流量
使用Spark处理二次排序问题
文章图片

现在想统计出每个设备号的最早访问时间及总的上行流量、下行流量,最后打印出10个按上行流量、下行流量排序的最多的10个记录。
思路:涉及到排序问题,我们可以使用Spark的sortByKey算子,我们可以自定义排序方式,实现Comparable接口即可;另外spark只能根据key进行排序,如果我们要排序的key,value不符合要排序的在前,可以将其调转顺序。
1.生成基础访问数据:

/** * 用于生成需要的模拟数据,并把数据写入到file中 */ public class DataFileGenerator { public static void main(String[] args) { //生成100个设备号 List deviceIds = new ArrayList<>(); for(int i=0; i<100; i++){ deviceIds.add(getUuid()); }//生成10000个时间戳和上下行流量 Random random = new Random(); StringBuffer sb = new StringBuffer(); for(int i=0; i<10000; i++){ //时间戳 long timestamp = System.currentTimeMillis() - random.nextInt(10000); //设备id String deviceId = deviceIds.get(random.nextInt(100)); //上行流量 int upTraffic = random.nextInt(10000); int downTraffic = random.nextInt(10000); sb.append(timestamp).append("\t").append(deviceId).append("\t").append(upTraffic).append("\t").append(downTraffic).append("\n"); }//将数据写出到磁盘 PrintWriter printWriter = null; try { printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream("./app-log.txt"))); printWriter.write(sb.toString()); } catch (FileNotFoundException e) { e.printStackTrace(); }finally { printWriter.close(); } }protected static String getUuid(){ return UUID.randomUUID().toString().replace("-",""); } }


【使用Spark处理二次排序问题】2.万物皆对象,定义访问日志的实体类,并实现Comparable接口,定义比较器方法
/** * 某个设备的访问信息 */ @Data @Accessors(chain = true) @NoArgsConstructor @AllArgsConstructor public class AccessLogInfo implements Serializable,Comparable {/** * 时间戳 */ private Long timestamp; /** * 上行流量 */ private Integer upTraffic; /** * 下行流量 */ private Integer downTraffic; @Override public int compareTo(AccessLogInfo o1) { //先比较上行流量,如果相等,则比较下行流量 if(getUpTraffic()-o1.getUpTraffic()==0){ return getDownTraffic()-o1.getDownTraffic(); }else{ return getUpTraffic()-o1.getUpTraffic(); } } }

3.开发处理业务逻辑代码:
public class AppLogSpark { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setAppName("AppLogSpark").setMaster("local"); JavaSparkContext context = new JavaSparkContext(conf); JavaRDD lines = context.textFile("./app-log.txt"); JavaPairRDD logInfoJavaPairRDD = mapToPair(lines); JavaPairRDD aggregateData = https://www.it610.com/article/aggregateByDeviceId(logInfoJavaPairRDD); JavaPairRDD swapData = swapData(aggregateData); //排序 JavaPairRDD sortData = swapData.sortByKey(false); List result = sortData.take(10); for(Tuple2 t : result){ System.out.println(t._2 + "\t" + t._1.getTimestamp()+"\t"+t._1.getUpTraffic()+"\t"+t._1.getDownTraffic()+"\n"); }context.close(); }//将读取的每一行数据处理成 deviceId - AccessLogInfo的格式 private static JavaPairRDD mapToPair(JavaRDD lines){ return lines.mapToPair(new PairFunction() { @Override public Tuple2 call(String accessLog) throws Exception { String[] split = accessLog.split("\t"); Long timestamp = Long.parseLong(split[0]); String deviceId = split[1]; Integer upTraffic = Integer.parseInt(split[2]); Integer downTraffic = Integer.parseInt(split[3]); AccessLogInfo logInfo = new AccessLogInfo().setTimestamp(timestamp).setUpTraffic(upTraffic).setDownTraffic(downTraffic); return new Tuple2<>(deviceId,logInfo); } }); }//根据key处理聚合数据 private static JavaPairRDD aggregateByDeviceId(JavaPairRDD logInfoJavaPairRDD){ return logInfoJavaPairRDD.reduceByKey(new Function2() { @Override public AccessLogInfo call(AccessLogInfo v1, AccessLogInfo v2) throws Exception { Long timestamp = v1.getTimestamp() <= v2.getTimestamp() ? v1.getTimestamp() : v2.getTimestamp(); int upTraffic = v1.getUpTraffic() + v2.getUpTraffic(); int downTraffic = v1.getDownTraffic() + v2.getDownTraffic(); return new AccessLogInfo(timestamp, upTraffic, downTraffic); } }); }// 将数据的key value进行转换 private static JavaPairRDD swapData(JavaPairRDD aggregateData){ return aggregateData.mapToPair(new PairFunction() { @Override public Tuple2 call(Tuple2 tuple2) throws Exception { return tuple2.swap(); } }); }}

最终生成的结果如下:
使用Spark处理二次排序问题
文章图片

参考文档:https://blog.csdn.net/dream_broken/article/details/78354699

    推荐阅读