Spark的broadcast

Spark的Broadcast 1. 概述 在实际场景中,当1个function传递到1个spark operation(例如:map、reduce)时,这个function是在远程的集群node上被执行的。这些变量会被复制到每一台机器,在远程机器上不会更新这些变量,然后又传送回driver program。跨tasks共享读写变量的支持,通常是低效率的。然而,spark提供了2种通用的共享变量模式:广播变量和累加器。
Broadcast(广播)共享配置文件,map数据集,树形数据结构等,为能够更好更快速为TASK任务使用相关变量。也可以使用,当然也可以使用redis保存共享数据,让每一个task连接redis,获取共享数据。
2. 简单示例
首先生成了一个集合变量,把这个变量通过sparkContext的broadcast函数进行广播,最后在rdd的每一个partition的迭代时,使用这个广播变量.

val values = List[Int](1,2,3) val broadcastValues = sparkContext.broadcast(values) rdd.mapPartitions(iter => { broadcastValues.getValue.foreach(println) })

3. 具体示例:IP归属地查询
【Spark的broadcast】ip.txt文件的内容:
1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.0.8.0|1.0.15.255|16779264|16781311|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.0.32.0|1.0.63.255|16785408|16793599|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.1.0.0|1.1.0.255|16842752|16843007|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.1.2.0|1.1.7.255|16843264|16844799|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
object IPFind {//Ip地址转换成十进制数字 def ip2Long(ip: String): Long = { val fragments = ip.split("[.]") var ipNum = 0L for (i <- 0 until fragments.length) { ipNum = fragments(i).toLong | ipNum << 8L } ipNum }//二分查找 def binarySearch(lines: Array[(String, String, String)], ip: Long): Int = { var low = 0 var high = lines.length - 1 while (low <= high) { val middle = (low + high) / 2 if ((ip >= lines(middle)._1.toLong) && (ip <= lines(middle)._2.toLong)) return middle if (ip < lines(middle)._1.toLong) high = middle - 1 else { low = middle + 1 } } -1 }def main(args: Array[String]): Unit = { System.setProperty("hadoop.home.dir", "D:\\hadoop-2.6.1"); val conf = new SparkConf().setAppName("IpJdbc2").setMaster("local[2]") val sc = new SparkContext(conf)val rdd1 = sc.textFile("D:\\textdata\\ip.txt").map(line =>{ val fields = line.split("\\|") val start_num = fields(2) val end_num = fields(3) val province = fields(6) (start_num,end_num,province) }) //生成不可变的集合,广播到task中去 val rpRulesBroakcast =rdd1.collect() val ipRulesBroadcast = sc.broadcast(rpRulesBroakcast) //读取要查找的Ip地址 val rdd3 = sc.textFile("D:\\textdata\\ip.txt").map(line =>{ val fields = line.split("\\|") fields(1) })//在每个task中获取广播值,进行查询 val result = rdd3.map(ip =>{ val ipNum = ip2Long(ip.toString) val index = binarySearch(ipRulesBroadcast.value,ipNum) val info = ipRulesBroadcast.value(index) //(ip的起始Num, ip的结束Num,省份名) info }).map(t => (t._3, 1)).reduceByKey(_+_)}

    推荐阅读