Spark之---UpdateStateByKey算子操作
1.说明 【Spark之---UpdateStateByKey算子操作】SparkStreaming的一般是7天24小时不停息的运行,而在运行的时候,中间会有很多的状态,而有些状态我们需要一些操作,比如累计,更新或者其他的操作。那么如何将这些独立的状态联系起来就成了一种迫切的需求。
2.介绍 UpdateStateByKey的主要功能:
- 1、为Spark Streaming中每一个Key维护一份state状态,state类型可以是任意类型的, 可以是一个自定义的对象,那么更新函数也可以是自定义的。
- 2、通过更新函数对该key的状态不断更新,对于每个新的batch而言,Spark Streaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新。
注意 - 使用到updateStateByKey要开启checkpoint机制和功能。
- 多久会将内存中的数据写入到磁盘一份?
- 如果batchInterval设置的时间小于10秒,那么10秒写入磁盘一份。如果batchInterval设置的时间大于10秒,那么就会batchInterval时间间隔写入磁盘一份
- 这里是以单词统计为例
import org.apache.hadoop.mapred.lib.HashPartitioner
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}object Streaming_demo {
def main(args: Array[String]): Unit = {
// 创建conf
val conf = new SparkConf().setMaster("local[2]").setAppName("streaming_demo")
// 创建SparkStreamingContext
val ssc = new StreamingContext(conf, Seconds(3))
// 设置日志
ssc.sparkContext.setLogLevel("WARN")
// 设置检查点,checkpoint
ssc.checkpoint("/Users/ricky/Desktop/chekpoint")
// 使用Socket流产生数据
val lines = ssc.socketTextStream("localhost",9999)
// 切分数据
val words = lines.flatMap(_.split(" "))
// 遍历数据
val word = words.map(word=>{
(word,1)
});
// 使用UpdateStateByKey进行更新
val result = word.updateStateByKey((seq:Seq[Int],option:Option[Int])=>{
// 初始化一个变量
var value = https://www.it610.com/article/0;
// 该变量用于更新,加上上一个状态的值,这里隐含一个判断,如果有上一个状态就获取,如果没有就赋值为0
value += option.getOrElse(0)
// 遍历当前的序列,序列里面每一个元素都是当前批次的数据计算结果,累加上一次的计算结果
for(elem <- seq){
value +=elem
}
// 返回一个Option对象
Option(value)
})
// 累加,打印
val wordcount1 = result.reduceByKey(_ + _)
wordcount1.print()
// 启动SparkStreaming,设置阻塞
ssc.start()
ssc.awaitTermination()
}
}
推荐阅读
- PMSJ寻平面设计师之现代(Hyundai)
- 太平之莲
- 闲杂“细雨”
- 七年之痒之后
- 深入理解Go之generate
- 由浅入深理解AOP
- 期刊|期刊 | 国内核心期刊之(北大核心)
- 生活随笔|好天气下的意外之喜
- 感恩之旅第75天
- python学习之|python学习之 实现QQ自动发送消息