SparkStreaming|SparkStreaming(17)(updateStateByKey算子,保留上一次计算结果)
1.实现功能 如果SparkStreaming程序断掉,重新启动,可以读取断掉之前的结果。通过,使用SparkStreaming的HA:checkpoints。
【参考:kafka(十四):SparkStreaming和Kafka接口的HA:checkpoints】
2.代码
package _0809kafka//import com.beifeng.util.SparkUtil
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}/**
*
* 之前做的计算当中,当前批次的计算值不会累加到下一个批次
*
* 当前批次的值计算完之后,存到外部存储系统中
* 下一个批次计算完值之后,在取出上一个批次计算出来的值,
* 做相加,更新会原位置上
*
* checkpoint会保留上一个程序的ssc的状态和UpdateStateByKey的结果
* 但是构造ssc的时候,必须按照规矩写,否则就读不到UpdateStateByKey上一次的结果
*/
object UpdateStateByKeyAPI_1020HA {
def main(args: Array[String]) {
//使用checkpoint来存储批次的数据
//1、创建sparkConf
val sparkConf: SparkConf = new SparkConf()
.setAppName("UpdateStateByKeyAPI")
.setMaster("local[2]")
//2、创建sparkContext
val sc = new SparkContext(sparkConf)//val path = s"file:///E:\\workspace\\SparkPro\\checkpoint\\streaming_05"
val path = s"file:///E:\\Tools\\WorkspaceforMyeclipse\\scalaProjectMaven\\streaming_07"def creatingFunc():StreamingContext ={
val ssc = new StreamingContext(sc,Seconds(10))
ssc.checkpoint(path)
val socketDStream: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata.ibeifeng.com",9999)//api updateStateByKey
val resultDStream: DStream[(String, Long)] = socketDStream.mapPartitions(iter =>{
//对于当前批次的值做数据转换
iter.flatMap(_.split(" "))
.filter(_.nonEmpty)
.map(word => (word,1))
})
//对于当前批次的值,做累加(aggr聚合)操作
.reduceByKey(_ + _)
//对于value的操作,相同key怎么处理对应的value
.updateStateByKey((seq: Seq[Int],state: Option[Long])=>{
//当前批次的相同key的value的聚合值
val sum = seq.sum
val preState= state.getOrElse(0L)
/**
* if(sum + preState > 1000){
* Some(sum + preState)
* }else{
* //清空当前key的value值
* None
* }
*/
Some(sum + preState)
})resultDStream.foreachRDD((rdd,time) =>{
println(s"----------------当前时间为:${time}----------------")
//比如说:某些key不打印,某些值过于小也可以不打印,或者打印排序后的前5
rdd.filter(t =>{
t._2 > 100
}).foreach(println)
})
ssc
}val ssc = StreamingContext.getActiveOrCreate(path,creatingFunc)ssc.start()
ssc.awaitTermination()}
}
3.测试 (1)打开nc
nc -lt 9999
(2)运行程序
【SparkStreaming|SparkStreaming(17)(updateStateByKey算子,保留上一次计算结果)】(3)结果:
----------------当前时间为:1540004570000 ms----------------
(hadoop,212)
(ccs,159)
----------------当前时间为:1540004580000 ms----------------
[Stage 9:=================================================>(5 + 1) / 6]
(hadoop,360)
(ccs,270)
(测试成功~)
推荐阅读
- SparkStreaming读Kafka-|SparkStreaming读Kafka- Couldn't find leaders for Set
- sparkstreaming|sparkstreaming 源码 我们从 start() 开始说起
- spark-streaming 编程(五)updateStateByKey
- Spark之---UpdateStateByKey算子操作
- Spark|Spark Streaming状态操作: updateStateByKey、mapWithState、基于window的状态操作
- 解析SparkStreaming和Kafka集成的两种方式
- SparkStreaming 状态管理函数比较
- SparkStreaming并行度的计算方式和设置(spark官方文档介绍)
- SparkStreaming|SparkStreaming On Kafka —— Offset 管理