10. Left Outer Join
2019-01-01
Input data for Data Algorithms (Chapter 4): each row of users.tsv is (userId, location), and each row of transactions.tsv is (transactionId, productId, userId, quantity, amount); only productId and userId are used by the join below.
[hadoop@chen spark-data]$ cat ch4/input/users.tsv
u1 UT
u2 GA
u3 CA
u4 CA
u5 GA
[hadoop@chen spark-data]$ cat ch4/input/transactions.tsv
t1 p3 u1 1 300
t2 p1 u2 1 100
t3 p1 u1 1 100
t4 p2 u2 1 10
t5 p4 u4 1 9
t6 p1 u1 1 100
t7 p4 u1 1 9
t8 p4 u5 2 40
Left Outer Join

The task: join each transaction with the user's location (a left outer join of transactions against users on the user ID), then count, for each product, the number of locations in which it was bought. For example, p4 is bought by u1 (UT), u4 (CA), and u5 (GA), so its location count is 3.
package org.dataalgorithms.chap04.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * Demonstrates how to do a "left outer join" on two RDDs
 * without using Spark's built-in 'leftOuterJoin' feature.
 *
 * The main purpose is to allow comparison with the Hadoop MapReduce
 * solution shown earlier in the book; it is for demonstration only.
 * For real projects we suggest Spark's built-in 'leftOuterJoin'
 * or the DataFrame API (highly recommended).
 *
 * @author Gaurav Bhardwaj (gauravbhardwajemail@gmail.com)
 * @editor Mahmoud Parsian (mahmoud.parsian@yahoo.com)
 */
object LeftOuterJoin {

  def main(args: Array[String]): Unit = {
    if (args.size < 3) {
      println("Usage: LeftOuterJoin <users> <transactions> <output>")
      sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("LeftOuterJoin")
    val sc = new SparkContext(sparkConf)

    val usersInputFile = args(0)
    val transactionsInputFile = args(1)
    val output = args(2)

    val usersRaw = sc.textFile(usersInputFile)
    val transactionsRaw = sc.textFile(transactionsInputFile)

    val users = usersRaw.map(line => {
      val tokens = line.split("\t")
      (tokens(0), ("L", tokens(1))) // tagging locations with "L"
    })

    val transactions = transactionsRaw.map(line => {
      val tokens = line.split("\t")
      (tokens(2), ("P", tokens(1))) // tagging products with "P"
    })

    println("===========users==========")
    users.foreach(println)
    println("===========transactions==========")
    transactions.foreach(println)

    // This operation is expensive and is listed only to compare with the
    // Hadoop MapReduce approach; see the more optimized approaches in
    // SparkLeftOuterJoin.scala and DataFrameLeftOuterJoin.scala.
    val all = users union transactions
    println("===========all==========")
    all.foreach(println)

    val grouped = all.groupByKey()
    println("===========group==========")
    grouped.foreach(println)

    val productLocations = grouped.flatMap {
      case (userId, iterable) =>
        // span returns two Iterables: a prefix containing the location
        // and the rest containing the products
        val (location, products) = iterable span (_._1 == "L")
        val loc = location.headOption.getOrElse(("L", "UNKNOWN"))
        // toSet removes duplicate (product, location) pairs for this user
        products.filter(_._1 == "P").map(p => (p._2, loc._2)).toSet
    }

    val productByLocations = productLocations.groupByKey()
    // return (product, location count) tuples
    val result = productByLocations.map(t => (t._1, t._2.size))
    result.saveAsTextFile(output) // save the output to a file

    // done
    sc.stop()
  }
}
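As the header comment suggests, the union/groupByKey version exists only for comparison with MapReduce. Below is a minimal sketch of the same computation using Spark's built-in leftOuterJoin, reusing sc and the input paths from the listing above; the variable names are illustrative and this is not the book's SparkLeftOuterJoin.scala verbatim.

val usersKV = sc.textFile(usersInputFile).map { line =>
  val tokens = line.split("\t")
  (tokens(0), tokens(1)) // (userId, location)
}
val transactionsKV = sc.textFile(transactionsInputFile).map { line =>
  val tokens = line.split("\t")
  (tokens(2), tokens(1)) // (userId, productId)
}
// leftOuterJoin keeps every transaction even when the user is unknown;
// the joined location arrives as an Option[String]
val joined = transactionsKV.leftOuterJoin(usersKV) // (userId, (productId, Option[location]))
val productLocations = joined.map {
  case (_, (product, locationOpt)) => (product, locationOpt.getOrElse("UNKNOWN"))
}
// deduplicate (product, location) pairs, then count locations per product
val counts = productLocations.distinct().groupByKey().mapValues(_.size)
counts.saveAsTextFile(output)

Note that distinct() here deduplicates (product, location) pairs globally, whereas the toSet in the listing above only deduplicates within a single user's purchases.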
Output of the original program:
===========users==========
(u4,(L,CA))
(u1,(L,UT))
(u5,(L,GA))
(u2,(L,GA))
(u3,(L,CA))
===========transactions==========
(u1,(P,p3))
(u2,(P,p1))
(u1,(P,p1))
(u2,(P,p2))
(u4,(P,p4))
(u1,(P,p1))
(u1,(P,p4))
(u5,(P,p4))
===========all==========
(u4,(L,CA))
(u5,(L,GA))
(u1,(L,UT))
(u2,(L,GA))
(u3,(L,CA))
(u1,(P,p3))
(u2,(P,p1))
(u1,(P,p1))
(u2,(P,p2))
(u4,(P,p4))
(u1,(P,p1))
(u1,(P,p4))
(u5,(P,p4))
===========group==========
(u3,CompactBuffer((L,CA)))
(u5,CompactBuffer((L,GA), (P,p4)))
(u2,CompactBuffer((L,GA), (P,p1), (P,p2)))
(u1,CompactBuffer((L,UT), (P,p3), (P,p1), (P,p1), (P,p4)))
(u4,CompactBuffer((L,CA), (P,p4)))
============= RESULT ==============
(p4,3)
(p1,2)
(p2,1)
(p3,1)
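The header comment also recommends the DataFrame API. A minimal sketch follows, assuming Spark 2.x and its SparkSession; the column names are illustrative and the book's DataFrameLeftOuterJoin.scala may differ in detail.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.countDistinct

val spark = SparkSession.builder().appName("DataFrameLeftOuterJoin").getOrCreate()

// read the tab-separated files and name the columns (assumed schemas)
val usersDF = spark.read.option("sep", "\t").csv(usersInputFile)
  .toDF("userId", "location")
val transactionsDF = spark.read.option("sep", "\t").csv(transactionsInputFile)
  .toDF("transactionId", "productId", "userId", "quantity", "amount")

// left outer join on userId, then count distinct locations per product
val resultDF = transactionsDF
  .join(usersDF, Seq("userId"), "left_outer")
  .groupBy("productId")
  .agg(countDistinct("location").alias("locationCount"))

resultDF.show()

Because the join is a left outer join, transactions whose user is missing from users.tsv are still kept; their location is simply null and is ignored by countDistinct.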