Spark之---UpdateStateByKey算子操作

1.说明 【Spark之---UpdateStateByKey算子操作】SparkStreaming的一般是7天24小时不停息的运行,而在运行的时候,中间会有很多的状态,而有些状态我们需要一些操作,比如累计,更新或者其他的操作。那么如何将这些独立的状态联系起来就成了一种迫切的需求。
2.介绍 UpdateStateByKey的主要功能:

  • 1、为Spark Streaming中每一个Key维护一份state状态,state类型可以是任意类型的, 可以是一个自定义的对象,那么更新函数也可以是自定义的。
  • 2、通过更新函数对该key的状态不断更新,对于每个新的batch而言,Spark Streaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新。
    注意
  • 使用到updateStateByKey要开启checkpoint机制和功能。
  • 多久会将内存中的数据写入到磁盘一份?
  • 如果batchInterval设置的时间小于10秒,那么10秒写入磁盘一份。如果batchInterval设置的时间大于10秒,那么就会batchInterval时间间隔写入磁盘一份
3.代码说明
  • 这里是以单词统计为例
import org.apache.hadoop.mapred.lib.HashPartitioner import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext}object Streaming_demo { def main(args: Array[String]): Unit = { // 创建conf val conf = new SparkConf().setMaster("local[2]").setAppName("streaming_demo") // 创建SparkStreamingContext val ssc = new StreamingContext(conf, Seconds(3)) // 设置日志 ssc.sparkContext.setLogLevel("WARN") // 设置检查点,checkpoint ssc.checkpoint("/Users/ricky/Desktop/chekpoint") // 使用Socket流产生数据 val lines = ssc.socketTextStream("localhost",9999) // 切分数据 val words = lines.flatMap(_.split(" ")) // 遍历数据 val word = words.map(word=>{ (word,1) }); // 使用UpdateStateByKey进行更新 val result = word.updateStateByKey((seq:Seq[Int],option:Option[Int])=>{ // 初始化一个变量 var value = https://www.it610.com/article/0; // 该变量用于更新,加上上一个状态的值,这里隐含一个判断,如果有上一个状态就获取,如果没有就赋值为0 value += option.getOrElse(0) // 遍历当前的序列,序列里面每一个元素都是当前批次的数据计算结果,累加上一次的计算结果 for(elem <- seq){ value +=elem } // 返回一个Option对象 Option(value) }) // 累加,打印 val wordcount1 = result.reduceByKey(_ + _) wordcount1.print() // 启动SparkStreaming,设置阻塞 ssc.start() ssc.awaitTermination() } }

    推荐阅读