Spark Structured Streaming Java example
Scenario
Real-time data arrives in Kafka, not necessarily in time order, and the computation needs additional static resources (from a REST API or a database).
Results must be computed per day, the computation is order-sensitive, it runs once per hour, and the output is written back to Kafka.
Key points
window
Reference: spark window on event time
checkpointLocation
Mainly used to record metadata, offsets, and the intermediate state of operators, for failure recovery and restarts.
Reference: spark-checkpointing
startingOffsets
The offsets at which to start reading from Kafka. Only used when no checkpointLocation exists yet, or when the checkpoint has become invalid after an operator change (see the snippet after this list).
Reference: http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
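For reference, the accepted forms of startingOffsets, per the Kafka integration guide linked above (broker address, topic name, and offsets are placeholders; in the per-partition JSON form, -2 means earliest and -1 means latest):
Dataset<Row> lines = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "_")
        .option("subscribe", "topic")
        // .option("startingOffsets", "latest")    // the default: only new records
        // .option("startingOffsets", "earliest")  // everything Kafka still retains
        .option("startingOffsets", "{\"topic\":{\"0\":23,\"1\":-2}}") // exact per-partition offsets
        .load();
Once a checkpoint exists, recovery resumes from the checkpointed offsets and startingOffsets is ignored.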
Option 1: window aggregation
Use a Structured Streaming window aggregation with a 24-hour window length, a 24-hour slide, and a 48-hour watermark.
Dataset<Row> lines = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "_")
        .option("subscribe", "topic")
        .option("startingOffsets", "{\"topic\":{\"0\":_offset_}}")
        .load();
// A minimal sketch of the aggregation described above (24-hour window, 24-hour
// slide, 48-hour watermark), keyed on the Kafka source's built-in "timestamp" column:
Dataset<Row> windowed = lines
        .selectExpr("CAST(value AS STRING) AS value", "timestamp")
        .withWatermark("timestamp", "48 hours")
        .groupBy(functions.window(functions.col("timestamp"), "24 hours", "24 hours"))
        .count();
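To meet the hourly-output requirement, the windowed result can be flushed to Kafka on a one-hour processing-time trigger. A sketch, assuming the windowed Dataset above and placeholder broker/checkpoint values (Trigger comes from org.apache.spark.sql.streaming):
StreamingQuery windowQuery = windowed.selectExpr("to_json(struct(*)) AS value")
        .writeStream()
        .outputMode("update")                       // emit updated windows each trigger
        .trigger(Trigger.ProcessingTime("1 hour"))  // compute once per hour
        .option("checkpointLocation", "hdfs://host:port/checkpoints-window")
        .format("kafka")
        .option("kafka.bootstrap.servers", "_")
        .option("topic", "topic")
        .start();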
Option 2: mapGroupsWithState stateful stream
With window functions, Spark itself decides whether a record enters the computation (records whose eventTime falls before the watermark are dropped) and when a window expires (a window whose time range falls before the watermark will never trigger another update). With mapGroupsWithState, you maintain the data's expiry yourself.
Reference: https://blog.csdn.net/bluishglc/article/details/80824522
Use a SomeState object to store and update the last 24 hours of data yourself, and run the computation against it.
Dataset<Row> lines = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "_")
        .option("subscribe", topic)
        .option("startingOffsets", ...)
        .load();
MapPartitionsFunction<String, SomeData> dataMapFunc =
        x ->
        {
            List<SomeData> msgs = new ArrayList<>();
            while (x.hasNext())
            {
                // ...do something: parse x.next() into a SomeData and add it to msgs
            }
            return msgs.iterator();
        };
Dataset<SomeData> dataset = lines.selectExpr("CAST(value AS STRING)")
        .as(Encoders.STRING())
        .mapPartitions(dataMapFunc, Encoders.bean(SomeData.class))
        // the cast disambiguates between the FilterFunction and Function1 overloads
        .filter((FilterFunction<SomeData>) Objects::nonNull);
String delayThreshold = "24 hours";
MapGroupsWithStateFunction<Long, SomeData, SomeState, SomeOutPut> mapGroupsWithStateFunc =
        (groupKey, dataIterator, groupState) ->
        {
            // Spark only notices an expired state when a trigger fires
            if (groupState.hasTimedOut())
            {
                groupState.remove();
                return ...;
            }
            SomeState state = groupState.exists() ? groupState.get() : new SomeState();
            ...
            state.addData(dataIterator);
            ...
            groupState.update(state);
            // the state expires once the watermark passes this timestamp plus delayThreshold
            groupState.setTimeoutTimestamp(new Date().getTime(), delayThreshold);
            return new SomeOutPut();
        };
Dataset<SomeOutPut> d = dataset.withWatermark("timestamp", delayThreshold)
        .groupByKey((MapFunction<SomeData, Long>) SomeData::getKey, Encoders.LONG())
        .mapGroupsWithState(
                mapGroupsWithStateFunc,
                Encoders.bean(SomeState.class),
                Encoders.bean(SomeOutPut.class),
                GroupStateTimeout.EventTimeTimeout());
// filter out the invalid placeholder results emitted when state times out
StreamingQuery query = d.filter((FilterFunction<SomeOutPut>) SomeOutPut::isValid)
        .toJSON()
        .writeStream()
        .outputMode("update")
        .option("checkpointLocation", "hdfs://host:port/checkpoints")
        .format("kafka")
        .option("kafka.bootstrap.servers", "_")
        .option("topic", "topic")
        .start();
query.awaitTermination();
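The snippets above never define SomeData, SomeState, and SomeOutPut. A minimal sketch of what Encoders.bean(...) expects of them (the field names are illustrative assumptions; each top-level class would live in its own file):
import java.io.Serializable;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Bean classes need a no-arg constructor plus getters/setters so that
// Encoders.bean(...) can derive a schema reflectively.
public class SomeData implements Serializable {
    private long key;             // grouping key used by groupByKey
    private Timestamp timestamp;  // event time used by withWatermark
    private String payload;
    public long getKey() { return key; }
    public void setKey(long key) { this.key = key; }
    public Timestamp getTimestamp() { return timestamp; }
    public void setTimestamp(Timestamp timestamp) { this.timestamp = timestamp; }
    public String getPayload() { return payload; }
    public void setPayload(String payload) { this.payload = payload; }
}

public class SomeState implements Serializable {
    private List<SomeData> buffer = new ArrayList<>();  // the last 24 hours of data
    public void addData(Iterator<SomeData> it) { it.forEachRemaining(buffer::add); }
    public List<SomeData> getBuffer() { return buffer; }
    public void setBuffer(List<SomeData> buffer) { this.buffer = buffer; }
}

public class SomeOutPut implements Serializable {
    private boolean valid;  // false for the placeholder emitted on timeout
    private String result;
    public boolean isValid() { return valid; }
    public void setValid(boolean valid) { this.valid = valid; }
    public String getResult() { return result; }
    public void setResult(String result) { this.result = result; }
}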