Spark中广播的使用

import org.apache.spark.{SparkConf, SparkContext}

import scala.annotation.tailrec

/**
 * Day 30 course: using a Spark broadcast variable to look up the province of each IP address.
 *
 * The IP-range rules are collected on the driver and broadcast to the executors, so each task
 * resolves its IPs with a local binary search over the broadcast copy.
 *
 * Created by root on 2016/5/18.
 */
object IPLocation {

  /**
   * Converts a dotted IPv4 string such as "1.2.3.4" into its numeric value.
   *
   * Each '.'-separated fragment is shifted into the accumulator 8 bits at a time. Whitespace
   * around the address or around a fragment is ignored. Fragments are not range-checked.
   *
   * @param ip dotted-decimal address
   * @return the address as a Long (e.g. "1.2.3.4" -> 16909060)
   * @throws NumberFormatException if a fragment is not an integer
   */
  def ip2Long(ip: String): Long =
    ip.trim.split("[.]").foldLeft(0L)((acc, fragment) => (acc << 8) | fragment.trim.toLong)

  /**
   * Finds the rule whose inclusive numeric range [start, end] contains `ip`.
   *
   * Requires `lines` to be sorted by start in ascending order with non-overlapping ranges.
   *
   * @param lines rules as (start, end, province) tuples; start and end must parse as Longs
   * @param ip    numeric IP, e.g. as produced by [[ip2Long]]
   * @return index of the matching rule, or -1 if no rule contains `ip`
   */
  def binarySearch(lines: Array[(String, String, String)], ip: Long): Int = {
    @tailrec
    def search(low: Int, high: Int): Int =
      if (low > high) -1
      else {
        // Unsigned shift avoids Int overflow of (low + high) on very large arrays.
        val middle = (low + high) >>> 1
        val (rangeStart, rangeEnd, _) = lines(middle)
        if (ip < rangeStart.toLong) search(low, middle - 1)
        else if (ip > rangeEnd.toLong) search(middle + 1, high)
        else middle
      }

    search(0, lines.length - 1)
  }

  /**
   * Resolves every IP in the access log to its matching (start, end, province) rule and prints
   * the results.
   *
   * @param args optional overrides: args(0) = rules file (default "c://ip.txt"),
   *             args(1) = access log (default "c://access_log")
   */
  def main(args: Array[String]): Unit = {
    val rulesPath = args.lift(0).getOrElse("c://ip.txt")
    val logPath = args.lift(1).getOrElse("c://access_log")

    val conf = new SparkConf().setMaster("local[2]").setAppName("IpLocation")
    val sc = new SparkContext(conf)
    try {
      // Rules file: '|'-separated; columns 2 and 3 hold the numeric start/end of the range and
      // column 6 the province (NOTE(review): inferred from the original field names; confirm).
      val ipRulesRdd = sc.textFile(rulesPath).map { line =>
        val fields = line.split("\\|")
        (fields(2), fields(3), fields(6))
      }

      // All IP mapping rules, sorted by numeric start because binarySearch requires ascending
      // order and collect() only preserves the file's own order.
      val ipRules = ipRulesRdd.collect().sortBy(_._1.toLong)

      // Broadcast the rules so each executor receives one read-only copy.
      val ipRulesBroadcast = sc.broadcast(ipRules)

      // Load the data to be processed: the client IP is the second '|'-separated field
      // (assumes every access-log line has at least two fields).
      val ipsRDD = sc.textFile(logPath).map(line => line.split("\\|")(1))

      // IPs that match no rule (index -1) are dropped rather than crashing the job with an
      // out-of-bounds array access.
      val result = ipsRDD
        .map(ip => binarySearch(ipRulesBroadcast.value, ip2Long(ip)))
        .filter(_ >= 0)
        .map(index => ipRulesBroadcast.value(index))

      println(result.collect().toBuffer)
    } finally {
      sc.stop()
    }
  }
}


    推荐阅读