spark的二次排序(封装对象)

二次排序原理
Spark中大于两列的排序都叫二次排序, 二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果,本文采用封装对象的编程思想进行二次排序,大大简化的代码的复杂度。
废话少说,上代码...实践是检验真理的唯一标准.......
java代码实现:

package qq1; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; import java.io.Serializable; //新建类 class MySort implements Comparable, Serializable{ publicInteger i1; publicInteger i2; //构造方法 public MySort(Integer i1, Integer i2) { this.i1 = i1; this.i2 = i2; } //精髓 @Override public int compareTo(MySort o) { //重新定义比较规则 if(this.i1 == o.i1){//当第一列的值相同时,比较第二列的值 return this.i2 - o.i2; }else {//当第一列的值不相同,比较第一列的值 return this.i1 - o.i1; } } }public class TestSort { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local"); conf.setAppName("sort"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD lines = sc.textFile("./data/sort.txt"); //按照空格切分,封存到MySort对象中 JavaPairRDD trans = lines.mapToPair(new PairFunction() {@Override public Tuple2 call(String line) throws Exception { return new Tuple2<>(new MySort(Integer.valueOf(line.split(" ")[0]), Integer.valueOf(line.split(" ")[1])), line); } }); //按照第一列排好序后,第一列不动, //第一列相同值,但第二列值不同的在进行二次排序 trans.sortByKey().map(new Function,String>() {@Override public String call(Tuple2 v) throws Exception { return v._2; } }).foreach(new VoidFunction() {//遍历 @Override public void call(String s) throws Exception { System.out.println(s); } }); }}


原始数据
1 3 3 6 1 5 5 7 5 6 2 4 7 3 1 1 4 6 7 1 3 1 1 9 2 5 2 1

排序后的数据
【spark的二次排序(封装对象)】1 1
1 3
1 5
1 9
2 1
2 4
2 5
3 1
3 6
4 6
5 6
5 7
7 1
7 3

    推荐阅读