1、概述
flink中比较重要的是时间和状态,学习flink的过程中对水位线的理解一直模糊,经过一段时间的消化,在此总结总结。本文主要把水位线是什么,怎么来的,有什么用描述清楚。
2、不太好理解的水位线
有些人喜欢把水位线叫成水印,不管是水印还是水位线,中文翻译过来一点都不贴切我们的生活,比较抽象,让人难得理解。在我们生活中水位线类似家中挂在墙上的一个挂钟,类似我们的手表。下面来聊聊如下的话题:
1,到底是如何产生。
2,既然是一个挂钟,钟表有哪些特点呢,钟表每隔1s秒针往前走一小步,时间是不是越来越大,这些特点水位线是不是也有呢。
3,挂钟有什么用处啊?晚上看看手表发现12点,我们肯定自我暗示:"应该睡觉了",通过时间让我们知道什么时间该干什么事情。
3、什么叫水位线
3.1、水位线的定义
水位线就是一个逻辑时钟,为什么叫逻辑时钟?正常时间是有cpu产生的,周期而固定的往前走,但是我们这个时钟的时间是程序员计算出来,根据"事件时间"动态计算出来(至于什么是时间事件,有什么使用场景这里就不讲了),如某一时刻计算的结果为x,x值为2022-10-10 10:10:10对应的时间戳为1665367810000,x的值随着事件时间的变大而变大,可能的结果为x,x+1,x+2,x+3,x+4 ... 连续的越来越大的时间戳是不是类似钟表每隔1s往前走一步呢。
3.2、水位线(逻辑时钟)的组成
水位线由一串连续的时间戳组成,越来越大,每个时间戳都是根据事件时间动态计算出来的。时钟也是由一连续的时间组成,也是越来越大,如2022-10-10 10:10:10,2022-10-10 10:10:11,2022-10-10 10:10:12,2022-10-10 10:10:13 。。。等,水位线就是类似生活中的时钟,所以我把这个水位线称为逻辑时钟,逻辑时钟就是水位线,水印机制。
3.3、逻辑时钟当前时间
类似时钟的当前时间,此处此刻为几点几分几秒,这个当前时间比较重要,窗口的闭合,定时任务的触发都是根据当前时间来判断的。
当前值特点:越来越大,流刚刚产生的时候插入负无穷大值,结束是插入正无穷大的值。
个人觉得这个当前值类似一个指针类型的变量,他的指向是不停的变化的(个人理解)。
3.4、当前时间的计算公式
时钟的"当前时间"对应一个具体的时间戳。时钟的当前值xxx = 事件时间 - 最大延迟时间 - 1毫秒。
3.5、来一个案例
案例描述:从socket读取数据,并打印当前水位的具体值。
?
package com.deepexi.sql;
?
?
?
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
?
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
?
import org.apache.flink.api.common.typeinfo.Types;
?
import org.apache.flink.api.java.tuple.Tuple2;
?
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
?
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
?
import org.apache.flink.util.Collector;
?
?
?
import java.time.Duration;
?
?
?
public class ExampleTest {
?
?
?
public static void main(String[] args) throws Exception {
?
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
?
env.setParallelism(1);
?
env
?
//从socket读取数据
?
.socketTextStream("192.168.117.211", 9999)
?
.map(r -> Tuple2.of(r.split(" ")[0], Long.parseLong(r.split(" ")[1])))
?
.returns(Types.TUPLE(Types.STRING, Types.LONG))
?
.assignTimestampsAndWatermarks(
?
//5s延迟时间
?
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
?
.withTimestampAssigner(new SerializableTimestampAssigner() {
?
@Override
?
public long extractTimestamp(Tuple2 element, long recordTimestamp) {
?
//提取事件时间
?
return element.f1;
?
}
?
})
?
)
?
//分流
?
.keyBy(r -> r.f0)
?
.process(new KeyedProcessFunction, String>() {
?
@Override
?
public void processElement(Tuple2 value, Context ctx, Collector out) throws Exception {
?
out.collect("当前的水位线是:" + ctx.timerService().currentWatermark());
?
}
?
})
?
.print();
?
?
?
env.execute();
?
}
?
?
?
}
nc -lk 9999 开启socket服务,监听9999端口
命令行输入:a 1000
[root@localhost ~]# nc -lk 9999 a 1000
idea控制台打印
当前的水位线是:-9223372036854775808 //-9223372036854775808是一个无穷大的数字
命令行输入:a 2000
idea控制台打印:
当前的水位线是:-4001 //当前水位线的值 = 事件时间 - 最大延迟时间 -1 = 1000 - 5000 -1 = -4000
为什么用1000- 5000 -1而用2000 - 5000 -1? flink会周期往流中插入水位线,水位线也是流中的一个元素,还是看下图理解吧。
命令行输入:a 3000
idea控制台打印:当前的水位线是:-3001 //2000 - 5000 -1 = -2000
命令行输入:a 10000
idea控制台打印:当前的水位线是:-2001 //3000 - 5000 -1 = -2000
命令行输入:a 1000
idea控制台打印:当前的水位线是:4999 //10000 - 5000 -1 = 4999
命令行输入:a 1000
idea控制台打印:当前的水位线是:4999 //10000 - 5000 -1 = 4999
命令行输入:a 2000
idea控制台打印:当前的水位线是:4999 //10000 - 5000 -1 = 4999
通过控制台的打印结果发现水位线的和钟表一样,值总是越来越大的,随着事件时间的变化而变化,但是不会变小,也可能会停止某一刻,如输入a 1000后在输入a 1000,a 2000水位线的值始终是4999。
整个打印过程
命令行窗口:
?
[root@master ~]# nc -lk 9999
?
a 1000
?
a 2000
?
a 3000
?
a 10000
?
a 1000
?
a 1000
?
a 2000
idea打印:
?
当前的水位线是:-9223372036854775808
?
当前的水位线是:-4001
?
当前的水位线是:-3001
?
当前的水位线是:-2001
?
当前的水位线是:4999
?
当前的水位线是:4999
?
当前的水位线是:4999
文章图片
4、如何产生的 【聊聊flink水位线】水位线本质就是一个时间戳,这个时间戳是程序员根据事件时间动态计算出来,直接来一个案例吧。
案例1
自定义水位线的产生逻辑,实现WatermarkStrategy接口,flink会每隔200毫秒的调用onPeriodicEmit方法。
public class ExampleTest2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//设置每隔1分钟插入一次水位线
//env.getConfig().setAutoWatermarkInterval(6 * 1000L);
env
.socketTextStream("192.168.117.211", 9999)
.map(new MapFunction>() {
@Override
public Tuple2 map(String value) throws Exception {
String[] arr = value.split(" ");
return Tuple2.of(arr[0], Long.parseLong(arr[1]));
}
})
.assignTimestampsAndWatermarks(new CustomWatermarkGenerator())
.print();
env.execute();
}public static class CustomWatermarkGenerator implements WatermarkStrategy {
@Override
public TimestampAssigner createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(Tuple2 element, long recordTimestamp) {
return element.f1;
}
};
}@Override
public WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator() {
// 最大延迟时间
private Long bound = 5000L;
private Long maxTs = -Long.MAX_VALUE + bound + 1L;
@Override
public void onEvent(Tuple2 event, long eventTimestamp, WatermarkOutput output) {
//更新观察到的最大事件时间
maxTs = Math.max(maxTs, event.f1);
}@Override
public void onPeriodicEmit(WatermarkOutput output) {
System.out.println("水位线的值:" + (maxTs - bound - 1L));
// 发送水位线,计算公式:事件时间-延迟时间-1L
output.emitWatermark(new Watermark(maxTs - bound - 1L));
}
};
}
}
}
nc -lk 9999 开启socket服务,监听9999端口
启动idea,控制台每隔200毫秒打印结果:水位线的值:xxxxx。如下:
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
命令行输入:a 1000
控制台每隔200毫秒打印结果:水位线的值:xxxxx。如下:
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
命令行输入:a 2000
控制台每隔200毫秒打印接口:水位线的值:xxxxx。如下:
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
//默认200毫秒插入水位线到流,可以设置水位线的插入流的时间间隔
env.getConfig().setAutoWatermarkInterval(6 * 1000L);
整个打印过程
命令行窗口:
[root@master ~]# nc -lk 9999
a 1000
a 2000
idea打印:
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
(a,1000)
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
(a,2000)
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
Disconnected from the target VM, address: '127.0.0.1:58591', transport: 'socket'
水位线的值:-3001Process finished with exit code 130
通过结果我们可以知道,水位线的值随着事件时间1000,2000的变化而变化。如果输入a 2000后在输入a 1000,控制台打印结果是怎样的?那肯定打印的是:水位线的值:-3001,因为水位线的值和时间一样永远只会越来越大。
案例2 改造一下程序,新增如下代码,keyby后,把命令行输入的元素打印出来。
文章图片
nc -lk 9999启动socket监听9999端口
启动idea
命令行输入
[root@localhost ~]# nc -lk 9999
a 1000
a 2000
a 5000
a 6000
idea控制台打印:
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
输入业务数据是:(a,1000)
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
输入业务数据是:(a,2000)
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
输入业务数据是:(a,5000)
水位线的值:-1
水位线的值:-1
水位线的值:-1
水位线的值:-1
水位线的值:-1
水位线的值:-1
水位线的值:-1
水位线的值:-1
输入业务数据是:(a,6000)
水位线的值:999
水位线的值:999
水位线的值:999
水位线的值:999
水位线的值:999
分析计算结果结果:
-9223372036854775807,-9223372036854775807,(a,1000),-4001,-4001,-4001,-4001,-4001,-4001,-4001,-4001,(a,2000),-3001,-3001,-3001,-3001,-3001,(a,5000),-1,-1,-1,(a,6000),999,999,999,999
不知道大家有没有一种感觉,水位线和业务数据什么关系?是不是类似生活中落花和流水的关系呢?业务数据就是河流中的水,水位线就像落在水中的花,他们两一起流向大海,水位线和业务数据一样都属于流中的一个元素。
5、有什么用 在流的世界逻辑时钟就是一个参照物。还是挂钟来举例吧,看看挂钟已经12点了,我们肯定在会暗示自己该放下手机了要睡觉了。针对源源不断的数据流,把数据流拆分为多段进行处理,针对每段数据进行统计,那什么时候触发统计呢?这个时候就会用这个逻辑时钟,窗口看看逻辑时间当前处于几点钟,发现窗口结束时间小于时钟的时间,窗口闭合进行统计。
案例1,水位线触发定时任务的执行
功能描述:水位线的当前时间戳大于定时任务的的触发时间后 触发定时任务的执行。
public class ExampleTest3 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.socketTextStream("192.168.117.211", 9999)
.map(r -> Tuple2.of(r.split(" ")[0], Long.parseLong(r.split(" ")[1])))
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(Tuple2 element, long recordTimestamp) {
return element.f1;
}
})
)
.keyBy(r -> r.f0)
.process(new KeyedProcessFunction, String>() {
@Override
public void processElement(Tuple2 value, Context ctx, Collector out) throws Exception {
//out.collect("当前的水位线是:" + ctx.timerService().currentWatermark());
ctx.timerService().registerEventTimeTimer(value.f1 + 5000L);
out.collect("注册了一个时间戳是:" + new Timestamp(value.f1 + 5000L) + " 的定时器");
}@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
super.onTimer(timestamp, ctx, out);
out.collect("定时器触发了!");
}
})
.print();
env.execute();
}
}
nc -lk 9999 开启socket服务,监听9999端口
命令行输入:a 1665367810000 //1665367810000对应的时间为2022-10-10 10:10:10
控制台输出:注册了一个时间戳是:2022-10-10 10:10:15.0 的定时器 //2022-10-10 10:10:15转换为时间戳为1665367815000
解释一下控制台输出结果
当前水位线的值:2022-10-10 10:10:10 - 5s -1毫秒 = 1665367810000 - 5000 -1 = 1665367804999。当水位线的值大于1665367815000定时任务触发。
命令行输入:1665367821000 //命令行输入2022-10-10 10:10:21对应的时间戳1665367821000将会触发定时任务
控制台输出:定时器触发了!
命名行打印输入
[root@master ~]# nc -lk 9999
a 1665367810000
a 1665367821000
idea打印输入
注册了一个时间戳是:2022-10-10 10:10:15.0 的定时器
注册了一个时间戳是:2022-10-10 10:10:26.0 的定时器
定时器触发了!
案例2,水位线当前时间戳大于窗口结束时间触发窗口闭 案例day3.Example4
public class ExampleTest4 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.socketTextStream("192.168.117.211", 9999)
.map(new MapFunction>() {
@Override
public Tuple2 map(String value) throws Exception {
String[] arr = value.split(" ");
return Tuple2.of(arr[0], Long.parseLong(arr[1]));
}
})
.assignTimestampsAndWatermarks(
// 最大延迟时间设置为5秒
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(Tuple2 element, long recordTimestamp) {
return element.f1;
// 告诉flink事件时间是哪一个字段
}
})
)
.keyBy(r -> r.f0)
// 5秒的事件时间滚动窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction() {
@Override
public void process(String key, Context context, Iterable elements, Collector out) throws Exception {
long windowStart = context.window().getStart();
long windowEnd = context.window().getEnd();
//System.out.println("当前窗口的结束值:" + context.currentWatermark());
//System.out.println("当前水位线的值:" + context.currentWatermark());
long count = elements.spliterator().getExactSizeIfKnown();
out.collect("用户" + key + " 在窗口" +
"" + new Timestamp(windowStart) + "~" + new Timestamp(windowEnd) + "" +
"中的pv次数是:" + count);
}
})
.print();
env.execute();
}
}
命令行输入:a 1665367810000 //flink将开启一个2022-10-10 10:10:10.0~2022-10-10 10:10:15的窗口,当水位线当前值(当前值指上面的当前时间)大于窗口结束时间对应的时间戳会触发窗口闭合。
命令行输入:a 1665367821000 //此时水位线当前值为:1665367821000 - 5000 -1 = 1665367815999,1665367815999转换为时间:2022-10-10 10:10:15,2022-10-10 10:10:15等于窗口结束时间,所以触发窗口闭合。
控制输出:用户a 在窗口2022-10-10 10:10:10.0~2022-10-10 10:10:15.0中的pv次数是:1
命令行
[root@master ~]# nc -lk 9999
a 1665367810000
a 1665367821000
idea
当前窗口的结束值:1665367815999
当前水位线的值:1665367815999
用户a 在窗口2022-10-10 10:10:10.0~2022-10-10 10:10:15.0中的pv次数是:1
如果根据"处理时间"来进行统计分析,窗口要闭合进行统计,肯定有一个参考的时间,只是这个时间是cpu帮忙产生的,窗口的闭合根据cpu产生的时间进行闭合,但逻辑时钟的某瞬间的值是程序计算出来的,这也是为什么把水位线称为逻辑时钟。
6、迟到数据的处理 6.1、什么叫迟到数据
事件时间小于水位线当前时间戳,比如当前数据流的数据xxx携带的事件时间是2022:20:50,逻辑时钟的此时的时间为2022:20:51,那么flink认为xxx就是一条迟到数据。
案例描述:手动发送水位线,手动发送携带事件时间的元素。
public class ExampleTest5 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator result = env
.addSource(new SourceFunction() {
@Override
public void run(SourceContext ctx) throws Exception {
// 发送数据携带事件时间的数据hello world
ctx.collectWithTimestamp("hello world", 1000L);
// 发送水位线
ctx.emitWatermark(new Watermark(999L));
// 发送数据携带事件时间的数据 hello flink
ctx.collectWithTimestamp("hello flink", 2000L);
// 发送水位线
ctx.emitWatermark(new Watermark(1999L));
// 发送数据携带事件时间的数据hello late
ctx.collectWithTimestamp("hello late", 1000L);
}@Override
public void cancel() {}
})
.process(new ProcessFunction() {
@Override
public void processElement(String value, Context ctx, Collector out) throws Exception {
//System.out.println("当前水位线:" + ctx.timerService().currentWatermark());
//判断事件时间是否小于水位线
if (ctx.timestamp() < ctx.timerService().currentWatermark()) {
System.out.println("迟到元素:" + value);
} else {
System.out.println("正常元素:" + value);
}
}
});
env.execute();
}
}
控制台输出:
正常元素:hello world
正常元素:hello flink
迟到元素:hello late
6.2、迟到元素的处理
理解了什么叫迟到元素,至于怎么处理,flink提供了几种方案,如
案例:迟到数据发送到"侧输出流"中
public class ExampleTest {// 定义侧输出流
private static OutputTag lateElement = new OutputTag("late-element") {
};
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator result = env
.addSource(new SourceFunction() {
@Override
public void run(SourceContext ctx) throws Exception {
// 发送数据携带事件时间的数据hello world
ctx.collectWithTimestamp("hello world", 1000L);
// 发送水位线
ctx.emitWatermark(new Watermark(999L));
// 发送数据携带事件时间的数据 hello flink
ctx.collectWithTimestamp("hello flink", 2000L);
// 发送水位线
ctx.emitWatermark(new Watermark(1999L));
// 发送数据携带事件时间的数据hello late
ctx.collectWithTimestamp("hello late", 1000L);
}@Override
public void cancel() {}
})
.process(new ProcessFunction() {
@Override
public void processElement(String value, Context ctx, Collector out) throws Exception {
//判断事件时间是否小于水位线
if (ctx.timestamp() < ctx.timerService().currentWatermark()) {
ctx.output(lateElement, "迟到元素发送到侧输出流:" + value);
} else {
out.collect("正常到达的元素:" + value);
}
}
});
result.print("主流:");
result.getSideOutput(lateElement).print("侧输出流:");
env.execute();
}
}
idea控制台输出:
主流:> 正常到达的元素:hello world
主流:> 正常到达的元素:hello flink
侧输出流:> 迟到元素发送到侧输出流:hello late
思考:窗口,迟到元素,水位线之间有什么关联?
7、总结 水位线类似生活中的时钟,通过时钟我们知道当前时间处于几点几分秒,这个"当前时间"在flink里面对应一个时间戳,通过时间戳来触发窗口的闭合,触发定时任务的执行。也类似一个参照物的角色。