在mysql中、spark中分组concat排序去重

做实验 遇到一个很经典的问题,分组concat 排序 去重。
下面分别用mysql、spark dataframe、spark sql和 rdd 实现这个需求
首先看mysql 表结构

-- ---------------------------- -- Table structure for `test` -- ---------------------------- DROP TABLE IF EXISTS `test`; CREATE TABLE `test` ( `time` varchar(20) DEFAULT NULL, `app` varchar(20) DEFAULT NULL, `appstore` varchar(20) DEFAULT NULL, `version` varchar(20) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- ---------------------------- -- Records of test -- ---------------------------- INSERT INTO `test` VALUES ('2019-01-14', '王者荣耀', 'TapTap', 'v2.1'); INSERT INTO `test` VALUES ('2019-01-14', '王者荣耀', 'app store', 'v1.2'); INSERT INTO `test` VALUES ('2019-01-14', '王者荣耀', '应用宝', 'v1.3'); INSERT INTO `test` VALUES ('2019-01-14', '绝地求生', 'TapTap', 'v2.4'); INSERT INTO `test` VALUES ('2019-01-14', '绝地求生', 'app store', '2.3'); INSERT INTO `test` VALUES ('2019-01-14', '绝地求生', '应用宝', 'v2.4'); INSERT INTO `test` VALUES ('2019-02-14', '王者荣耀', 'app store', 'v1.8'); INSERT INTO `test` VALUES ('2019-02-14', '王者荣耀', 'TapTap', 'v1.8'); INSERT INTO `test` VALUES ('2019-02-14', '王者荣耀', '应用宝', 'v1.9'); INSERT INTO `test` VALUES ('2019-02-14', '绝地求生', 'TapTap', 'v2.4'); INSERT INTO `test` VALUES ('2019-02-14', '绝地求生', 'app store', 'v2.5'); INSERT INTO `test` VALUES ('2019-02-14', '绝地求生', '应用宝', 'v2.4'); INSERT INTO `test` VALUES ('2019-03-14', '王者荣耀', 'app store', 'v1.9'); INSERT INTO `test` VALUES ('2019-03-14', '王者荣耀', '应用宝', 'v2.1'); INSERT INTO `test` VALUES ('2019-03-14', '绝地求生', 'TapTap', 'v2.6'); INSERT INTO `test` VALUES ('2019-03-14', '绝地求生', 'app store', 'v2.6'); INSERT INTO `test` VALUES ('2019-03-14', '绝地求生', '应用宝', 'v2.6'); INSERT INTO `test` VALUES ('2019-03-14', '王者荣耀', 'TapTap', 'v2.0');

在mysql中、spark中分组concat排序去重
文章图片

需求是 按照app、appstore分组,统计出版本的更新情况,当然了,版本肯定要去重、排序
1、 sql 如下:
SELECT a.app, a.appstore, count(1), GROUP_CONCAT(distinct a.version order by a.version) FROM hepenghui.test a GROUP BY a.app,a.appstore

经测试,在mysql 8.0.11版本中,不用 order by a.version 也可以排序,貌似是distinct默认会排序,但是在实验环境中,mysql是5.7版本的,必须用以上sql才可以排序。
对了,mysql 5.7 默认的sql_mode 有 only_full_group_by,所以只能执行严格的group by,如果想要修改
select @@global.sql_mode; -- 去掉 only_full_group_by set @@global.sql_mode= 'STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION';

2、spark sql & dataframe
准备代码: info = spark.read.csv("file:///home/ub01/hepenghui/spark/test.csv") info = info.toDF('time','app','appstore','version')## spark sql 的解决方式 info.createOrReplaceTempView("test") df = spark.sql( "select a.app,a.appstore,count(1)," "concat_ws('; ',sort_array(collect_set(a.version))) as versions " "from test agroup by a.app,a.appstore order by a.app,a.appstore")## spark dataframe 的解决方式from pyspark.sql.functions import * df = info.groupBy(info.app,info.appstore)\ .agg((concat_ws("; ",sort_array(collect_set(info.version)))).alias("versions"))

【在mysql中、spark中分组concat排序去重】3、spark rdd 的方式就简单了
#coding=utf8from pyspark import SparkContextsc = SparkContext() dataFile = "hdfs:///user/ub01/test.txt" dataRdd = sc.textFile(dataFile) # 2019-01-14,王者荣耀,TapTap,v1.3 def splitData(line): list = line.split(',') return (list[1],list[2]),(list[3])rddSplit = dataRdd.map(splitData) rddGroup = rddSplit.groupByKey().mapValues(set)def sortD(list): list.sort() return listrddSort = rddGroup.mapValues(list).mapValues(sortD).sortByKey() listResult = rddSort.collect()for i in listResult: print(i)rddSort.saveAsTextFile("hdfs:///user/ub01/result")

    推荐阅读