累加器
累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后, 传回 Driver端进行merge
案例操作1
下面来做一个求和的案例
import org.apache.spark.{SparkConf, SparkContext}object Acc1 {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
val sc = new SparkContext(sparConf)val rdd = sc.makeRDD(List(1,2,3,4))// reduce : 分区内计算,分区间计算
//val i: Int = rdd.reduce(_+_)
//println(i)
var sum = 0
rdd.foreach(
num => {
sum += num
}
)
println("sum = " + sum)sc.stop()}}
运行上面的代码,通过控制台输出结果,发现并没有达到累加求和的效果
文章图片
这就要使用Spark 提供的累加器
案例操作2
使用累加器求和
import org.apache.spark.{SparkConf, SparkContext}object Acc2 {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
val sc = new SparkContext(sparConf)
val rdd = sc.makeRDD(List(1,2,3,4))
// 获取系统累加器
// Spark默认就提供了简单数据聚合的累加器
val sumAcc = sc.longAccumulator("sum")
rdd.foreach(
num => {
// 使用累加器
sumAcc.add(num)
}
)
// 获取累加器的值
println(sumAcc.value)
sc.stop()
}}
【spark|Spark 累加器】
运行上面的代码,观察控制台输出效果,这就达到了累加求和的目的
文章图片
自定义累加器
在某些业务场景下,直接使用spark内置的累加器不够灵活,不能完全满足一些个性化的业务操作,就需要使用到自定义累加器
- 1.继承AccumulatorV2 ,并设定泛型 ;
- 2、重写累加器的抽象方法
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutableobject Acc {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
val sc = new SparkContext(sparConf)val rdd = sc.makeRDD(List("hello", "spark", "hello"))// 累加器 : WordCount
// 创建累加器对象
val wcAcc = new MyAccumulator()
// 向Spark进行注册
sc.register(wcAcc, "wordCountAcc")rdd.foreach(
word => {
// 数据的累加(使用累加器)
wcAcc.add(word)
}
)
// 获取累加器累加的结果
println(wcAcc.value)
sc.stop()}/*
自定义数据累加器:WordCount1. 继承AccumulatorV2, 定义泛型
IN : 累加器输入的数据类型 String
OUT : 累加器返回的数据类型 mutable.Map[String, Long]
2. 重写方法(6)
*/
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {private var wcMap = mutable.Map[String, Long]()// 判断是否初始状态
override def isZero: Boolean = {
wcMap.isEmpty
}override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
new MyAccumulator()
}override def reset(): Unit = {
wcMap.clear()
}// 获取累加器需要计算的值
override def add(word: String): Unit = {
val newCnt = wcMap.getOrElse(word, 0L) + 1
wcMap.update(word, newCnt)
}// Driver合并多个累加器
override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {val map1 = this.wcMap
val map2 = other.valuemap2.foreach{
case ( word, count ) => {
val newCount = map1.getOrElse(word, 0L) + count
map1.update(word, newCount)
}
}
}// 累加器结果
override def value: mutable.Map[String, Long] = {
wcMap
}
}}
运行上面的代码,观察控制台输出效果
文章图片
推荐阅读
- big|集群计算——Spark-Spark Core 、Spark Streaming、Spark SQL、MLlib、Spark集群管理器
- SparkStreaming-----SparkStreaming教程
- #|Spark Streaming与流处理
- 消息队列|Kafka集成SparkStreaming
- 机器学习|Spark ALS 协同过滤算法实践
- 大数据|大数据实战电商推荐系统(3)-基于隐语义模型的离线推荐模块
- 中间件|Spark 详解
- 电商用户行为分析|1.Spark大型电商项目-电商用户行为分析简介
- hadoop|dolphinscheduler涉及HDFS功能测试(三)spark task