spark-streaming Programming (5): updateStateByKey

updateStateByKey(func)
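As the name suggests, this function aggregates values by key and keeps updating each key's value (its state) from batch to batch.
To use it, the elements of the DStream must be key-value pairs of type (K, V).
func is called for each key with that key's new values from the current batch and its previous state, and returns the new state; the result is a DStream holding the accumulated state of every key.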
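In the Scala API the update function receives, for one key, the values that arrived in the current batch (`Seq[V]`) and the previous state (`Option[S]`, which is `None` when the key is new), and returns the new state; returning `None` drops the key from the state. The sketch below shows a word-count update function with that shape, called directly on plain Scala values so no Spark cluster is needed. The object name `UpdateFuncDemo` is hypothetical, chosen only for illustration.

```scala
object UpdateFuncDemo {
  // Same shape as the function passed to updateStateByKey[Long]:
  //   newValues = counts for one key in the current batch
  //   state     = that key's accumulated count so far (None if the key is new)
  def addFunction(newValues: Seq[Long], state: Option[Long]): Option[Long] = {
    val currentSum = newValues.sum        // count in this batch
    val previousSum = state.getOrElse(0L) // count before this batch
    Some(currentSum + previousSum)        // new state; returning None would drop the key
  }

  def main(args: Array[String]): Unit = {
    println(addFunction(Seq(3L, 7L), Some(100L))) // existing key with new values: prints Some(110)
    println(addFunction(Seq(1L), None))           // key seen for the first time: prints Some(1)
    println(addFunction(Seq.empty, Some(10L)))    // existing key, no new values: prints Some(10)
  }
}
```

The `getOrElse(0L)` call is what lets a brand-new key start counting from zero.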
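Take word count as an example. Each batch is split into words and aggregated, which gives the counts for the current batch; suppose they are (word1,10), (word2,15), (word3,1).
The accumulated values from earlier batches are (word1,100), (word2,10), (word3,15), (word4,10).
The next state is then (word1,100+10), (word2,10+15), (word3,15+1), (word4,10+0); word4 has no new values in this batch, so it keeps its previous value.
When the next batch arrives, it is processed with the same logic.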
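The merge rule in this example can be reproduced in plain Scala by applying the update function to every key that appears either in the previous state or in the current batch. This is only a local simulation of the semantics, assuming an immutable `Map` holds the state; it is not how Spark implements `updateStateByKey` internally, and the names `StateMergeDemo` and `step` are hypothetical.

```scala
object StateMergeDemo {
  // Word-count update function: new state = batch count + previous count
  def addFunction(newValues: Seq[Long], state: Option[Long]): Option[Long] =
    Some(newValues.sum + state.getOrElse(0L))

  // One batch step: every key from the old state or the new batch is updated;
  // keys for which the function returns None are dropped from the new state.
  def step(state: Map[String, Long], batch: Seq[(String, Long)]): Map[String, Long] = {
    val grouped: Map[String, Seq[Long]] =
      batch.groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }
    val keys = state.keySet ++ grouped.keySet
    keys.flatMap { k =>
      addFunction(grouped.getOrElse(k, Seq.empty), state.get(k)).map(k -> _)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val previous = Map("word1" -> 100L, "word2" -> 10L, "word3" -> 15L, "word4" -> 10L)
    val batch = Seq("word1" -> 10L, "word2" -> 15L, "word3" -> 1L)
    // Prints (word1,110), (word2,25), (word3,16), (word4,10), one per line
    step(previous, batch).toSeq.sortBy(_._1).foreach(println)
  }
}
```

word4 receives an empty `Seq` in this step, so its state stays 10. This mirrors Spark's behavior: the update function is called for every key that already has state, even when that key has no new values in the batch.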
Code example:

package com.lgh.sparkstreaming

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

/**
  * Created by lgh on 2017/8/24.
  */
object UpdateStateByKey {
  def main(args: Array[String]): Unit = {
    val brokers = "mtime-bigdata00:9092,mtime-bigdata01:9092"
    val topics = "testkafka"
    val batchseconds = "10"
    val checkpointDirectory = "./upbykey"
    // Recover the context from the checkpoint if one exists, otherwise build a new one
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createcontext(brokers, topics, batchseconds, checkpointDirectory))
    ssc.start()
    ssc.awaitTermination()
  }

  def createcontext(brokers: String, topics: String, batchseconds: String, checkpointDirectory: String): StreamingContext = {
    val sparkconf = new SparkConf().setAppName("TestUpStateByKey").setMaster("local[3]")
    val ssc = new StreamingContext(sparkconf, Seconds(batchseconds.toInt))
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    // Kafka messages are (key, value) pairs; keep only the value (the text line)
    val lines: DStream[String] = messages.map(_._2)
    // Word count within the current batch
    val message: DStream[(String, Long)] = lines.flatMap(_.split(" ")).map(x => (x, 1L)).reduceByKey(_ + _)
    // Merge each batch's counts into the running totals
    val keyvalue = message.updateStateByKey[Long](addFunction _)
    keyvalue.print(100)
    // Stateful operations such as updateStateByKey require a checkpoint directory
    ssc.checkpoint(checkpointDirectory)
    ssc
  }

  // currValues: all values for one key in the current batch (a Seq)
  // preValueState: the key's previous value; in this example, the word's running count so far
  def addFunction(currValues: Seq[Long], preValueState: Option[Long]): Option[Long] = {
    // Sum of this key's values in the current batch
    val currentSum: Long = currValues.sum
    // This key's accumulated value from earlier batches (0 if the key is new)
    val previousSum: Long = preValueState.getOrElse(0L)
    // New value for this key
    val nowvalue: Long = currentSum + previousSum
    // Return the result; in Scala, Some and None are subclasses of Option
    Some(nowvalue)
  }
}
