Spark|Spark Streaming状态操作: updateStateByKey、mapWithState、基于window的状态操作

在Spark Streaming中,DStream的转换分为有状态和无状态两种。无状态的操作,即当前批次的处理不依赖于先前批次的数据,如map()、flatMap()、filter()、reduceByKey()、groupByKey()等等; 而有状态的操作,即当前批次的处理需要依赖先前批次的数据,这样的话,就需要跨批次维护状态。
总结spark streaming中的状态操作:updateStateByKey、mapWithState、和基于window的状态操作。
updateStateByKey

//状态更新函数 val updateFunc=(currentValues:Seq[Int], previousState:Option[Int])=>{ //该批次结果 val currentCount = currentValues.sum //先前状态 val previousCount= previousState.getOrElse(0) //如何更新 Some(currentCount+previousCount) } //updateStateByKey val result=kafkaDirectStream .map(_.value()) //json string 转换成 case class .map(parse(_).extractOrElse[TwitterFeed](null)) .filter(_!=null) .map(item=>(item.language,1)) .updateStateByKey(updateFunc)

注意:
  1. 需要开启checkpoint,并设置checkpoint目录。用于存放元数据(任务元数据、kafka offset等)、数据(如Key的State)。
  2. 应通过StreamingContext.getOrCreate(checkpointPath、creatingFunc)来获取StreamingContext,业务逻辑都应封装在creatingFunc函数中。
mapWithState mapWithState从spark 1.6.0开始出现,可以看做是updateStateByKey的升级版,有一些updateStateByKey所没有的特征:
  1. 支持输出只发生更新的状态和全量状态
mapWithState默认每个批次只会返回当前批次中有新数据的Key的状态,也可返回全量状态。updateStateByKey每个批次都会返回所有状态。
  1. 内置状态超时管理
内置状态超时管理,可对超时的Key单独处理。也可实现如会话超时(Session Timeout)的功能。在updateStateByKey中,如果要实现类似功能,需要大量编码。
  1. 初始化状态
可以选择自定义的RDD来初始化状态。
  1. 可以返回任何我们期望的类型
mapWithState函数可知,可以返回任何我们期望的类型,而updateStateByKey做不到这一点。
  1. 性能更高
实现上,mapWithState只是增量更新,updateStateByKey每个批次都会对历史全量状态和当前增量数据进行cogroup合并,状态较大时,性能较低。
//状态更新函数 val mappingFunction = (currentKey: String, currentValue: Option[Int], previousState: State[Int]) => { //超时时处理 if(previousState.isTimingOut()){ println("Key:"+currentKey+" is timing out!") None }else{ //没有超时时处理 val currentCount =currentValue.getOrElse(0)+previousState.getOption().getOrElse(0) previousState.update(currentCount) (currentKey,currentCount) } } //mapWithState val result=kafkaDirectStream .map(_.value()) //json string 转换成 case class .map(parse(_).extractOrElse[TwitterFeed](null)) .filter(_!=null) .map(item=>(item.language,1)) .reduceByKey(_+_) .mapWithState( StateSpec //更新函数 .function(mappingFunction) //初始状态 .initialState(initialStateRDD) //超时时间 .timeout(Seconds(10)) ) //输出全量状态 //result.stateSnapshots().foreachRDD(_.foreach(println)) //只输出有更新的状态 result.foreachRDD(_.foreach(println))

注意:
  1. 需要开启checkpoint,并设置checkpoint目录。用于存放元数据(任务元数据、kafka offset等)、数据(如Key的State)。
  2. 应通过StreamingContext.getOrCreate(checkpointPath、creatingFunc)来获取StreamingContext,业务逻辑都应封装在creatingFunc函数中。
基于窗口的状态转换 reduceByKeyAndWindow 基于滑动窗口,对(K,V)类型的DStream按Key进行Reduce操作,返回新的DStream。
  1. 普通版的reduceByKeyAndWindow
def reduceByKeyAndWindow(reduceFunc: (V, V) => V,windowDuration: Duration,slideDuration: Duration): DStream[(K, V)]

  1. 增量版的reduceByKeyAndWindow
def reduceByKeyAndWindow(reduceFunc: (V, V) => V,invReduceFunc: (V, V) => V,windowDuration: Duration,slideDuration: Duration = self.slideDuration,numPartitions: Int = ssc.sc.defaultParallelism,filterFunc: ((K, V)) => Boolean = null): DStream[(K, V)]

对于普通版的reduceByKeyAndWindow,每个滑动间隔都对窗口内的数据做聚合,性能低效。
对于一个滑动窗口,每一个新窗口实际上是在之前窗口中减去一些数据,再加上一些新的数据,从而构成新的窗口。增量版的reduceByKeyAndWindow便采用这种逻辑,通过加新减旧实现增量窗口聚合。reduceFunc,即正向的reduce 函数(如_+_);invReduceFunc,即反向的reduce 函数(如 _-_)。
注意:加新减旧版的reduceByKeyAndWindow需要checkpoint支持。
val result: DStream[(String, Int)] =kafkaDirectStream .map(_.value()) //json string 转换成 case class .map(parse(_).extractOrElse[TwitterFeed](null)) .filter(_!=null) .map(item=>(item.language,1)) .reduceByKeyAndWindow( (v1:Int,v2:Int)=>{v1+v2},//加新 (v1:Int,v2:Int)=>{v1-v2},//减旧 Seconds(60),//窗口间隔:必须是batchDuration整数倍 Seconds(10)//滑动间隔:必须是batchDuration整数倍 )

reduceByWindow 基于滑动窗口,对非(K,V)类型的DStream进行Reduce操作,返回新的DStream。
  1. 普通版的reduceByWindow
def reduceByWindow( reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration ): DStream[T] = ssc.withScope { this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc) }

  1. 增量版的reduceByWindow
def reduceByWindow( reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration ): DStream[T] = ssc.withScope { this.map((1, _)) .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1) .map(_._2) }

实现上调用了加新减旧版的reduceByKeyAndWindow。需要checkpoint支持。
countByValueAndWindow 统计窗口内每类元素个数。
def countByValueAndWindow( windowDuration: Duration, slideDuration: Duration, numPartitions: Int = ssc.sc.defaultParallelism) (implicit ord: Ordering[T] = null) : DStream[(T, Long)] = ssc.withScope { this.map((_, 1L)).reduceByKeyAndWindow( (x: Long, y: Long) => x + y, (x: Long, y: Long) => x - y, windowDuration, slideDuration, numPartitions, (x: (T, Long)) => x._2 != 0L )

countByValueAndWindow先将窗口内的每个元素转换成(_,1L),然后调用加新减旧版的reduceByKeyAndWindow进行计算。需要checkpoint支持。
countByWindow 统计窗口内所有元素个数。
def countByWindow( windowDuration: Duration, slideDuration: Duration): DStream[Long] = ssc.withScope { this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration) }

【Spark|Spark Streaming状态操作: updateStateByKey、mapWithState、基于window的状态操作】countByWindow先将窗口内的每个元素转换成1L,然后调用加新减旧版的reduceByWindow进行计算。需要checkpoint支持。

    推荐阅读