问题的提出
对于WaterMark设计的位置是否会影响窗口的正常开闭?
下面我模拟了两种情景(source并行度为1,map并行度为2),分别是
1.在source后设置watermark,经过map后开窗
2.在map后设置watermark,然后开窗
ps: 下面的两种代码我都设置了自然增长的watermark,窗口时间都是5秒,只是设置watermark的位置不同
watermark是testWM对象的ts字段*1000
代码一:在Source后添加WaterMark
public class WMTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource source = env.socketTextStream("localhost", 9999);
// TODO: 2021/12/1 在source后设置watermark
SingleOutputStreamOperator sourceWithWM = source.assignTimestampsAndWatermarks(WatermarkStrategy
.forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(String element, long recordTimestamp) {
String[] split = element.split(",");
return Long.parseLong(split[2]) * 1000;
}
}));
// TODO: 2021/12/1 设置map并行度为2
SingleOutputStreamOperator mapDS = sourceWithWM.map(r -> {
String[] split = r.split(",");
return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]),Long.parseLong(split[2]));
}).setParallelism(2);
SingleOutputStreamOperator resultDS = mapDS.keyBy(r -> r.getId())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction() {
@Override
public void process(Integer integer, Context context, Iterable elements, Collector out) throws Exception {
out.collect("我关窗啦");
}
});
resultDS.print();
env.execute();
}
}
@Data
@AllArgsConstructor
class testWM{
private int id;
private int num;
private long ts;
}
代码二:在Map后设置WaterMark
public class WMTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource source = env.socketTextStream("localhost", 9999);
// TODO: 2021/12/1 设置map并行度为2
SingleOutputStreamOperator mapDS = source.map(r -> {
String[] split = r.split(",");
return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]),Long.parseLong(split[2]));
}).setParallelism(2);
// TODO: 2021/12/1在map后添加watermark
SingleOutputStreamOperator mapWithWM = mapDS.assignTimestampsAndWatermarks(WatermarkStrategy
.forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(testWM element, long recordTimestamp) {
return element.getTs() * 1000;
}
}));
SingleOutputStreamOperator resultDS = mapWithWM.keyBy(r -> r.getId())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction() {
@Override
public void process(Integer integer, Context context, Iterable elements, Collector out) throws Exception {
out.collect("我关窗啦");
}
});
resultDS.print();
env.execute();
}
【Flink多并行度下WaterMark的设计区别】}
@Data
@AllArgsConstructor
class testWM{
private int id;
private int num;
private long ts;
}
运行结果:
对于第一种,在source后添加watermark的结果如下:
文章图片
当1,1,1这条数据进入时,开启了[0,5)这个窗口,当1,1,9这条数据进入时,watermark升至9000(忽略watermark的减1).窗口关闭,没有问题
对于第二种,在map后添加watermark的结果如下:
文章图片
可以很明显的发现,当第一条1,1,9进入时,[0,5)这个窗口并没有关闭.直到第二条1,1,9进入时,窗口才被关闭,这是为什么?
我针对以上两种情况画了下图来理解.
文章图片
图一.图二描绘了source之后设置watermark的场景,一般来说,这是我们生产中需要的
WaterMark以广播的形式向下游发送,大数据培训并且如果同时接收上游多条并行度的WaterMark时,以小的为准
这就导致图三(Map后设置WaterMark)中,我需要发送两条足够[0,5)这个窗口关闭的数据,才能真正关闭窗口,因为数据要经过轮询才能到达每个并行度。
拓展:
在KafkaSource中,已经做了很好得优化,在生产中我们一般设置并行度与topic分区数相同
如果设置得并行度比topic分区数多,那必然有并行度消费不到数据,就会导致WaterMark一直保持在Long.min_value.
当这种WaterMark向下游广播之后,会导致所有正常并行度的窗口全部无法关闭,因为WaterMark取了各个并行度的最小值
但是当这种状态保持一段时间之后,程序会在计算WaterMark的时候,自动过滤掉迟迟没有数据的并行度传进来的WaterMark,这就是KafkaSource的优化.