Flink temporal table join研究

作者:王东阳
前言
ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这个标准。Temporal Table记录了历史上任何时间点所有的数据改动,Temporal Table具有普通table的特性,有具体独特的DDL/DML/QUERY语法,时间是其核心属性。历史意味着时间,意味着快照Snapshot。
Apache Flink遵循ANSI-SQL标准,Apache Flink中Temporal Table的概念也源于ANSI-2011的标准语义,但目前的实现在语法层面和ANSI-SQL略有差别,上面看到ANSI-2011中使用FOR SYSTEM_TIME AS OF的语法,Apache Flink在早期版本中仅仅支持 LATERAL TABLE(TemporalTableFunction)的语法,当前flinkv14版本中已经支持FOR SYSTEM_TIME AS OF语法。
【Flink temporal table join研究】由于Flink中基于eventtime 的 temporal table join 基于flink 的watermark机制实现,为了更好的让读者理解,本文首先介绍flink中的 动态表和时序表,时间概念,Watermark等相关知识,最后通过详细的代码用例介绍Flink中基于eventtime 的 temporal table join用法。
动态表和时序表
动态表
什么是动态表
动态表 是 Flink 的支持流数据的 Table API 和 SQL 的核心概念。与表示批处理数据的静态表不同,动态表是随时间变化的。可以像查询静态批处理表一样查询它们。查询动态表将生成一个 连续查询 。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。本质上,动态表上的连续查询非常类似于定义物化视图的查询。
动态表可以像普通数据库表一样通过 INSERT、UPDATE 和 DELETE 来不断修改。它可能是一个只有一行、不断更新的表,也可能是一个 insert-only 的表,没有 UPDATE 和 DELETE 修改,或者介于两者之间的其他表。
动态表转换
在将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码。Flink的 Table API 和 SQL 支持三种方式来编码一个动态表的变化:
Append-only 流: 仅通过 INSERT 操作修改的动态表可以通过输出插入的行转换为流。
Retract 流: retract 流包含两种类型的 message: add messages 和 retract messages 。通过将INSERT 操作编码为 add message、将 DELETE 操作编码为 retract message、将 UPDATE 操作编码为更新(先前)行的 retract message 和更新(新)行的 add message,将动态表转换为 retract 流。下图显示了将动态表转换为 retract 流的过程。
Upsert 流: upsert 流包含两种类型的 message: upsert messages 和delete messages。转换为 upsert 流的动态表需要(可能是组合的)唯一键。通过将 INSERT 和 UPDATE 操作编码为 upsert message,将 DELETE 操作编码为 delete message ,将具有唯一键的动态表转换为流。消费流的算子需要知道唯一键的属性,以便正确地应用 message。与 retract 流的主要区别在于 UPDATE 操作是用单个 message 编码的,因此效率更高。下图显示了将动态表转换为 upsert 流的过程。
Flink在将动态表转换为 DataStream 时,只支持 append 流和 retract 流。后面的样例代码中会展示转换的API以及Retract 流和Upsert 流的不同。
时态表(Temporal Tables)
时态表(Temporal Table)是一张随时间变化的表, 在 Flink 中称为动态表,时态表中的每条记录都关联了一个或多个时间段,所有的 Flink 表都是时态的(动态的)。也就是说时态表是动态表的特例,时态表一定是动态表,动态表不一定是时态表。
时态表包含表的一个或多个有版本的表快照,时态表可以是一张跟踪所有变更记录的表(例如数据库表的 changelog,包含多个表快照),也可以是物化所有变更之后的表(例如数据库表,只有最新表快照)。
Flink 使用主键约束和事件时间来定义一张版本表和版本视图,在后面介绍temporal join的相关样例中会展示这两种。
样例代码

  1. 环境初始化
    首先初始化StreamExecutionEnvironment env 和 StreamTableEnvironment tEnv, 如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

env.setRuntimeMode(RuntimeExecutionMode.STREAMING); 设置当前运行模式为 STREAMING模式
env.setParallelism(1); 设置并行度是1主要是为测试的目的,便于观察join的结果
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 设置时间属性是 EventTime
  1. 首先创建并注册一个普通表 RatesHistory
    DataStream ratesStream = env.fromElements(
    Row.of(LocalDateTime.parse("2021-08-21T09:02:00"), "US Dollar", 102), Row.of(LocalDateTime.parse("2021-08-21T09:00:00"), "Euro", 114), Row.of(LocalDateTime.parse("2021-08-21T09:00:00"), "Yen", 1), Row.of(LocalDateTime.parse("2021-08-21T10:45:00"), "Euro", 116), Row.of(LocalDateTime.parse("2021-08-21T11:15:00"), "Euro", 119), Row.of(LocalDateTime.parse("2021-08-21T11:49:00"), "Pounds", 108)) .returns( Types.ROW_NAMED( new String[] {"currency_time", "currency", "rate"}, Types.LOCAL_DATE_TIME, Types.STRING, Types.INT));

Table rateTable = tEnv.fromDataStream(ratesStream, Schema.newBuilder().build()); tEnv.registerTable("RatesHistory", rateTable); rateTable.printSchema(); tEnv.from("RatesHistory").execute().print();

得到RatesHistory的schema信息以及表中内容:
(
currency_time TIMESTAMP(9),
currency STRING,
rate INT
)
op currency_time currency rate
+I 2021-08-21 09:02:00.000000000 US Dollar 102
+I 2021-08-21 09:00:00.000000000 Euro 114
+I 2021-08-21 09:00:00.000000000 Yen 1
+I 2021-08-21 10:45:00.000000000 Euro 116
+I 2021-08-21 11:15:00.000000000 Euro 119
+I 2021-08-21 11:49:00.000000000 Pounds 108
6 rows in set
  1. 声明版本表
    在 Flink 中,定义了主键约束和事件时间属性的表就是版本表。相比上面的代码,在使用fromDataStream的第二个参数Schema里面,通过columnByExpression 指定事件时间的时间戳(flink中要求必须是 TIMESTAMP(3) ), 通过 primaryKey("currency") 指定 currency 主键约束。
// version table
Table versionedTable = tEnv.fromDataStream(ratesStream, Schema.newBuilder() .columnByExpression("rowtime", "CAST(currency_time AS TIMESTAMP(3))") .primaryKey("currency") .build()); tEnv.registerTable("versionRate", versionedTable); System.out.println("versioned table get"); versionedTable.printSchema(); tEnv.from("versionRate").execute().print();

打印versionRate的schema信息以及表中内容:
(
currency_time TIMESTAMP(9),
currency STRING NOT NULL,
rate INT,
rowtime TIMESTAMP(3) AS CAST(currency_time AS TIMESTAMP(3)),
CONSTRAINT PK_currency PRIMARY KEY (currency) NOT ENFORCED
)
op currency_time currency rate rowtime
+I 2021-08-21 09:02:00.000000000 US Dollar 102 2021-08-21 09:02:00.000
+I 2021-08-21 09:00:00.000000000 Euro 114 2021-08-21 09:00:00.000
+I 2021-08-21 09:00:00.000000000 Yen 1 2021-08-21 09:00:00.000
+I 2021-08-21 10:45:00.000000000 Euro 116 2021-08-21 10:45:00.000
+I 2021-08-21 11:15:00.000000000 Euro 119 2021-08-21 11:15:00.000
+I 2021-08-21 11:49:00.000000000 Pounds 108 2021-08-21 11:49:00.000
  1. 声明版本视图
    Flink 也支持定义版本视图只要一个视图包含主键和事件时间便是一个版本视图。为了在 RatesHistory 上定义版本表,Flink 支持通过去重查询定义版本视图, 去重查询可以产出一个有序的 changelog 流,去重查询能够推断主键并保留原始数据流的事件时间属性。
// https://nightlies.apache.org/...
Table versionedRateView = tEnv.sqlQuery( "select currency, rate, currency_time " + // (1) `currency_time` 保留了事件时间 "from ( " + "select *, " + "ROW_NUMBER() OVER (PARTITION BY currency " + //(2) `currency` 是去重query的unique key,作为主键 "ORDER BY currency_time DESC) AS rowNum " + "FROM RatesHistory ) " + "WHERE rowNum = 1"); tEnv.createTemporaryView("versioned_rates", versionedRateView); versionedRateView.printSchema(); tEnv.from("versioned_rates").execute().print();

对于去重语法中的相关参数描述如下
Parameter Specification:
ROW_NUMBER(): 给每一行分配一个从1开始的递增的唯一的序号。
PARTITION BY col1[, col2...]: 指定分区列.
ORDER BY time_attr [asc|desc]:指定排序所基于的列, 必须是 time attribute. 当前Flink支持 processing time attribute 和event time attribute. Ordering by ASC 意味保留最老的那列, ordering by DESC 意味保留最新的那列.
WHERE rownum = 1: rownum = 1 用于 Flink 获取到去重后的数据。
行 (1) 保留了事件时间作为视图 versioned_rates 的事件时间,行 (2) 使得视图 versioned_rates 有了主键, 因此视图 versioned_rates 是一个版本视图。
视图中的去重 query 会被 Flink 优化并高效地产出 changelog stream, 产出的 changelog 保留了主键约束和事件时间。
打印schema和versioned_rates表中内容
(
currency STRING,
rate INT,
currency_time TIMESTAMP(9)
)
op currency rate currency_time
+I US Dollar 102 2021-08-21 09:02:00.000000000
+I Euro 114 2021-08-21 09:00:00.000000000
+I Yen 1 2021-08-21 09:00:00.000000000
-U Euro 114 2021-08-21 09:00:00.000000000
+U Euro 116 2021-08-21 10:45:00.000000000
-U Euro 116 2021-08-21 10:45:00.000000000
+U Euro 119 2021-08-21 11:15:00.000000000
+I Pounds 108 2021-08-21 11:49:00.000000000
可以看到Euro中一共有5条记录,属于Retract 类型的changelogstream,包含UPDATE_BEFORE, UPDATE_AFTER两种
| +I | Euro | 114 | 2021-08-21 09:00:00.000000000 |
| -U | Euro | 114 | 2021-08-21 09:00:00.000000000 |
| +U | Euro | 116 | 2021-08-21 10:45:00.000000000 |
| -U | Euro | 116 | 2021-08-21 10:45:00.000000000 |
| +U | Euro | 119 | 2021-08-21 11:15:00.000000000 |
这5条记录的具体生成过程如下:
当第一条 Euro数据 LocalDateTime.parse("2021-08-21T09:00:00"), "Euro", 114 进入到versioned_rates 所对应的底层Flink算子中时,由于当前Euro只有这一条记录,执行去重语法后就得到 Euro 114 2021-08-21 09:00:00.000000000
当第二条 Euro数据 LocalDateTime.parse("2021-08-21T10:45:00"), "Euro", 116 进入到versioned_rates 所对应的底层Flink算子中时,执行去重语法获取最新的记录就得到 Euro 116 2021-08-21 10:45:00.000000000, 由于 currency是主键,所以执行更新操作,生成UPDATE_BEFORE,UPDATE_AFTER 两条changelogstream。
当第三条 Euro数据 LocalDateTime.parse("2021-08-21T11:15:00"), "Euro", 119 进入到versioned_rates 所对应的底层Flink算子中时,执行去重语法获取最新的记录就得到 Euro 119 2021-08-21 11:15:00.000000000, 由于 currency是主键,所以执行更新操作,生成UPDATE_BEFORE,UPDATE_AFTER 两条changelogstream。
  1. 通过table API显示转为datastream
    flink API 中提供了 toChangelogStream 接口
// default retract mode
tEnv.toChangelogStream(versionedRateView) .executeAndCollect() .forEachRemaining(System.out::println); tEnv.toChangelogStream(versionedRateView, Schema.newBuilder().build(), ChangelogMode.upsert()) .executeAndCollect() .forEachRemaining(System.out::println);

+I[US Dollar, 102, 2021-08-21T09:02]
+I[Euro, 114, 2021-08-21T09:00]
+I[Yen, 1, 2021-08-21T09:00]
-U[Euro, 114, 2021-08-21T09:00]
+U[Euro, 116, 2021-08-21T10:45]
-U[Euro, 116, 2021-08-21T10:45]
+U[Euro, 119, 2021-08-21T11:15]
+I[Pounds, 108, 2021-08-21T11:49]
得到输出
+I[US Dollar, 102, 2021-08-21T09:02]
+I[Euro, 114, 2021-08-21T09:00]
+I[Yen, 1, 2021-08-21T09:00]
+U[Euro, 116, 2021-08-21T10:45]
+U[Euro, 119, 2021-08-21T11:15]
+I[Pounds, 108, 2021-08-21T11:49]
水印 Watermark
此章节主要参考
《flink内核原理与实现》 4.4章节 水印
《Flink原理、实战与性能优化》4.2章节 时间概念与Watermark
时间概念
对于流式数据处理,最大的特点是数据上具有时间的属性特征,Flimk根据时间产生的位置不同,将时间区分为三种时间概念,分别为 事件生成时间(Event Time)、事件接入时间(Ingestion Time)和事件处理时间(Processing Time)
1.事件时间(Event Time)
事件时间(Event Time)是每个独立事件在产生它的设备上发生的时间,这个时间通常在事件进入Flink之前就已经嵌入到事件中,时 间顺序取决于事件产生的地方,和下游数据处理系统的时间无关。事件数据具有不变的事件时间属性,该时间自事件元素产生就不会改 变。通常情况下可以在Flink系统中指定事件时间属性或者设定时间提取器来提取事件时间。
所有进入到Flink流式系统处理的事件,其时间都是在外部系统中产生,经过网络进入到Flink系统内处理的,在理论情况下(所有系统 都具有相同系统时钟),事件时间对应的时间戳一定会早于在Flink系统中处理的时间戳,但在实际情况中往往会出现数据记录乱序、延迟 到达等问题。基于EventTime的时间概念,数据处理过程依赖于数据本身产生的时间,而不是Flink系统中Operator所在主机节点的系统时间,这样能够借助于事件产生时的时间信息来还原事件的先后关系。
2.接入时间(Ingestion Time)
接入时间(Ingestion Time)是数据进入Flink系统的时间,Ingestion Time依赖于Source Operator所在主机的系统时钟。Ingestion Time介于Event Time和Process Time之间,相对于ProcessTime,Ingestion Time生成的代价相对较高,Ingestion Time具有一 定的可预见性,主要因为Ingestion Time在数据接入过程生成后,时间戳就不再发生变化,和后续数据处理Operator所在机器的时钟没有 关系,从而不会因为某台机器时钟不同步或网络时延而导致计算结果不准确的问题。但是需要注意的是相比于Event Time,Ingestion Time不能处理乱序事件,所以也就不用生成对应的Watermarks
3.处理时间(Processing Time)
处理时间(Processing Time)是指数据在操作算子计算过程中获取到的所在主机时间。当用户选择使用Processing Time时,所有和时 间相关的计算算子,例如Windows计算,在当前的任务中所有的算子将直接使用其所在主机的系统时间。Processing Time是Flink系统中最简单的一种时间概念,基于Processing Time时间概念,Flink的程序性能相对较高,延时也相对较低,对接入到系统中的数据时间相关的计算完全交给算子内部决定,时间窗口计算依赖的时间都是在具体算子运行的过程中产生,不需要做任何时间上的对比和协调。但 Processing Time时间概念虽然在性能和易用性的角度上具有优势,但考虑到对数据乱序处理的情况,Processing Time就不是最优的选择。同时在分布式系统中,数据本身不乱序,但每台机器的时间如果不同步,也可能导致数据处理过程中数据乱序的问题,从而影响计算结果。总之,Processing Time概念适用于时间计算精度要求不是特别高的计算场景,例如统计某些延时非常高的日志数据等。
4.时间概念指定
在Flink中默认情况下使用是Process Time时间概念,如果用户选择使用Event Time或者Ingestion Time概念,则需要在创建的 StreamExecutionEnvironment中调用setStreamTimeCharacteristic()方法设定系统的时间概念,如下代码使用 TimeCharacteristic.EventTime作为系统的时间概念,这样对当前的StreamExecutionEnvironment会全局生效
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

setStreamTimeCharacteristic 在Flink1.14中已经不建议使用了,进入到setStreamTimeCharacteristic底层源码,可以看到底层是调用了getConfig().setAutoWatermarkInterval
@Deprecated
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = (TimeCharacteristic)Preconditions.checkNotNull(characteristic); if (characteristic == TimeCharacteristic.ProcessingTime) { this.getConfig().setAutoWatermarkInterval(0L); } else { this.getConfig().setAutoWatermarkInterval(200L); }

}
5.如何解决乱序问题
这里引用下 Apache Flink 漫谈系列 - Watermark是个啥? 这篇文章中的实例图片
当Watermark的时间戳等于Event中携带的EventTime时候,也就是(Watermark=EventTime) 的计算方式如下
由于watermark = eventtime, 所以 在Eventtime等于15s的时候, windows2[10,15) 这个窗口就触发了进行了计算,然后在16s的时候乱序的11号数据到达,被丢弃。
如果想正确处理迟来的数据可以定义Watermark生成策略为 Watermark = EventTime -5s, 如下
由于 Watermark = EventTime -5s,在Eventtime为15s的时候,Watermark等于15-5=10s, 并不会触发Window2窗口计算, 只有在Eventtime为20s的时候, Watermark等于20-5 = 15s ,触发Window2[10, 15) 的计算,由于乱序的11号记录在之前的16s时候到达了,所以Window2的计算包含了11号记录。
Watermark生成
从流处理原始设备产生事件,到Flink读取到数据,再到Flink多个算子处理数据,在这个过程中,会受到网络延迟、数据乱序、背压、Failover等多种情况的影响,导致数据是乱序的。虽然大部分情况下没有问题,但是不得不在设计上考虑此类异常情况,为了保证计算结果的正确性,需要等待数据,这带来了计算的延迟。对于延迟太久的数据,不能无限期地等下去,所以必须有一个机制,来保证特定的时间后一定会触发窗口进行计算。
比如基于事件时间的Window创建后,具体该如何确定属于该Window的数据元素已经全部到达。如果确定全部到达,就可以对Window的所有数据做窗口计算操作(如汇总、分组等),如果数据没有全部到达,则继续等待该窗口中的数据全部到达才开始处理。这种情况下就需要用到水位线(WaterMarks)机制,它能够衡量数据处理进度(表达数据到达的完整性),保证事件数据(全部)到达Flink系统,或者在乱序及延迟到达时,也能够像预期一样计算出正确并且连续的结果。Flink会将用读取进入系统的最新事件时间减去固定的时间间隔作为Watermark,该时间间隔为用户外部配置的支持最大延迟到达的时间长度,也就是说理论上认为不会有事件超过该间隔到达,否则就认为是迟到事件或异常事件。
通常Watermark在Source Function中生成,如果是并行计算的任务,在多个并行执行的Source Function中,相互独立产生各自的 Watermark。而Flink提供了额外的机制,允许在调用DataStream API操作(如map、filter等)之后,根据业务逻辑的需要,使用时间戳和Watermark生成器修改数据记录的时间戳和Watermark。
  1. Source Function
    Source Function可以直接为数据元素分配时间戳,同时也会向下游 发 送 Watermark 。 在 Source Function 中 为 数 据 分 配 了 时 间 戳 和Watermark就不必在DataStream API中使用了。需要注意的是:如果一个 timestamp 分 配 器 被 使 用 的 话 , 由 源 提 供 的 任 何 Timestamp 和Watermark都会被重写。
public class ExampleCountSource implements SourceFunction, CheckpointedFunction {
public void run(SourceContext ctx) { ... T next = getNext(); // 为元素赋予时间戳 ctx.collectWithTimestamp(next, next.getEventTimestamp()); // 生成Watermark发送到下游 if (next.hasWatermarkTime()){ ctx.emitWatermark(new Watermark(next.getWatermarkTime())); } ...}

}
另外需要注意下官方源码中的一段注释:
  • Sources may assign timestamps to elements and may manually emit watermarks.
    • However, these are only interpreted if the streaming program runs on
    • {@link TimeCharacteristic#EventTime}. On other time characteristics
    • ({@link TimeCharacteristic#IngestionTime} and {@link TimeCharacteristic#ProcessingTime}),
    • the watermarks from the source function are ignored.
    如果当前flink环境中TimeCharacteristic 设置的时间概念是EventTime, Source function会为元素分配 timestamp 和 触发watermark
    如果当前flink环境中TimeCharacteristic 设置的时间概念是IngestionTime 或 ProcessingTime, 那么source function中产生的watermark会忽略掉。
  • DataStream API
    DataStream API中使用的TimestampAssigner接口定义了时间戳的提 取 行 为 , 其 有 两 个 不 同 接 AssignerWithPeriodicWatermarks 和AssignerWithPunctuatedWatermarks,分别代表了不同的Watermark生成策略
周期性Watermark策略 : 在 Flink 中 叫 作PeriodicWatermarkAssigner,周期性(一定时间间隔或者达到一定的记 录 条 数 ) 地 产 生 一 个 Watermark 。 在 实 际 的 生 产 中 使 用 周 期 性Watermark策略的时候,必须注意时间和数据量,结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。这个策略又可以分为两种:
AscendingTimestamps:递增Watermark,作用在Flink SQL中的Rowtime属性上,Watermark=当前收到的数据元素的最大时间戳-1,此处减1的目的是确保有最大时间戳的事件不会被当做迟到数据丢弃。
BoundedOutOfOrderTimestamps:固定延迟Watermark,作用在Flink SQL的Rowtime属性上,Watermark=当前收到的数据元素的最大时间戳-固定延迟 。
每事件Watermark策略 :在 Flink 中 叫 作PuntuatedWatamarkAssigner ,对每一个事件都会尝试进行Watermark的生成,但是如果生成的Watermark是null或者Watermark小于之前的Watermark,则该Watermark不会发往下游,因为发往下游也不会有任何效果,不会触发任何窗口的执行。 在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark,在一定程度上会对下游算子造成压力,所以只有在实时性要求非常高的场景下才会选择Punctuated的方式进行Watermark的生成。
  1. 多流的Watermark
    Watermark 在 作 业 的 DAG 从 上 游 向 下 游 传 递 , 算 子 收 到 上 游Watermark后会更新其Watermark。如果新的Watermark大于算子的当前Watermark,则更新算子的Watermark为新Watermark,并发送给下游算子 。Watermark是在Source Function中生成或者在后续的DataStreamAPI中生成的。
Flink作业一般是并行执行的,作业包含多个Task,每个Task运行一个或一组算子(OperatorChain)实例,Task在生成Watermark的时候是相互独立的,也就是说在作业中存在多个并行的Watermark。 某些算子会有多个上游输入,如Union或keyBy、partition之后的算子。在Flink的底层执行模型上,多流输入会被分解为多个双流输入,所以对于多流Watermark的处理也就是双流Watermark的处理,无论是哪一个流的Watermark进入算子,都需要跟另一个流的当前算子进行 比 较 ,选 择 较 小 的 Watermark , 即Min ( input1Watermark,intput2Watermark ,与 算 子 当 前 的Watermark 比 较 , 如 果 大 于 算 子 当 前 的 Watermark , 则 更 新 算 子 的Watermark为新的Watermark,并发送给下游
如上图,Source算子产生各自的Watermark,并随着数据流流向下游的map算子,map算子是无状态计算,所以会将Watermark向下透 传。window算子收到上游两个输入的Watermark后,选择其中较小的一个发送给下游,window(1)算子比较Watermark 29和Watermark 14,选择Watermark 14作为算子当前Watermark,并将Watermark 14发往下游,window(2)算子也采用相同的逻辑。
  1. 新的水印策略
    在flink 1.11之前的版本中,提供了两种生成水印(Watermark)的策略,分别是AssignerWithPunctuatedWatermarks和AssignerWithPeriodicWatermarks,这两个接口都继承自TimestampAssigner接口。
用户想使用不同的水印生成方式,则需要实现不同的接口,但是这样引发了一个问题,对于想给水印添加一些通用的、公共的功能则变得复杂,因为我们需要给这两个接口都同时添加新的功能,这样还造成了代码的重复。
所以为了避免代码的重复,在flink 1.11 中对flink的水印生成接口进行了重构,
4.1 新的水印生成接口
当我们构建了一个DataStream之后,使用assignTimestampsAndWatermarks方法来构造水印,新的接口需要传入一个WatermarkStrategy对象。
DataStream#assignTimestampsAndWatermarks(WatermarkStrategy)
WatermarkStrategy 这个接口是做什么的呢?这里面提供了很多静态的方法和带有缺省实现的方法,只有一个方法是非default和没有缺省实现的,就是下面的这个方法。
/**
  • Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
    */
@Override
WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
所以默认情况下,我们只需要实现这个方法就行了,这个方法主要是返回一个 WatermarkGenerator,我们在进入这里边看看。
@Public
public interface WatermarkGenerator {
/**
  • Called for every event, allows the watermark generator to examine and remember the
  • event timestamps, or to emit a watermark based on the event itself.
    */
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
/**
  • Called periodically, and might emit a new watermark, or not.
    *
  • The interval in which this method is called and Watermarks are generated
  • depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
    */
void onPeriodicEmit(WatermarkOutput output);
}
这个方法简单明了,主要是有两个方法:
onEvent :每个元素都会调用这个方法,如果我们想依赖每个元素生成一个水印,然后发射到下游(可选,就是看是否用output来收集水印),我们可以实现这个方法.
onPeriodicEmit : 如果数据量比较大的时候,我们每条数据都生成一个水印的话,会影响性能,所以这里还有一个周期性生成水印的方法。这个水印的生成周期可以这样设置:env.getConfig().setAutoWatermarkInterval(5000L);
我们自己实现一个简单的周期性的发射水印的例子:
在这个onEvent方法里,我们从每个元素里抽取了一个时间字段,但是我们并没有生成水印发射给下游,而是自己保存了在一个变量里,在onPeriodicEmit方法里,使用最大的日志时间减去我们想要的延迟时间作为水印发射给下游。
DataStream withTimestampsAndWatermarks = dataStream.assignTimestampsAndWatermarks(
new WatermarkStrategy(){ @Override public WatermarkGenerator createWatermarkGenerator( WatermarkGeneratorSupplier.Context context){ return new WatermarkGenerator(){ private long maxTimestamp; private long delay = 3000; @Override public void onEvent( Tuple2 event, long eventTimestamp, WatermarkOutput output){ maxTimestamp = Math.max(maxTimestamp, event.f1); } @Override public void onPeriodicEmit(WatermarkOutput output){ output.emitWatermark(new Watermark(maxTimestamp - delay)); } }; } });

4.2 内置水印生成策略
为了方便开发,flink提供了一些内置的水印生成方法供我们使用。
4.2.1 固定延迟生成水印
通过静态方法forBoundedOutOfOrderness提供,入参接收一个Duration类型的时间间隔,也就是我们可以接受的最大的延迟时间.使用这种延迟策略的时候需要我们对数据的延迟时间有一个大概的预估判断。
WatermarkStrategy#forBoundedOutOfOrderness(Duration maxOutOfOrderness)
我们实现一个延迟3秒的固定延迟水印,可以这样做:
DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));
他的底层使用的WatermarkGenerator接口的一个实现类BoundedOutOfOrdernessWatermarks。
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
4.2.2 单调递增生成水印
通过静态方法forMonotonousTimestamps来提供.
WatermarkStrategy.forMonotonousTimestamps()
这个也就是相当于上述的延迟策略去掉了延迟时间,以event中的时间戳充当了水印。
在程序中可以这样使用:
DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
它的底层实现是AscendingTimestampsWatermarks,其实它就是BoundedOutOfOrdernessWatermarks类的一个子类,没有了延迟时间,我们来看看具体源码的实现.
@Public
public class AscendingTimestampsWatermarks extends BoundedOutOfOrdernessWatermarks {
/**
  • Creates a new watermark generator with for ascending timestamps.
    */
public AscendingTimestampsWatermarks() {
super(Duration.ofMillis(0));
}
}
4.2.3 eventtime的获取
上述我们讲了flink自带的两种水印生成策略,但是对于我们使用eventtime语义的时候,我们想从我们的自己的数据中抽取eventtime,这个就需要TimestampAssigner了.
@Public
@FunctionalInterface
public interface TimestampAssigner {
............

long extractTimestamp(T element, long recordTimestamp);
}
使用的时候我们主要就是从我们自己的元素element中提取我们想要的eventtime。
使用flink自带的水印策略和eventtime抽取类,可以这样用:
List rateHistoryData = https://www.it610.com/article/Lists.newArrayList();
rateHistoryData.add(new Tuple3<>("US Dollar", 102L, new Timestamp(1L))); rateHistoryData.add(new Tuple3<>("Euro", 114L, new Timestamp(1L))); rateHistoryData.add(new Tuple3<>("Yen", 1L, new Timestamp(1L))); rateHistoryData.add(new Tuple3<>("Euro", 116L, new Timestamp(5L))); rateHistoryData.add(new Tuple3<>("Euro", 119L, new Timestamp(7L))); DataStream rateStream = env.fromCollection(rateHistoryData) .assignTimestampsAndWatermarks( WatermarkStrategy // here is needed for using withTimestampAssigner .forBoundedOutOfOrderness(Duration.ofSeconds(10)) .withTimestampAssigner((event, timestamp) -> event.f2.getTime()) );

其中.withTimestampAssigner((event, timestamp) -> event.f2.getTime()) 中的(event, timestamp) -> event.f2.getTime() 是一个闭包,用来标识如何从元素中得到eventtime. 在这个例子中,元素类型是元组 Tuple3, 该元组中第三个元素也就是对应的f2就是事件的发生时间。
Event Time Temporal Join
概念介绍
Event Time Temporal joins 可以利用 versioned table执行join操作,也就是说一个表可以使用变化中的元素进行加宽,通过查询在特定时间点上值。
Temporal joins 接受一个左表,会把左表中每一行数据在右表( versioned table )中找到对应的行数据进行关联。 Flink使用SQL:2011标准中的 FOR SYSTEM_TIME AS OF SQL 语法执行这个操作。 Temporal joins的语法如下
SELECT [column_list]
FROM table1 [AS ]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS ]
ON table1.column-name1 = table2.column-name1
借助 event-time 属性(也称为 rowtime 属性), 就可以检索得到特定key在过去某个时间点上的值,从而可以把两个表中在相同时间点上的数据进行join。 versioned table保存了不同版本也就是在不同时间点的数据。
举例,假设我们有一个订单的表,不同的订单中价格使用的货币不一样,而货币在不同时刻的汇率也不一样,为了使用统一的货币例如人民币计算各个订单的消费额,每一个订单需要和订单发生时的货币汇率进行join。
内部实现原理
可以去 孙金城 大佬的文章 Apache Flink 漫谈系列(11) - Temporal Table JOIN ,不过这篇文章比较老了,里面的代码还是使用 LATERAL TABLE 语法进行join,不过里面的内部实现原理的示意图倒是对研究有一定的参考意义。
注意事项
event-time temporal join 是通过左右输入流中的watermark触发的,所以要确保join两边的watermark设置正确;
The event-time temporal join 要求 temporal join 条件等式中必须包含主键;
样例代码
  1. Table function join
    Flink早期是使用 LATERAL TABLE 语法结合 Table function 实现的 Temporal table join , 这种方式没有FOR SYSTEM_TIME AS OF语法中对于watermark和主键的强制要求,具体使用方式可以看下面的代码:
1.0 环境初始化
首先初始化StreamExecutionEnvironment env 和 StreamTableEnvironment tEnv, 如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

env.setRuntimeMode(RuntimeExecutionMode.STREAMING); 设置当前运行模式为 STREAMING模式
env.setParallelism(1); 设置并行度是1主要是为测试的目的,便于观察join的结果
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 设置时间属性是 EventTime,
1.1 创建和注册左表 Orders
构建填充到Orders中的数据
List> orderData = https://www.it610.com/article/Lists.newArrayList();
orderData.add(new Tuple3<>(2L, "Euro", new Timestamp(2L))); orderData.add(new Tuple3<>(1L, "US Dollar", new Timestamp(3L))); orderData.add(new Tuple3<>(50L, "Yen", new Timestamp(4L))); orderData.add(new Tuple3<>(3L, "Euro", new Timestamp(5L))); orderData.add(new Tuple3<>(5L, "Euro", new Timestamp(9L)));

基于List构建DataStream,并通过assignTimestampsAndWatermarks 设置时间戳和watermark,然后基于DataStream中构建Table并注册为表名 Orders。由于Tuple3构建的DataStream中的列名默认分别是f0,f1,f2, 这里 $("amount"), $("currency"), $("eventtime").rowtime() 的作用就是把列名改为了 amount, currency, eventtime,同时将eventtime标记为rowtime()。
DataStream> orderStream = env.fromCollection(orderData)
.assignTimestampsAndWatermarks( WatermarkStrategy .>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((event, timestamp) -> event.f2.getTime()) ); Table orderTable = tEnv.fromDataStream(orderStream, $("amount"), $("currency"), $("eventtime").rowtime()); // here we use rowtiime() tEnv.registerTable("Orders", orderTable);

注意这里虽然使用了assignTimestampsAndWatermarks也设置了Watermark,但是在本例中并没有用到,因为 LATERAL TABLE 语法只需要Timestamp提取eventtime。
打印Schema和表里面的内容
orderTable.printSchema(); //only for debug
tEnv.from("Orders").execute().print(); // only for debug

(
amount BIGINT,
currency STRING,
eventtime TIMESTAMP(3) ROWTIME
)
op amount currency eventtime
+I 2 Euro 1970-01-01 00:00:00.002
+I 1 US Dollar 1970-01-01 00:00:00.003
+I 50 Yen 1970-01-01 00:00:00.004
+I 3 Euro 1970-01-01 00:00:00.005
+I 5 Euro 1970-01-01 00:00:00.009
1.2 创建和注册右表 RatesHistory
构建数据
List rateHistoryData = https://www.it610.com/article/Lists.newArrayList();
rateHistoryData.add(new Tuple3<>("US Dollar", 102L, new Timestamp(1L))); rateHistoryData.add(new Tuple3<>("Euro", 114L, new Timestamp(1L))); rateHistoryData.add(new Tuple3<>("Yen", 1L, new Timestamp(1L))); rateHistoryData.add(new Tuple3<>("Euro", 116L, new Timestamp(5L))); rateHistoryData.add(new Tuple3<>("Euro", 119L, new Timestamp(7L)));

基于List构建DataStream,并通过assignTimestampsAndWatermarks 设置时间戳和watermark,然后基于DataStream中构建Table并注册为表名 RatesHistory。由于Tuple3构建的DataStream中的列名默认分别是f0,f1,f2, 这里 $("amount"), $("currency"), $("eventtime").rowtime() 的作用就是把列名改为了 currency, rate, eventtime,同时将eventtime标记为rowtime()。
DataStream rateStream = env.fromCollection(rateHistoryData)
.assignTimestampsAndWatermarks( WatermarkStrategy // here is needed for using withTimestampAssigner .forBoundedOutOfOrderness(Duration.ofSeconds(10)) .withTimestampAssigner((event, timestamp) -> event.f2.getTime()) ); Table rateTable = tEnv.fromDataStream(rateStream, $("currency"), $("rate"), $("eventime").rowtime()); tEnv.registerTable("RatesHistory", rateTable);

打印Schema和表里面的内容
rateTable.printSchema(); //only for debug
tEnv.from("RatesHistory").execute().print(); // only for debug

(
currency STRING,
rate BIGINT,
eventime TIMESTAMP(3) ROWTIME
)
op currency rate eventime
+I US Dollar 102 1970-01-01 00:00:00.001
+I Euro 114 1970-01-01 00:00:00.001
+I Yen 1 1970-01-01 00:00:00.001
+I Euro 116 1970-01-01 00:00:00.005
+I Euro 119 1970-01-01 00:00:00.007
1.3 注册表函数
基于rateTable创建和注册为表函数 Rates,如下:
tEnv.registerFunction(
"Rates", rateTable.createTemporalTableFunction("eventime", "currency"));

需要注意createTemporalTableFunction的第一个参数传入时间戳所在列,第二个参数传入两表联合时进行匹配的列,例如在我们的例子中,Orders 表和 RatesHistory 表基于currency进行匹配。
1.4 执行联合查询
执行下面的联合查询语句:
String sqlQuery =
"SELECT o.eventtime, o.currency, o.amount, r.rate, " + " o.amount * r.rate as amount_sum " + "from " + " Orders AS o, " + " LATERAL TABLE (Rates(o.eventtime)) AS r " + "WHERE r.currency = o.currency"; tEnv.sqlQuery(sqlQuery).execute().print();

查询结果如下:
op eventtime currency amount rate amount_sum
+I 1970-01-01 00:00:00.003 US Dollar 1 102 102
+I 1970-01-01 00:00:00.004 Yen 50 1 50
+I 1970-01-01 00:00:00.002 Euro 2 114 228
+I 1970-01-01 00:00:00.005 Euro 3 116 348
+I 1970-01-01 00:00:00.009 Euro 5 119 595
可以看到在不同的eventtime上,Euro 对应的rate不一样,最终计算总和的时候采用两边相应时间点的数据进行计算。
  1. FOR SYSTEM_TIME AS OF
    2.0 环境初始化
    首先初始化StreamExecutionEnvironment env 和 StreamTableEnvironment tEnv, 如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

env.setRuntimeMode(RuntimeExecutionMode.STREAMING); 设置当前运行模式为 STREAMING模式
env.setParallelism(1); 设置并行度是1主要是为测试的目的,便于观察join的结果
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 设置时间属性是 EventTime,
2.1 创建和注册左表 Orders
构建填充到Orders中的数据
List> orderData = https://www.it610.com/article/Lists.newArrayList();
orderData.add(new Tuple3<>(2L, "Euro", new Timestamp(2000L))); orderData.add(new Tuple3<>(1L, "US Dollar", new Timestamp(3000L))); orderData.add(new Tuple3<>(50L, "Yen", new Timestamp(4000L))); orderData.add(new Tuple3<>(3L, "Euro", new Timestamp(5000L))); orderData.add(new Tuple3<>(5L, "Euro", new Timestamp(9000L)));

基于List构建DataStream,并通过assignTimestampsAndWatermarks 设置时间戳和watermark.
DataStream> orderStream = env.fromCollection(orderData)
.assignTimestampsAndWatermarks( WatermarkStrategy .>forBoundedOutOfOrderness(Duration.ofSeconds(10000)) .withTimestampAssigner((event, timestamp) -> event.f2.getTime()) );

然后基于DataStream构建Table并注册为表名 Orders。
Table orderTable = tEnv.fromDataStream(orderStream, Schema.newBuilder()
.columnByExpression("rowtime", "CAST(f2 AS TIMESTAMP(3))") .watermark("rowtime", "SOURCE_WATERMARK()") .build()); tEnv.registerTable("Orders", orderTable);

注意这里创建 Table的API与前面Table function join的不同,fromDataStream的第二个参数是Schema 类型,这段代码达到的功能是:
自动从DataStream中继承所有的物理列
通过创建一个计算列来访问流记录中的时间戳,对应代码中columnByExpression (这里是创建了一个rowtime属性列)
继承DataStream中已有的watermark, 对应代码中("rowtime", "SOURCE_WATERMARK()") ,这种用法的前提是DataStream中已经通过定义了相关的watermark 策略,在本例中我们使用 assignTimestampsAndWatermarks 对orderStream添加了watermark。
打印Schema和表里面的内容
orderTable.printSchema(); //only for debug
tEnv.from("Orders").execute().print(); // only for debug

(
f0 BIGINT NOT NULL,
f1 STRING,
f2 TIMESTAMP(9),
rowtime TIMESTAMP(3) ROWTIME AS CAST(f2 AS TIMESTAMP(3)),
WATERMARK FOR rowtime: TIMESTAMP(3) AS SOURCE_WATERMARK()
)
op f0 f1 f2 rowtime
+I 2 Euro 1970-01-01 08:00:02.000000000 1970-01-01 08:00:02.000
+I 1 US Dollar 1970-01-01 08:00:03.000000000 1970-01-01 08:00:03.000
+I 50 Yen 1970-01-01 08:00:04.000000000 1970-01-01 08:00:04.000
+I 3 Euro 1970-01-01 08:00:05.000000000 1970-01-01 08:00:05.000
+I 5 Euro 1970-01-01 08:00:09.000000000 1970-01-01 08:00:09.000
可以看到orderTable的Schema中多了 下面两条属性信息,分别对应代码中设置的rowtime 和 watermark
rowtime TIMESTAMP(3) ROWTIME AS CAST(f2 AS TIMESTAMP(3)),
WATERMARK FOR rowtime: TIMESTAMP(3) AS SOURCE_WATERMARK()
2.2 创建和注册右表 RatesHistory
构建填充到 RatesHistory 中的数据
List rateHistoryData = https://www.it610.com/article/Lists.newArrayList();
rateHistoryData.add(new Tuple3<>("US Dollar", 102L, new Timestamp(1000L))); rateHistoryData.add(new Tuple3<>("Euro", 114L, new Timestamp(1000L))); rateHistoryData.add(new Tuple3<>("Yen", 1L, new Timestamp(1000L))); rateHistoryData.add(new Tuple3<>("Euro", 116L, new Timestamp(5000L))); rateHistoryData.add(new Tuple3<>("Euro", 119L, new Timestamp(7000L)));

基于List构建DataStream,并通过assignTimestampsAndWatermarks 设置时间戳和watermark.
DataStream rateStream = env.fromCollection(rateHistoryData)
.assignTimestampsAndWatermarks( WatermarkStrategy // here is need for using withTimestampAssigner .forBoundedOutOfOrderness(Duration.ofSeconds(1000)) .withTimestampAssigner((event, timestamp) -> event.f2.getTime()) );

然后基于DataStream构建Table并注册为表名 Orders。
Table rateTable = tEnv.fromDataStream(
rateStream, Schema.newBuilder() .columnByExpression("rowtime", "CAST(f2 AS TIMESTAMP(3))") .watermark("rowtime", "SOURCE_WATERMARK()") .primaryKey("f0") .build()); tEnv.createTemporaryView("RatesHistory", rateTable);

注意这里RatesHistory的Schema中多了一个 primaryKey("f0"), 否则在执行join的时候会报错
Invalid primary key 'PK_f0'. Column 'f0' is nullable.
因为在Flink中,如果要支持 FOR SYSTEM_TIME AS OF, 右表中必须要有主键,而且这个主键对应join操作中on等值匹配的列。
打印Schema和表里面的内容
rateTable.printSchema(); //only for debug
tEnv.from("RatesHistory").execute().print();

(
f0 STRING NOT NULL,
f1 BIGINT NOT NULL,
f2 TIMESTAMP(9),
rowtime TIMESTAMP(3) ROWTIME AS CAST(f2 AS TIMESTAMP(3)),
WATERMARK FOR rowtime: TIMESTAMP(3) AS SOURCE_WATERMARK(),
CONSTRAINT PK_f0 PRIMARY KEY (f0) NOT ENFORCED
)
op f0 f1 f2 rowtime
+I US Dollar 102 1970-01-01 08:00:01.000000000 1970-01-01 08:00:01.000
+I Euro 114 1970-01-01 08:00:01.000000000 1970-01-01 08:00:01.000
+I Yen 1 1970-01-01 08:00:01.000000000 1970-01-01 08:00:01.000
+I Euro 116 1970-01-01 08:00:05.000000000 1970-01-01 08:00:05.000
+I Euro 119 1970-01-01 08:00:07.000000000 1970-01-01 08:00:07.000
可以看到rateTable的Schema中多了 下面三条属性信息,分别对应代码中设置的rowtime(计算列) , watermark和 primaryKey
rowtime TIMESTAMP(3) ROWTIME AS CAST(f2 AS TIMESTAMP(3)),
WATERMARK FOR rowtime: TIMESTAMP(3) AS SOURCE_WATERMARK(),
CONSTRAINT PK_f0 PRIMARY KEY (f0) NOT ENFORCED
2.3 执行join
String sqlQuery2 =
"SELECT o.f1 as currency, o.f0 as amount, o.rowtime, r.f1 as rate, " + " o.f0 * r.f1 as amount_sum " + "from " + " Orders AS o " + " JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r " + "ON o.f1 = r.f0"; tEnv.sqlQuery(sqlQuery2).execute().print();

得到
op currency amount rowtime rate amount_sum
+I US Dollar 1 1970-01-01 08:00:03.000 102 102
+I Yen 50 1970-01-01 08:00:04.000 1 50
+I Euro 2 1970-01-01 08:00:02.000 114 228
+I Euro 3 1970-01-01 08:00:05.000 116 348
+I Euro 5 1970-01-01 08:00:09.000 119 595
  1. 其他
    在Flink v14.3中registerTable这个API已经不建议使用了,可以改为 tEnv.createTemporaryView
参考资料
Apache Flink 漫谈系列(11) - Temporal Table JOIN https://developer.aliyun.com/...
动态表 https://nightlies.apache.org/...动态表-dynamic-table
Apache Flink 漫谈系列 - Watermark是个啥? https://mp.weixin.qq.com/s?__...
Apache Flink 漫谈系列(09) - JOIN 算子 https://developer.aliyun.com/...
flink教程-聊聊 flink 1.11 中新的水印策略 https://cloud.tencent.com/dev...
聊聊flink的TableFunction https://segmentfault.com/a/11...
Event Time Temporal Join https://nightlies.apache.org/...
DataStream API https://nightlies.apache.org/...
《flink内核原理与实现》 4.4章节 水印
《Flink原理、实战与性能优化》4.2章节 时间概念与Watermark
Watermark 机制 https://www.bilibili.com/vide...

    推荐阅读