大数据|Scala中自定义累加器的使用

使用自定义累加器的目的:
如果要使用多个累加器的话,会使程序变的复杂,不便于扩展维护
【大数据|Scala中自定义累加器的使用】 代码实现

import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}object CustomAccumulator extends AccumulatorParam[String] { override def zero(initialValue: String): String = "SESSION_COUNT=0|TIME_PERIOD_1s_3s=0|TIME_PERIOD_4s_6s=0|STEP_PERIOD_1_3=0|STEP_PERIOD_4_6=0"override def addInPlace(v1: String, v2: String): String = add(v1, v2)// v1 : SESSION_COUNT=0|TIME_PERIOD_1s_3s=0|TIME_PERIOD_4s_6s=0|STEP_PERIOD_1_3=0|STEP_PERIOD_4_6=0 // v2 : TIME_PERIOD_1s_3s // return: SESSION_COUNT=0|TIME_PERIOD_1s_3s=1|TIME_PERIOD_4s_6s=0|STEP_PERIOD_1_3=0|STEP_PERIOD_4_6=0 // 这里有三种情况: // 1、最常见的情况。v1 = 上一次的累计值,v2 = 传入的一个字段 // 2、如果只有一个分区,在计算结束的时候,v1 = 空,v2 = 该分区的累计值 // 3、如果只有多个分区,在计算结束的时候,v1 = 某一个分区的累计值,v2 = 某一个分区的累计值(v1、v2可能成对出现多数)def add(v1: String, v2: String): String = { // 对应第二种情况 if (v1.trim.size==0) return v2println(s"v1 = $v1; v2 = $v2") val mapv1 = v1.split("\\|").map(x=>(x.split("=")(0), x.split("=")(1))).toMapval result = if (v2.split("\\|").size > 1) { // 对应第三种情况(分区的结果做合并) val mapv2 = v2.split("\\|").map(x=>(x.split("=")(0), x.split("=")(1))).toMap val result3 = for ((key, value) <- mapv1) yield { key + "=" + (mapv2(key).toInt + value.toInt) } result3.mkString("|") } else { // 对应第一种情况(最常见的情况) val result1 = for ((key, value) <- mapv1) yield { if (key==v2) key + "=" + (value.toInt+1) else key + "=" + value } result1.mkString("|") }result }def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("CustomAccumulator").setMaster("local[3]") val sc = new SparkContext(sparkConf) sc.addJar("/home/spark/IdeaProjects/SparkSamples/out/artifacts/SparkSamples_jar/SparkSamples.jar") sc.setLogLevel("WARN")val accum = sc.accumulator("")(CustomAccumulator) val rdd = sc.makeRDD(Array("TIME_PERIOD_1s_3s", "TIME_PERIOD_4s_6s", "STEP_PERIOD_1_3", "STEP_PERIOD_4_6", "SESSION_COUNT", "TIME_PERIOD_1s_3s", "TIME_PERIOD_4s_6s", "STEP_PERIOD_1_3", "STEP_PERIOD_4_6", "SESSION_COUNT")) rdd.foreach(x=>{ accum.add(x) }) println(accum.value) sc.stop() } }


    推荐阅读