SparkStreaming|SparkStreaming(17)(updateStateByKey算子,保留上一次计算结果)

1.实现功能 如果SparkStreaming程序断掉,重新启动,可以读取断掉之前的结果。通过,使用SparkStreaming的HA:checkpoints。
【参考:kafka(十四):SparkStreaming和Kafka接口的HA:checkpoints】
2.代码

package _0809kafka//import com.beifeng.util.SparkUtil import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext}/** * * 之前做的计算当中,当前批次的计算值不会累加到下一个批次 * * 当前批次的值计算完之后,存到外部存储系统中 * 下一个批次计算完值之后,在取出上一个批次计算出来的值, * 做相加,更新会原位置上 * * checkpoint会保留上一个程序的ssc的状态和UpdateStateByKey的结果 * 但是构造ssc的时候,必须按照规矩写,否则就读不到UpdateStateByKey上一次的结果 */ object UpdateStateByKeyAPI_1020HA { def main(args: Array[String]) { //使用checkpoint来存储批次的数据 //1、创建sparkConf val sparkConf: SparkConf = new SparkConf() .setAppName("UpdateStateByKeyAPI") .setMaster("local[2]") //2、创建sparkContext val sc = new SparkContext(sparkConf)//val path = s"file:///E:\\workspace\\SparkPro\\checkpoint\\streaming_05" val path = s"file:///E:\\Tools\\WorkspaceforMyeclipse\\scalaProjectMaven\\streaming_07"def creatingFunc():StreamingContext ={ val ssc = new StreamingContext(sc,Seconds(10)) ssc.checkpoint(path) val socketDStream: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata.ibeifeng.com",9999)//api updateStateByKey val resultDStream: DStream[(String, Long)] = socketDStream.mapPartitions(iter =>{ //对于当前批次的值做数据转换 iter.flatMap(_.split(" ")) .filter(_.nonEmpty) .map(word => (word,1)) }) //对于当前批次的值,做累加(aggr聚合)操作 .reduceByKey(_ + _) //对于value的操作,相同key怎么处理对应的value .updateStateByKey((seq: Seq[Int],state: Option[Long])=>{ //当前批次的相同key的value的聚合值 val sum = seq.sum val preState= state.getOrElse(0L) /** * if(sum + preState > 1000){ * Some(sum + preState) * }else{ * //清空当前key的value值 * None * } */ Some(sum + preState) })resultDStream.foreachRDD((rdd,time) =>{ println(s"----------------当前时间为:${time}----------------") //比如说:某些key不打印,某些值过于小也可以不打印,或者打印排序后的前5 rdd.filter(t =>{ t._2 > 100 }).foreach(println) }) ssc }val ssc = StreamingContext.getActiveOrCreate(path,creatingFunc)ssc.start() ssc.awaitTermination()} }

3.测试 (1)打开nc
nc -lt 9999
(2)运行程序
【SparkStreaming|SparkStreaming(17)(updateStateByKey算子,保留上一次计算结果)】(3)结果:
----------------当前时间为:1540004570000 ms---------------- (hadoop,212) (ccs,159) ----------------当前时间为:1540004580000 ms---------------- [Stage 9:=================================================>(5 + 1) / 6] (hadoop,360) (ccs,270)

(测试成功~)

    推荐阅读