Spark Streaming Programming (5): updateStateByKey
updateStateByKey(func)
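As the name suggests, this function aggregates by key and keeps updating each key's value (its state) from batch to batch.
To use it, the elements of the DStream must be key-value pairs of the form (K, V).
For each key, func combines the key's new values in the current batch with its previous state and returns the updated aggregate state.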
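In the plain `updateStateByKey(func)` overload, func for a `DStream[(K, V)]` with state type `S` has the shape `(Seq[V], Option[S]) => Option[S]`. The sketch below calls such a function directly, with no Spark involved, to show what it receives and returns. `runningSum` is a hypothetical name chosen for this illustration.

```scala
// Sketch of the update-function shape used by updateStateByKey:
// (Seq[V], Option[S]) => Option[S], here with V = Long (per-batch counts)
// and S = Long (running total). No Spark dependency; called by hand.
object UpdateFuncShape {
  // Hypothetical name; any function with this signature would do
  val runningSum: (Seq[Long], Option[Long]) => Option[Long] =
    (newValues, prevState) => Some(newValues.sum + prevState.getOrElse(0L))

  def main(args: Array[String]): Unit = {
    // Key seen before (total 100) with one new value (10) in this batch
    println(runningSum(Seq(10L), Some(100L))) // Some(110)
    // Key seen for the first time: there is no previous state
    println(runningSum(Seq(3L, 4L), None))    // Some(7)
    // Key with no new data in this batch: the old state is carried over
    println(runningSum(Seq.empty, Some(10L))) // Some(10)
  }
}
```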
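Take word count as an example. Each batch is split into words and aggregated, which gives the counts for the current batch. Suppose they are (word1,10), (word2,15), (word3,1).
The accumulated state from earlier batches is (word1,100), (word2,10), (word3,15), (word4,10).
The new state is then (word1,10+100), (word2,15+10), (word3,1+15), (word4,0+10), i.e. (word1,110), (word2,25), (word3,16), (word4,10). word4 is updated even though it does not appear in the current batch, because func is called for every key in the state and receives an empty sequence of new values for word4.
When the next batch arrives, it is processed with the same logic.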
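The worked example above can be reproduced with a small local simulation of one state-update step. This uses plain Scala collections and is not Spark code. The helper `step` is hypothetical. It mirrors the behavior of updateStateByKey: the update function is called for every key that is either in the old state or in the batch, and any key for which it returns None is dropped.

```scala
// Hypothetical local simulation of one updateStateByKey step (no Spark).
object StateStepDemo {
  def step[V, S](
      state: Map[String, S],
      batch: Seq[(String, V)],
      update: (Seq[V], Option[S]) => Option[S]): Map[String, S] = {
    // Group the current batch's values by key
    val grouped: Map[String, Seq[V]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    // Every key that has old state or new data gets the update function applied
    val keys = state.keySet ++ grouped.keySet
    keys.flatMap { k =>
      // A None result removes the key; Some(s) keeps it with the new state
      update(grouped.getOrElse(k, Seq.empty[V]), state.get(k)).map(s => k -> s)
    }.toMap
  }

  def demo(): Map[String, Long] = {
    val previous = Map("word1" -> 100L, "word2" -> 10L, "word3" -> 15L, "word4" -> 10L)
    val batch = Seq("word1" -> 10L, "word2" -> 15L, "word3" -> 1L)
    val add: (Seq[Long], Option[Long]) => Option[Long] =
      (cur, prev) => Some(cur.sum + prev.getOrElse(0L))
    step(previous, batch, add)
  }

  def main(args: Array[String]): Unit = {
    // Prints: (word1,110), (word2,25), (word3,16), (word4,10)
    println(demo().toSeq.sortBy(_._1).mkString(", "))
  }
}
```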
Code example:
package com.lgh.sparkstreaming

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

/**
 * Created by lgh on 2017/8/24.
 */
object UpdateStateByKey {
  def main(args: Array[String]): Unit = {
    val brokers = "mtime-bigdata00:9092,mtime-bigdata01:9092"
    val topics = "testkafka"
    val batchseconds = "10"
    val checkpointDirectory = "./upbykey"
    // Recover the context from the checkpoint if one exists, otherwise build a new one
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createcontext(brokers, topics, batchseconds, checkpointDirectory))
    ssc.start()
    ssc.awaitTermination()
  }

  def createcontext(brokers: String, topics: String, batchseconds: String, checkpointDirectory: String): StreamingContext = {
    val sparkconf = new SparkConf().setAppName("TestUpStateByKey").setMaster("local[3]")
    val ssc = new StreamingContext(sparkconf, Seconds(batchseconds.toInt))
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    // Direct Kafka stream of (key, message) pairs
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    // Keep only the message body
    val lines: DStream[String] = messages.map(_._2)
    // Word counts for the current batch
    val message: DStream[(String, Long)] = lines.flatMap(_.split(" ")).map(x => (x, 1L)).reduceByKey(_ + _)
    // Merge the batch counts into the running totals kept as state
    val keyvalue = message.updateStateByKey[Long](addFunction _)
    keyvalue.print(100)
    // Stateful operations such as updateStateByKey require a checkpoint directory
    ssc.checkpoint(checkpointDirectory)
    ssc
  }

  // currValues: all values of a key in the current batch, as a Seq
  // preValueState: the key's previous state; in this example, the word's running count so far
  def addFunction(currValues: Seq[Long], preValueState: Option[Long]): Option[Long] = {
    // Sum of this key's values in the current batch
    val currentSum: Long = currValues.sum
    // This key's accumulated value from earlier batches (0 if it has none)
    val previousSum: Long = preValueState.getOrElse(0L)
    // New value for this key
    val nowvalue: Long = currentSum + previousSum
    // Return the result; in Scala, Some and None are subclasses of Option
    Some(nowvalue)
  }
}
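addFunction always returns Some, so once a word enters the state it stays there. Because the update function returns an Option, it can also return None, and updateStateByKey then removes that key from the state. The sketch below is a hypothetical variant named `addOrExpire` that is not part of the original post. It drops a word whenever a batch contains no occurrences of it, and it is shown next to a standalone copy of addFunction for comparison.

```scala
// Hypothetical variant (not from the original post) illustrating the Option
// return value: Some(x) keeps/updates the key, None removes it from the state.
object AddFunctionVariants {
  // Same logic as addFunction in the example: running sum, key always kept
  def addFunction(currValues: Seq[Long], preValueState: Option[Long]): Option[Long] =
    Some(currValues.sum + preValueState.getOrElse(0L))

  // Removes a word whenever the current batch has no occurrences of it,
  // so after each step the state only holds words that appeared in that batch
  def addOrExpire(currValues: Seq[Long], preValueState: Option[Long]): Option[Long] =
    if (currValues.isEmpty) None
    else Some(currValues.sum + preValueState.getOrElse(0L))

  def main(args: Array[String]): Unit = {
    println(addFunction(Seq.empty, Some(10L))) // Some(10): state kept
    println(addOrExpire(Seq.empty, Some(10L))) // None: key removed
    println(addOrExpire(Seq(2L), Some(10L)))   // Some(12)
  }
}
```

In the streaming job, such a variant would presumably be passed the same way, e.g. `message.updateStateByKey[Long](addOrExpire _)`. Whether dropping idle keys is desirable depends on whether the running total should survive batches in which the word does not appear.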