Spark Structured Streaming Java Example

Scenario: real-time data arrives in Kafka and is not necessarily in time order, and the computation also depends on static resources looked up elsewhere (a REST API or a database).
Requirements: results are computed per day, the computation is sensitive to time order, it runs once per hour, and the results are written back to Kafka.
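Both solutions below assume a SparkSession is already available as sparkSession. A minimal sketch of how it might be created; the app name and master here are placeholders:

import org.apache.spark.sql.SparkSession;

// Placeholder app name and master; on a cluster the master normally comes from spark-submit.
SparkSession sparkSession = SparkSession.builder()
        .appName("structured-streaming-example")
        .master("local[*]")
        .getOrCreate();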
Key points
window: see "spark window on event time" (window operations on event time in the Structured Streaming guide).
checkpointLocation: mainly records metadata such as Kafka offsets and intermediate operator state; it is what failure recovery and restarts rely on.
See: spark-checkpointing
startingOffsets: the offsets at which to start reading Kafka. They are only used when no checkpointLocation exists yet, or when the existing checkpoint has been invalidated (for example because the operators changed).
See: http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
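To make the startingOffsets JSON concrete, here is a minimal sketch of the Kafka source with explicit per-partition offsets. The broker address, topic name and offset values are made up; per the Kafka integration guide, -2 stands for earliest and -1 for latest.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Start partition 0 of "topic" at offset 1000 and partition 1 at the earliest offset (-2).
// This is only consulted on the first start; afterwards offsets come from the checkpoint.
Dataset<Row> lines = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:9092")
        .option("subscribe", "topic")
        .option("startingOffsets", "{\"topic\":{\"0\":1000,\"1\":-2}}")
        .load();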
Solution 1: window aggregation. Use a Structured Streaming window aggregation with a window length of 24 hours, a slide of 24 hours (i.e. tumbling daily windows), and a watermark of 48 hours.

Dataset lines = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "_")
        .option("subscribe", "topic")
        .option("startingOffsets", "{\"topic\":{\"0\":_offset_}}")
        .load();

Dataset dataset = lines.selectExpr("CAST(value AS STRING)")
        .as(Encoders.STRING())
        .mapPartitions(
            (MapPartitionsFunction) x -> {
                List objs = new ArrayList<>();
                // ... do something ...
                return objs.iterator();
            },
            Encoders.bean(Objects.class))
        .filter(Objects::nonNull);

Dataset dataset2 = dataset.withWatermark("timestamp", "48 hours")
        .groupBy(functions.col("groupId"),
                 functions.window(functions.col("timestamp"), "24 hours"))
        .agg(functions.collect_list("data").as("data"));

Dataset dataset3 = dataset2.map(
        (MapFunction) x -> {
            for (Object obj : x.getList(2)) {
                // The elements produced by collect_list are not typed objects,
                // so reading fields is awkward; I have not found a better way.
                GenericRowWithSchema schema = (GenericRowWithSchema) obj;
                int fieldIndex = schema.fieldIndex("fieldName");
                long fieldValue = schema.getLong(fieldIndex);
                // ...
            }
            return ...;
        },
        Encoders.bean(Object.class));

// Writing to Kafka requires a checkpointLocation; on a cluster HDFS is recommended.
StreamingQuery query = dataset3.toJSON()
        .writeStream()
        .outputMode("update")
        .option("checkpointLocation", "hdfs://host:port/checkpoints")
        .format("kafka")
        .option("kafka.bootstrap.servers", "_")
        .option("topic", "topic")
        .start();
query.awaitTermination();
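The comment in the map step notes that reading fields out of the collect_list result is awkward. One slightly cleaner option, sketched below, is to treat each collected element as a plain Row and fetch fields by name with getAs instead of casting to the internal GenericRowWithSchema and resolving indexes manually. SomeResult and the struct field fieldName are hypothetical names used only for illustration.

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// Hypothetical output bean; replace with whatever the daily computation actually produces.
public static class SomeResult implements java.io.Serializable {
    private long total;
    public long getTotal() { return total; }
    public void setTotal(long total) { this.total = total; }
}

// Read each collect_list element as a generic Row and look fields up by name.
// "data" is the collect_list column from dataset2; "fieldName" is an assumed struct field.
static final MapFunction<Row, SomeResult> EXTRACT_FUNC = row -> {
    SomeResult result = new SomeResult();
    long sum = 0L;
    for (Object obj : row.getList(row.fieldIndex("data"))) {
        Row dataRow = (Row) obj;
        sum += dataRow.<Long>getAs("fieldName");   // getAs resolves the field by name
    }
    result.setTotal(sum);
    return result;
};

// Usage: Dataset<SomeResult> dataset3 = dataset2.map(EXTRACT_FUNC, Encoders.bean(SomeResult.class));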
Solution 2: mapGroupsWithState (stateful stream). With window aggregation, Spark itself decides whether a record enters the computation (records whose eventTime is older than the watermark are dropped) and when a window is finished (a window that falls entirely before the watermark will never be updated again). With mapGroupsWithState you have to manage data expiry yourself.
See: https://blog.csdn.net/bluishglc/article/details/80824522
Use a SomeState object to store and update the last 24 hours of data per key yourself and run the computation against it; a sketch of these beans follows.
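The pipeline below refers to SomeData, SomeState and SomeOutPut beans that are not shown in the original. Here is a minimal sketch of what they might look like; every field name is an assumption. Classes used with Encoders.bean need a public no-arg constructor and getter/setter pairs, and SomeState is also where the 24-hour expiry has to be enforced by hand, since Spark only times out a whole group, not individual records.

import java.io.Serializable;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Parsed Kafka record: the grouping key plus an event-time timestamp used by withWatermark.
public static class SomeData implements Serializable {
    private long key;
    private Timestamp timestamp;
    private String payload;
    public long getKey() { return key; }
    public void setKey(long key) { this.key = key; }
    public Timestamp getTimestamp() { return timestamp; }
    public void setTimestamp(Timestamp timestamp) { this.timestamp = timestamp; }
    public String getPayload() { return payload; }
    public void setPayload(String payload) { this.payload = payload; }
}

// Per-key state: keeps roughly the last 24 hours of records.
public static class SomeState implements Serializable {
    private List<SomeData> records = new ArrayList<>();
    public List<SomeData> getRecords() { return records; }
    public void setRecords(List<SomeData> records) { this.records = records; }
    public void addData(Iterator<SomeData> it) {
        it.forEachRemaining(records::add);
        // Evict data older than 24 hours. This sketch measures age against the wall clock;
        // a real job might track the maximum event time seen instead.
        long cutoff = System.currentTimeMillis() - 24L * 60 * 60 * 1000;
        records.removeIf(d -> d.getTimestamp().getTime() < cutoff);
    }
}

// Result row; valid = false marks the placeholder emitted when a group times out.
public static class SomeOutPut implements Serializable {
    private boolean valid;
    public boolean isValid() { return valid; }
    public void setValid(boolean valid) { this.valid = valid; }
}

With those beans in place, the pipeline looks like this: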
Dataset lines = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "_")
        .option("subscribe", topic)
        .option("startingOffsets", ...)
        .load();

MapPartitionsFunction dataMapFunc = x -> {
    List msgs = new ArrayList<>();
    while (x.hasNext()) {
        // ... do something ...
    }
    return msgs.iterator();
};

Dataset dataset = lines.selectExpr("CAST(value AS STRING)")
        .as(Encoders.STRING())
        .mapPartitions(dataMapFunc, Encoders.bean(SomeData.class))
        .filter(Objects::nonNull);

String delayThreshold = "24 hours";

MapGroupsWithStateFunction mapGroupsWithStateFunc = (groupKey, dataIterator, groupState) -> {
    // Spark only notices that a group's state has timed out when a trigger fires.
    if (groupState.hasTimedOut()) {
        groupState.remove();
        return ...;
    }
    SomeState state = groupState.exists() ? groupState.get() : new SomeState();
    // ...
    state.addData(dataIterator);
    // ...
    groupState.update(state);
    groupState.setTimeoutTimestamp(new Date().getTime(), delayThreshold);
    return new SomeOutPut();
};

Dataset d = dataset.withWatermark("timestamp", delayThreshold)
        .groupByKey(SomeData::getKey, Encoders.LONG())
        .mapGroupsWithState(
            mapGroupsWithStateFunc,
            Encoders.bean(SomeState.class),
            Encoders.bean(SomeOutPut.class),
            GroupStateTimeout.EventTimeTimeout());

// Filter out the invalid results emitted when a group times out.
StreamingQuery query = d.filter(SomeOutPut::isValid)
        .toJSON()
        .writeStream()
        .outputMode("update")
        .option("checkpointLocation", "hdfs://host:port/checkpoints")
        .format("kafka")
        .option("kafka.bootstrap.servers", "_")
        .option("topic", "topic")
        .start();
query.awaitTermination();
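Neither snippet above sets a trigger, so by default a new micro-batch starts as soon as the previous one finishes. The requirement was to compute once per hour; below is a minimal sketch of the same Kafka sink with a one-hour processing-time trigger added. jsonOutput stands in for the Dataset<String> returned by toJSON() in either solution; broker, topic and checkpoint path are placeholders.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

// Same sink as above, plus a processing-time trigger so a micro-batch fires once per hour.
static StreamingQuery startHourlyKafkaSink(Dataset<String> jsonOutput) throws Exception {
    return jsonOutput
            .writeStream()
            .trigger(Trigger.ProcessingTime("1 hour"))
            .outputMode("update")
            .option("checkpointLocation", "hdfs://host:port/checkpoints")
            .format("kafka")
            .option("kafka.bootstrap.servers", "host1:9092")
            .option("topic", "topic")
            .start();
}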
