使用自定义累加器的目的:
如果要使用多个累加器的话,会使程序变的复杂,不便于扩展维护
【大数据|Scala中自定义累加器的使用】 代码实现
import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}object CustomAccumulator extends AccumulatorParam[String] {
override def zero(initialValue: String): String = "SESSION_COUNT=0|TIME_PERIOD_1s_3s=0|TIME_PERIOD_4s_6s=0|STEP_PERIOD_1_3=0|STEP_PERIOD_4_6=0"override def addInPlace(v1: String, v2: String): String = add(v1, v2)// v1 : SESSION_COUNT=0|TIME_PERIOD_1s_3s=0|TIME_PERIOD_4s_6s=0|STEP_PERIOD_1_3=0|STEP_PERIOD_4_6=0
// v2 : TIME_PERIOD_1s_3s
// return: SESSION_COUNT=0|TIME_PERIOD_1s_3s=1|TIME_PERIOD_4s_6s=0|STEP_PERIOD_1_3=0|STEP_PERIOD_4_6=0
// 这里有三种情况:
// 1、最常见的情况。v1 = 上一次的累计值,v2 = 传入的一个字段
// 2、如果只有一个分区,在计算结束的时候,v1 = 空,v2 = 该分区的累计值
// 3、如果只有多个分区,在计算结束的时候,v1 = 某一个分区的累计值,v2 = 某一个分区的累计值(v1、v2可能成对出现多数)def add(v1: String, v2: String): String = {
// 对应第二种情况
if (v1.trim.size==0) return v2println(s"v1 = $v1;
v2 = $v2")
val mapv1 = v1.split("\\|").map(x=>(x.split("=")(0), x.split("=")(1))).toMapval result = if (v2.split("\\|").size > 1) {
// 对应第三种情况(分区的结果做合并)
val mapv2 = v2.split("\\|").map(x=>(x.split("=")(0), x.split("=")(1))).toMap
val result3 = for ((key, value) <- mapv1) yield {
key + "=" + (mapv2(key).toInt + value.toInt)
}
result3.mkString("|")
}
else {
// 对应第一种情况(最常见的情况)
val result1 = for ((key, value) <- mapv1) yield {
if (key==v2) key + "=" + (value.toInt+1)
else key + "=" + value
}
result1.mkString("|")
}result
}def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("CustomAccumulator").setMaster("local[3]")
val sc = new SparkContext(sparkConf)
sc.addJar("/home/spark/IdeaProjects/SparkSamples/out/artifacts/SparkSamples_jar/SparkSamples.jar")
sc.setLogLevel("WARN")val accum = sc.accumulator("")(CustomAccumulator)
val rdd = sc.makeRDD(Array("TIME_PERIOD_1s_3s", "TIME_PERIOD_4s_6s", "STEP_PERIOD_1_3", "STEP_PERIOD_4_6", "SESSION_COUNT", "TIME_PERIOD_1s_3s", "TIME_PERIOD_4s_6s", "STEP_PERIOD_1_3", "STEP_PERIOD_4_6", "SESSION_COUNT"))
rdd.foreach(x=>{
accum.add(x)
})
println(accum.value)
sc.stop()
}
}
推荐阅读
- 人工智能|干货!人体姿态估计与运动预测
- Python专栏|数据分析的常规流程
- 读书笔记|《白话大数据和机器学习》学习笔记1
- 网络|一文彻底搞懂前端监控
- html5|各行业工资单出炉 IT类连续多年霸占“榜首”位置
- 人工智能|【机器学习】深度盘点(详细介绍 Python 中的 7 种交叉验证方法!)
- 网络|简单聊聊压缩网络
- 数据库|效率最高的Excel数据导入---(c#调用SSIS Package将数据库数据导入到Excel文件中【附源代码下载】)...
- r语言|手把手(R语言文本挖掘和词云可视化实践)
- 腾讯|SaaS的收入模型有哪些(终于有人讲明白了)