Spark的broadcast
Spark的Broadcast
1. 概述 在实际场景中,当1个function传递到1个spark operation(例如:map、reduce)时,这个function是在远程的集群node上被执行的。这些变量会被复制到每一台机器,在远程机器上不会更新这些变量,然后又传送回driver program。跨tasks共享读写变量的支持,通常是低效率的。然而,spark提供了2种通用的共享变量模式:广播变量和累加器。
Broadcast(广播)共享配置文件,map数据集,树形数据结构等,为能够更好更快速为TASK任务使用相关变量。也可以使用,当然也可以使用redis保存共享数据,让每一个task连接redis,获取共享数据。
2. 简单示例
首先生成了一个集合变量,把这个变量通过sparkContext的broadcast函数进行广播,最后在rdd的每一个partition的迭代时,使用这个广播变量.
val values = List[Int](1,2,3)
val broadcastValues = sparkContext.broadcast(values)
rdd.mapPartitions(iter => {
broadcastValues.getValue.foreach(println)
})
3. 具体示例:IP归属地查询
【Spark的broadcast】ip.txt文件的内容:
1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.0.8.0|1.0.15.255|16779264|16781311|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.0.32.0|1.0.63.255|16785408|16793599|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.1.0.0|1.1.0.255|16842752|16843007|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.1.2.0|1.1.7.255|16843264|16844799|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
object IPFind {//Ip地址转换成十进制数字
def ip2Long(ip: String): Long = {
val fragments = ip.split("[.]")
var ipNum = 0L
for (i <- 0 until fragments.length) {
ipNum = fragments(i).toLong | ipNum << 8L
}
ipNum
}//二分查找
def binarySearch(lines: Array[(String, String, String)], ip: Long): Int = {
var low = 0
var high = lines.length - 1
while (low <= high) {
val middle = (low + high) / 2
if ((ip >= lines(middle)._1.toLong) && (ip <= lines(middle)._2.toLong))
return middle
if (ip < lines(middle)._1.toLong)
high = middle - 1
else {
low = middle + 1
}
}
-1
}def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.6.1");
val conf = new SparkConf().setAppName("IpJdbc2").setMaster("local[2]")
val sc = new SparkContext(conf)val rdd1 = sc.textFile("D:\\textdata\\ip.txt").map(line =>{
val fields = line.split("\\|")
val start_num = fields(2)
val end_num = fields(3)
val province = fields(6)
(start_num,end_num,province)
}) //生成不可变的集合,广播到task中去
val rpRulesBroakcast =rdd1.collect()
val ipRulesBroadcast = sc.broadcast(rpRulesBroakcast) //读取要查找的Ip地址
val rdd3 = sc.textFile("D:\\textdata\\ip.txt").map(line =>{
val fields = line.split("\\|")
fields(1)
})//在每个task中获取广播值,进行查询
val result = rdd3.map(ip =>{
val ipNum = ip2Long(ip.toString)
val index = binarySearch(ipRulesBroadcast.value,ipNum)
val info = ipRulesBroadcast.value(index)
//(ip的起始Num, ip的结束Num,省份名)
info
}).map(t => (t._3, 1)).reduceByKey(_+_)}
推荐阅读
- 热闹中的孤独
- JAVA(抽象类与接口的区别&重载与重写&内存泄漏)
- 放屁有这三个特征的,请注意啦!这说明你的身体毒素太多
- 一个人的旅行,三亚
- 布丽吉特,人生绝对的赢家
- 慢慢的美丽
- 尽力
- 一个小故事,我的思考。
- 家乡的那条小河
- 《真与假的困惑》???|《真与假的困惑》??? ——致良知是一种伟大的力量