spark|Spark 累加器

累加器

累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后, 传回 Driver端进行merge

案例操作1
下面来做一个求和的案例
import org.apache.spark.{SparkConf, SparkContext}object Acc1 {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("Acc") val sc = new SparkContext(sparConf)val rdd = sc.makeRDD(List(1,2,3,4))// reduce : 分区内计算,分区间计算 //val i: Int = rdd.reduce(_+_) //println(i) var sum = 0 rdd.foreach( num => { sum += num } ) println("sum = " + sum)sc.stop()}}


运行上面的代码,通过控制台输出结果,发现并没有达到累加求和的效果
spark|Spark 累加器
文章图片

这就要使用Spark 提供的累加器

案例操作2
使用累加器求和
import org.apache.spark.{SparkConf, SparkContext}object Acc2 {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("Acc") val sc = new SparkContext(sparConf) val rdd = sc.makeRDD(List(1,2,3,4)) // 获取系统累加器 // Spark默认就提供了简单数据聚合的累加器 val sumAcc = sc.longAccumulator("sum") rdd.foreach( num => { // 使用累加器 sumAcc.add(num) } ) // 获取累加器的值 println(sumAcc.value) sc.stop() }}

【spark|Spark 累加器】
运行上面的代码,观察控制台输出效果,这就达到了累加求和的目的
spark|Spark 累加器
文章图片

自定义累加器
在某些业务场景下,直接使用spark内置的累加器不够灵活,不能完全满足一些个性化的业务操作,就需要使用到自定义累加器

  • 1.继承AccumulatorV2 ,并设定泛型 ;
  • 2、重写累加器的抽象方法
案例操作3
import org.apache.spark.util.AccumulatorV2 import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutableobject Acc {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("Acc") val sc = new SparkContext(sparConf)val rdd = sc.makeRDD(List("hello", "spark", "hello"))// 累加器 : WordCount // 创建累加器对象 val wcAcc = new MyAccumulator() // 向Spark进行注册 sc.register(wcAcc, "wordCountAcc")rdd.foreach( word => { // 数据的累加(使用累加器) wcAcc.add(word) } ) // 获取累加器累加的结果 println(wcAcc.value) sc.stop()}/* 自定义数据累加器:WordCount1. 继承AccumulatorV2, 定义泛型 IN : 累加器输入的数据类型 String OUT : 累加器返回的数据类型 mutable.Map[String, Long] 2. 重写方法(6) */ class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {private var wcMap = mutable.Map[String, Long]()// 判断是否初始状态 override def isZero: Boolean = { wcMap.isEmpty }override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = { new MyAccumulator() }override def reset(): Unit = { wcMap.clear() }// 获取累加器需要计算的值 override def add(word: String): Unit = { val newCnt = wcMap.getOrElse(word, 0L) + 1 wcMap.update(word, newCnt) }// Driver合并多个累加器 override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {val map1 = this.wcMap val map2 = other.valuemap2.foreach{ case ( word, count ) => { val newCount = map1.getOrElse(word, 0L) + count map1.update(word, newCount) } } }// 累加器结果 override def value: mutable.Map[String, Long] = { wcMap } }}

运行上面的代码,观察控制台输出效果
spark|Spark 累加器
文章图片















    推荐阅读