Flink多并行度下WaterMark的设计区别

问题的提出
对于WaterMark设计的位置是否会影响窗口的正常开闭?
下面我模拟了两种情景(source并行度为1,map并行度为2),分别是
1.在source后设置watermark,经过map后开窗
2.在map后设置watermark,然后开窗
ps: 下面的两种代码我都设置了自然增长的watermark,窗口时间都是5秒,只是设置watermark的位置不同

watermark是testWM对象的ts字段*1000

代码一:在Source后添加WaterMark
public class WMTest {
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource source = env.socketTextStream("localhost", 9999); // TODO: 2021/12/1 在source后设置watermark SingleOutputStreamOperator sourceWithWM = source.assignTimestampsAndWatermarks(WatermarkStrategy .forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner() { @Override public long extractTimestamp(String element, long recordTimestamp) { String[] split = element.split(","); return Long.parseLong(split[2]) * 1000; } })); // TODO: 2021/12/1 设置map并行度为2 SingleOutputStreamOperator mapDS = sourceWithWM.map(r -> { String[] split = r.split(","); return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]),Long.parseLong(split[2])); }).setParallelism(2); SingleOutputStreamOperator resultDS = mapDS.keyBy(r -> r.getId()) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .process(new ProcessWindowFunction() { @Override public void process(Integer integer, Context context, Iterable elements, Collector out) throws Exception { out.collect("我关窗啦"); } }); resultDS.print(); env.execute(); }

}
@Data
@AllArgsConstructor
class testWM{
private int id; private int num; private long ts;

}
代码二:在Map后设置WaterMark
public class WMTest {
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource source = env.socketTextStream("localhost", 9999); // TODO: 2021/12/1 设置map并行度为2 SingleOutputStreamOperator mapDS = source.map(r -> { String[] split = r.split(","); return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]),Long.parseLong(split[2])); }).setParallelism(2); // TODO: 2021/12/1在map后添加watermark SingleOutputStreamOperator mapWithWM = mapDS.assignTimestampsAndWatermarks(WatermarkStrategy .forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner() { @Override public long extractTimestamp(testWM element, long recordTimestamp) { return element.getTs() * 1000; } })); SingleOutputStreamOperator resultDS = mapWithWM.keyBy(r -> r.getId()) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .process(new ProcessWindowFunction() { @Override public void process(Integer integer, Context context, Iterable elements, Collector out) throws Exception { out.collect("我关窗啦"); } }); resultDS.print(); env.execute(); }

【Flink多并行度下WaterMark的设计区别】}
@Data
@AllArgsConstructor
class testWM{
private int id; private int num; private long ts;

}
运行结果:
对于第一种,在source后添加watermark的结果如下:
Flink多并行度下WaterMark的设计区别
文章图片

当1,1,1这条数据进入时,开启了[0,5)这个窗口,当1,1,9这条数据进入时,watermark升至9000(忽略watermark的减1).窗口关闭,没有问题
对于第二种,在map后添加watermark的结果如下:
Flink多并行度下WaterMark的设计区别
文章图片

可以很明显的发现,当第一条1,1,9进入时,[0,5)这个窗口并没有关闭.直到第二条1,1,9进入时,窗口才被关闭,这是为什么?
我针对以上两种情况画了下图来理解.
Flink多并行度下WaterMark的设计区别
文章图片

图一.图二描绘了source之后设置watermark的场景,一般来说,这是我们生产中需要的
WaterMark以广播的形式向下游发送,大数据培训并且如果同时接收上游多条并行度的WaterMark时,以小的为准
这就导致图三(Map后设置WaterMark)中,我需要发送两条足够[0,5)这个窗口关闭的数据,才能真正关闭窗口,因为数据要经过轮询才能到达每个并行度。
拓展:
在KafkaSource中,已经做了很好得优化,在生产中我们一般设置并行度与topic分区数相同
如果设置得并行度比topic分区数多,那必然有并行度消费不到数据,就会导致WaterMark一直保持在Long.min_value.
当这种WaterMark向下游广播之后,会导致所有正常并行度的窗口全部无法关闭,因为WaterMark取了各个并行度的最小值
但是当这种状态保持一段时间之后,程序会在计算WaterMark的时候,自动过滤掉迟迟没有数据的并行度传进来的WaterMark,这就是KafkaSource的优化.

    推荐阅读