Data Algorithms: Hadoop/Spark Big Data Processing - Chapter 8

This chapter solves the problem of finding the common friends of every pair of people. The main approach is:

  • First obtain each person's friend list, typically a record like (100,100 200 300 400).
  • Transform each record into key/value pairs such as [(100,200),(100,100 200 300 400)] and [(100,300),(100,100 200 300 400)], pairing the person with each of his friends as the key while carrying the full friend list as the value.
  • Normalize each key by sorting its two IDs, so that both directions of a friendship land under the same key, then group by key.
  • Finally, within each group, keep the friend IDs that appear in every value; those are the common friends (see the standalone sketch after this list).
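To make the flow concrete, here is a minimal standalone Java sketch of the same pair-and-intersect idea (not code from the book; the two in-memory records and all names are illustrative):

import java.util.*;

public class CommonFriendsSketch {
    public static void main(String[] args) {
        // each entry: person -> that person's friend list
        Map<Long, List<Long>> input = new HashMap<>();
        input.put(100L, Arrays.asList(200L, 300L, 400L));
        input.put(200L, Arrays.asList(100L, 300L, 400L));

        // steps 1-3: emit ((smaller, larger), friendList) for every friend, grouped by key
        Map<List<Long>, List<List<Long>>> grouped = new HashMap<>();
        for (Map.Entry<Long, List<Long>> e : input.entrySet()) {
            long person = e.getKey();
            for (long friend : e.getValue()) {
                List<Long> key = (person < friend)
                        ? Arrays.asList(person, friend)
                        : Arrays.asList(friend, person);
                grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(e.getValue());
            }
        }

        // step 4: for each key, intersect all of the collected friend lists
        for (Map.Entry<List<Long>, List<List<Long>>> e : grouped.entrySet()) {
            Set<Long> common = new HashSet<>(e.getValue().get(0));
            for (List<Long> list : e.getValue()) {
                common.retainAll(list);
            }
            System.out.println(e.getKey() + " -> " + common);
        }
    }
}

Running it prints, among other keys, that pair [100, 200] shares the friends 300 and 400, because both IDs occur in both collected lists.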
This chapter gives three implementations:
  1. A classic MapReduce implementation
  2. A Spark implementation in Java
  3. A Scala implementation
++Classic MapReduce implementation++
// map function
public static class CommonFriendsMapper
        extends Mapper<LongWritable, Text, Text, Text> {

    private static final Text REDUCER_KEY = new Text();
    private static final Text REDUCER_VALUE = new Text();

    // build the friend list (everything after tokens[0]) as a comma-separated string
    static String getFriends(String[] tokens) {
        // the person has only one friend, so this pair cannot have a common friend
        if (tokens.length == 2) {
            return "";
        }
        StringBuilder builder = new StringBuilder();
        for (int i = 1; i < tokens.length; i++) {
            builder.append(tokens[i]);
            if (i < (tokens.length - 1)) {
                builder.append(",");
            }
        }
        return builder.toString();
    }

    // order the two IDs inside the key so that (p,f) and (f,p) land in the same group
    static String buildSortedKey(String person, String friend) {
        long p = Long.parseLong(person);
        long f = Long.parseLong(friend);
        if (p < f) {
            return person + "," + friend;
        } else {
            return friend + "," + person;
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // parse input; the delimiter is a single space
        // (StringUtils.split: e.g. org.apache.commons.lang.StringUtils)
        String[] tokens = StringUtils.split(value.toString(), " ");

        // the reducer value is this person's whole friend list
        String friends = getFriends(tokens);
        REDUCER_VALUE.set(friends);

        // tokens[0] is the owner of the friend list
        String person = tokens[0];
        for (int i = 1; i < tokens.length; i++) {
            String friend = tokens[i];
            // pair the person with each friend one by one;
            // buildSortedKey returns the sorted "p,f" key as a String
            String reducerKeyAsString = buildSortedKey(person, friend);
            REDUCER_KEY.set(reducerKeyAsString);
            context.write(REDUCER_KEY, REDUCER_VALUE);
        }
    }
}
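For example, given an input line 100 200 300 400 (person 100 whose friends are 200, 300 and 400), this mapper emits the keys 100,200 and 100,300 and 100,400, each with the value 200,300,400. The records emitted from the friends' own input lines land under the same sorted keys, so each reducer call sees all the friend lists belonging to one pair.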

// reduce function
public static class CommonFriendsReducer
        extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Map<String, Integer> map = new HashMap<String, Integer>();
        // walk the friend lists received for this (person, friend) key
        Iterator<Text> iterator = values.iterator();
        int numOfValues = 0;
        while (iterator.hasNext()) {
            String friends = iterator.next().toString();
            if (friends.equals("")) {
                // one side has a single friend, so there can be no common friends
                context.write(key, new Text("[]"));
                return;
            }
            addFriends(map, friends);
            numOfValues++;
        }

        // now iterate the map to see which friends were counted numOfValues times;
        // numOfValues is the number of lists, so such a friend is in both people's lists
        List<String> commonFriends = new ArrayList<String>();
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            if (entry.getValue() == numOfValues) {
                commonFriends.add(entry.getKey());
            }
        }

        // send it to output
        context.write(key, new Text(commonFriends.toString()));
    }

    // count how many of the lists each friend appears in
    static void addFriends(Map<String, Integer> map, String friendsList) {
        // split the value back into the friends array
        String[] friends = StringUtils.split(friendsList, ",");
        for (String friend : friends) {
            Integer count = map.get(friend);
            if (count == null) {
                map.put(friend, 1);
            } else {
                map.put(friend, ++count);
            }
        }
    }
}
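The post does not include the book's driver class that wires the job together. Here is a minimal sketch, assuming the mapper and reducer above are packaged as CommonFriendsMapper and CommonFriendsReducer (names chosen here, not taken from the source):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FindCommonFriendsDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "FindCommonFriends");
        job.setJarByClass(FindCommonFriendsDriver.class);

        // the map and reduce classes shown above
        job.setMapperClass(CommonFriendsMapper.class);
        job.setReducerClass(CommonFriendsReducer.class);

        // both phases emit Text keys and Text values
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}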

++Spark implementation (Java)++
// Spark implementation (Java)
public static void main(String[] args) throws Exception {
    if (args.length < 1) {
        // Spark master URL:
        //   format:  spark://<server>:7077
        //   example: spark://myserver00:7077
        System.err.println("Usage: FindCommonFriends <hdfs-input-file>");
        System.exit(1);
    }
    //String sparkMasterURL = args[0];
    //System.out.println("sparkMasterURL="+sparkMasterURL);
    String hdfsInputFileName = args[0];
    System.out.println("hdfsInputFileName="+hdfsInputFileName);

    // create context object
    JavaSparkContext ctx = SparkUtil.createJavaSparkContext("FindCommonFriends");

    // create the first RDD from the input file
    JavaRDD<String> records = ctx.textFile(hdfsInputFileName, 1);

    // debug0
    List<String> debug0 = records.collect();
    for (String t : debug0) {
        System.out.println("debug0 record="+t);
    }

    // PairFlatMapFunction<T, K, V>
    // T => Iterable<Tuple2<K, V>>
    JavaPairRDD<Tuple2<Long,Long>, Iterable<Long>> pairs =
        records.flatMapToPair(new PairFlatMapFunction<String, Tuple2<Long,Long>, Iterable<Long>>() {
        @Override
        public Iterator<Tuple2<Tuple2<Long,Long>, Iterable<Long>>> call(String s) {
            // the input String has the format 100,100 200 400 300 etc.
            String[] tokens = s.split(",");
            long person = Long.parseLong(tokens[0]);
            String friendsAsString = tokens[1];
            String[] friendsTokenized = friendsAsString.split(" ");

            // only one friend: the two people can have no common friend
            if (friendsTokenized.length == 1) {
                Tuple2<Long,Long> key = buildSortedTuple(person, Long.parseLong(friendsTokenized[0]));
                return Arrays.asList(
                    new Tuple2<Tuple2<Long,Long>, Iterable<Long>>(key, new ArrayList<Long>())).iterator();
            }

            // build this person's friend list
            List<Long> friends = new ArrayList<Long>();
            for (String f : friendsTokenized) {
                friends.add(Long.parseLong(f));
            }

            // emit one Tuple2 per friend: ((person, friend) sorted, full friend list)
            List<Tuple2<Tuple2<Long,Long>, Iterable<Long>>> result =
                new ArrayList<Tuple2<Tuple2<Long,Long>, Iterable<Long>>>();
            for (Long f : friends) {
                Tuple2<Long,Long> key = buildSortedTuple(person, f);
                result.add(new Tuple2<Tuple2<Long,Long>, Iterable<Long>>(key, friends));
            }
            return result.iterator();
        }
    });

    // debug1: print the emitted Tuple2s
    List<Tuple2<Tuple2<Long,Long>, Iterable<Long>>> debug1 = pairs.collect();
    for (Tuple2<Tuple2<Long,Long>, Iterable<Long>> t2 : debug1) {
        System.out.println("debug1 key="+t2._1+"\t value="+t2._2);
    }

    // group the friend lists by (person, friend) key
    JavaPairRDD<Tuple2<Long,Long>, Iterable<Iterable<Long>>> grouped = pairs.groupByKey();

    // debug2
    List<Tuple2<Tuple2<Long,Long>, Iterable<Iterable<Long>>>> debug2 = grouped.collect();
    for (Tuple2<Tuple2<Long,Long>, Iterable<Iterable<Long>>> t2 : debug2) {
        System.out.println("debug2 key="+t2._1+"\t value="+t2._2);
    }

    // Find the intersection of all List<Long>s.
    // mapValues[U](f: (V) => U): JavaPairRDD[K, U]
    // Pass each value in the key-value pair RDD through a map function without changing the keys;
    // this also retains the original RDD's partitioning.
    JavaPairRDD<Tuple2<Long,Long>, Iterable<Long>> commonFriends =
        grouped.mapValues(new Function<
                          Iterable<Iterable<Long>>, // input
                          Iterable<Long>            // output
                          >() {
        @Override
        public Iterable<Long> call(Iterable<Iterable<Long>> s) {
            // count, per friend, how many of the lists contain it
            Map<Long, Integer> countCommon = new HashMap<Long, Integer>();
            int size = 0;
            for (Iterable<Long> iter : s) {
                size++;
                List<Long> list = iterableToList(iter);
                if ((list == null) || (list.isEmpty())) {
                    continue;
                }
                for (Long f : list) {
                    Integer count = countCommon.get(f);
                    if (count == null) {
                        countCommon.put(f, 1);
                    } else {
                        countCommon.put(f, ++count);
                    }
                }
            }

            // size is the total number of lists; a friend whose count equals size
            // appears in every list, i.e. in both people's friend lists
            List<Long> finalCommonFriends = new ArrayList<Long>();
            for (Map.Entry<Long, Integer> entry : countCommon.entrySet()) {
                if (entry.getValue() == size) {
                    finalCommonFriends.add(entry.getKey());
                }
            }
            return finalCommonFriends;
        }
    });

    // debug3
    List<Tuple2<Tuple2<Long,Long>, Iterable<Long>>> debug3 = commonFriends.collect();
    for (Tuple2<Tuple2<Long,Long>, Iterable<Long>> t2 : debug3) {
        System.out.println("debug3 key="+t2._1+"\t value="+t2._2);
    }

    System.exit(0);
}

static Tuple2<Long,Long> buildSortedTuple(long a, long b) {
    if (a < b) {
        return new Tuple2<Long,Long>(a, b);
    } else {
        return new Tuple2<Long,Long>(b, a);
    }
}

static List<Long> iterableToList(Iterable<Long> iterable) {
    List<Long> list = new ArrayList<Long>();
    for (Long item : iterable) {
        list.add(item);
    }
    return list;
}
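SparkUtil.createJavaSparkContext is a small helper from the book's supporting code and is not shown in this post. A minimal stand-in (an assumption, not the book's actual implementation) could be:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkUtil {
    // build a JavaSparkContext; the master URL is supplied by spark-submit
    public static JavaSparkContext createJavaSparkContext(String appName) {
        SparkConf conf = new SparkConf().setAppName(appName);
        return new JavaSparkContext(conf);
    }
}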

++Scala implementation++
// Scala implementation
def main(args: Array[String]): Unit = {
  if (args.size < 2) {
    println("Usage: FindCommonFriends <input> <output>")
    sys.exit(1)
  }

  val sparkConf = new SparkConf().setAppName("FindCommonFriends")
  val sc = new SparkContext(sparkConf)

  val input = args(0)
  val output = args(1)

  val records = sc.textFile(input)

  val pairs = records.flatMap(s => {
    val tokens = s.split(",")
    // tokens(0) is the owner of the friend list
    val person = tokens(0).toLong
    // the friend list as a List[Long]
    val friends = tokens(1).split("\\s+").map(_.toLong).toList
    val result = for {
      i <- 0 until friends.size
      friend = friends(i)
      // this pairs the person with every one of his friends
    } yield {
      // ordering person and friend inside the key is what makes both
      // directions of a friendship land under the same key
      if (person < friend) ((person, friend), friends)
      else ((friend, person), friends)
    }
    result
  })

  // group the friend lists by key
  val grouped = pairs.groupByKey()

  val commonFriends = grouped.mapValues(iter => {
    // tag every friend in every non-empty list with a 1
    val friendCount = for {
      list <- iter
      if !list.isEmpty
      friend <- list
    } yield ((friend, 1))
    // group the (friend, 1) pairs by friend, e.g. [a -> ((a,1),(a,1))],
    // sum the 1s per friend, and keep the friends whose count is greater
    // than 1, i.e. the ones present in both people's lists
    friendCount.groupBy(_._1).mapValues(_.unzip._2.sum).filter(_._2 > 1).map(_._1)
  })

  // save the result to the file
  commonFriends.saveAsTextFile(output)

  // format the result for easy viewing
  val formattedResult = commonFriends.map(
    f => s"(${f._1._1}, ${f._1._2})\t${f._2.mkString("[", ", ", "]")}"
  )
  formattedResult.foreach(println)

  // done!
  sc.stop()
}
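One detail worth noting: the Scala version keeps friends with filter(_._2 > 1) instead of comparing each count against the total number of lists as the Java versions do. For a key (a, b), groupByKey collects at most two friend lists (one from a's record and one from b's record), so a count greater than 1 can only mean the friend appears in both lists; the two checks are therefore equivalent whenever both records are present.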
