10. 左外连接（Left Outer Join）

date[2019-01-01]

Data Algorithms (Chapter 4)
数据说明（两个文件的字段均以制表符分隔；users.tsv：用户ID、所在地；transactions.tsv：交易ID、商品ID、用户ID 及其他字段）
[hadoop@chen spark-data]$ cat ch4/input/users.tsv
u1 UT
u2 GA
u3 CA
u4 CA
u5 GA
[hadoop@chen spark-data]$ cat ch4/input/transactions.tsv
t1 p3 u1 1 300
t2 p1 u2 1 100
t3 p1 u1 1 100
t4 p2 u2 1 10
t5 p4 u4 1 9
t6 p1 u1 1 100
t7 p4 u1 1 9
t8 p4 u5 2 40
Left Outer Join
package org.dataalgorithms.chap04.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * Demonstrates how to do a "left outer join" on two RDDs
 * without using Spark's built-in `leftOuterJoin`.
 *
 * The main purpose here is to show the comparison with the Hadoop
 * MapReduce solution shown earlier in the book; it is for demonstration only.
 * For your project we suggest using Spark's built-in `leftOuterJoin`
 * or a DataFrame join (highly recommended).
 *
 * Input (tab-separated text files):
 *  - users file:        `userId \t location`
 *  - transactions file: `transactionId \t productId \t userId \t ...`
 *    (only the productId and userId columns are read)
 *
 * Output: one `(productId, numberOfDistinctLocations)` tuple per product,
 * written with `saveAsTextFile`.
 *
 * @author Gaurav Bhardwaj (gauravbhardwajemail@gmail.com)
 * @editor Mahmoud Parsian (mahmoud.parsian@yahoo.com)
 */
object LeftOuterJoin {

  /** Tag marking a (userId -> location) record coming from the users file. */
  private final val LocationTag = "L"

  /** Tag marking a (userId -> productId) record coming from the transactions file. */
  private final val ProductTag = "P"

  /**
   * Entry point.
   *
   * @param args `args(0)` users file, `args(1)` transactions file, `args(2)` output directory
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      println("Usage: LeftOuterJoin <users-file> <transactions-file> <output-dir>")
      sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("LeftOuterJoin")
    val sc = new SparkContext(sparkConf)

    try {
      val usersInputFile = args(0)
      val transactionsInputFile = args(1)
      val output = args(2)

      val usersRaw = sc.textFile(usersInputFile)
      val transactionsRaw = sc.textFile(transactionsInputFile)

      // (userId, ("L", location)) -- tagging locations with L
      val users = usersRaw.map { line =>
        val tokens = line.split("\t")
        (tokens(0), (LocationTag, tokens(1)))
      }

      // (userId, ("P", productId)) -- tagging products with P
      val transactions = transactionsRaw.map { line =>
        val tokens = line.split("\t")
        (tokens(2), (ProductTag, tokens(1)))
      }

      // RDD.foreach runs on the executors: in cluster mode these lines end up
      // in the executor logs, not on the driver console.
      println("===========users==========")
      users.foreach(println)
      println("===========transactions==========")
      transactions.foreach(println)

      // This operation is expensive and is listed to compare with the Hadoop
      // MapReduce approach; compare it with the more optimized approaches
      // shown in SparkLeftOuterJoin.scala or DataFrameLeftOuterJoin.scala.
      val all = users union transactions
      println("===========all==========")
      all.foreach(println)

      val grouped = all.groupByKey()
      println("===========group==========")
      grouped.foreach(println)

      // (productId, location) pairs, de-duplicated per user.
      val productLocations = grouped.flatMap { case (_, tagged) => locatedProducts(tagged) }

      val productByLocations = productLocations.groupByKey()

      // (productId, number of DISTINCT locations): different users in the same
      // location buying the same product must be counted only once.
      val result = productByLocations.map { case (product, locations) =>
        (product, locations.toSet.size)
      }

      println("============= RESULT ==============")
      result.foreach(println)

      result.saveAsTextFile(output) // Saves output to the file.
    } finally {
      sc.stop()
    }
  }

  /**
   * Turns all tagged records of a single user into distinct (productId, location) pairs.
   *
   * `groupByKey` gives no ordering guarantee for the values of a key, so the
   * location record is located with `partition` instead of being assumed to be
   * the first element (which is what `span` would require). A user with no
   * location record gets "UNKNOWN", so their transactions are still kept
   * (transactions LEFT OUTER JOIN users).
   *
   * @param tagged `(tag, value)` records of one user, tag being "L" or "P"
   * @return the set of `(productId, location)` pairs for that user
   */
  private def locatedProducts(tagged: Iterable[(String, String)]): Set[(String, String)] = {
    val (locationRecords, productRecords) = tagged.partition { case (tag, _) => tag == LocationTag }
    val location = locationRecords.headOption.map(_._2).getOrElse("UNKNOWN")
    productRecords.collect { case (ProductTag, product) => (product, location) }.toSet
  }
}

输出:
===========users==========
(u4,(L,CA))
(u1,(L,UT))
(u5,(L,GA))
(u2,(L,GA))
(u3,(L,CA))
===========transactions==========
(u1,(P,p3))
(u2,(P,p1))
(u1,(P,p1))
(u2,(P,p2))
(u4,(P,p4))
(u1,(P,p1))
(u1,(P,p4))
(u5,(P,p4))
===========all==========
(u4,(L,CA))
(u5,(L,GA))
(u1,(L,UT))
(u2,(L,GA))
(u3,(L,CA))
(u1,(P,p3))
(u2,(P,p1))
(u1,(P,p1))
(u2,(P,p2))
(u4,(P,p4))
(u1,(P,p1))
(u1,(P,p4))
(u5,(P,p4))
===========group==========
(u3,CompactBuffer((L,CA)))
(u5,CompactBuffer((L,GA), (P,p4)))
(u2,CompactBuffer((L,GA), (P,p1), (P,p2)))
(u1,CompactBuffer((L,UT), (P,p3), (P,p1), (P,p1), (P,p4)))
(u4,CompactBuffer((L,CA), (P,p4)))
============= RESULT ==============
(p4,3)
(p1,2)
(p2,1)
(p3,1)

    推荐阅读