(4)Flink|(4)Flink CEP SQL贪婪词量演示

基于上一篇(3)Flink CEP SQL宽松近邻代码演示的延展,在上一篇中我们使用贪婪词量 +(至少匹配1行或多行),本篇将演示多种贪婪词量的效果:
(1)使用贪婪词量 *(匹配0行或多行)

public static void main(String[] args) { EnvironmentSettings settings = null; StreamTableEnvironment tEnv = null; try {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); tEnv = StreamTableEnvironment.create(env, settings); System.out.println("===============CEP_SQL_10================="); final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); DataStream dataStream = env.fromElements( new Ticker(1, "ACME", 22, 1, LocalDateTime.parse("2021-12-10 10:00:00", dateTimeFormatter)), new Ticker(3, "ACME", 19, 1, LocalDateTime.parse("2021-12-10 10:00:02", dateTimeFormatter)), new Ticker(4, "ACME", 23, 3, LocalDateTime.parse("2021-12-10 10:00:03", dateTimeFormatter)), new Ticker(5, "Apple", 25, 2, LocalDateTime.parse("2021-12-10 10:00:04", dateTimeFormatter)), new Ticker(6, "Apple", 18, 1, LocalDateTime.parse("2021-12-10 10:00:05", dateTimeFormatter)), new Ticker(7, "Apple", 16, 1, LocalDateTime.parse("2021-12-10 10:00:06", dateTimeFormatter)), new Ticker(8, "Apple", 14, 2, LocalDateTime.parse("2021-12-10 10:00:07", dateTimeFormatter)), new Ticker(9, "Apple", 19, 2, LocalDateTime.parse("2021-12-10 10:00:08", dateTimeFormatter)), new Ticker(10, "Apple", 25, 2, LocalDateTime.parse("2021-12-10 10:00:09", dateTimeFormatter)), new Ticker(11, "Apple", 11, 1, LocalDateTime.parse("2021-12-10 10:00:11", dateTimeFormatter)), new Ticker(12, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:12", dateTimeFormatter)), new Ticker(13, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:13", dateTimeFormatter)), new Ticker(14, "Apple", 25, 1, LocalDateTime.parse("2021-12-10 10:00:14", dateTimeFormatter)), new Ticker(15, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:15", dateTimeFormatter)), new Ticker(16, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:16", dateTimeFormatter)), new Ticker(17, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:17", dateTimeFormatter)), new Ticker(18, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:18", dateTimeFormatter))); Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder() .column("id", DataTypes.BIGINT()) .column("symbol", DataTypes.STRING()) .column("price", DataTypes.BIGINT()) .column("tax", DataTypes.BIGINT()) .column("rowtime", DataTypes.TIMESTAMP(3)) .watermark("rowtime", "rowtime - INTERVAL '1' SECOND") .build()); tEnv.createTemporaryView("CEP_SQL_10", table); String sql = "SELECT * " + "FROM CEP_SQL_10 " + "MATCH_RECOGNIZE ( " + "PARTITION BY symbol " +//按symbol分区,将相同卡号的数据分到同一个计算节点上。 "ORDER BY rowtime " +//在窗口内,对事件时间进行排序。 "MEASURES " +//定义如何根据匹配成功的输入事件构造输出事件 "e1.id as id,"+ "AVG(e1.price) as avgPrice,"+ "e1.rowtime AS start_tstamp, " + "e3.rowtime AS end_tstamp " + "ONE ROW PER MATCH " +//匹配成功输出一条 "AFTER MATCHskip to next row " +//匹配后跳转到下一行 "PATTERN ( e1 e2* e3) WITHIN INTERVAL '2' MINUTE" + "DEFINE " +//定义各事件的匹配条件 "e1 AS " + "e1.price = 25 , " + "e2 AS " + "e2.price > 10 AND e2.price <19," + "e3 AS " + "e3.price = 19 " + ") MR"; TableResult res = tEnv.executeSql(sql); res.print(); tEnv.dropTemporaryView("CEP_SQL_10"); }

匹配到了三组数据
(4)Flink|(4)Flink CEP SQL贪婪词量演示
文章图片

贪婪词量 *(匹配0行或多行)
(4)Flink|(4)Flink CEP SQL贪婪词量演示
文章图片

(2)使用贪婪词量 {n}(严格匹配n行)
(4)Flink|(4)Flink CEP SQL贪婪词量演示
文章图片

【(4)Flink|(4)Flink CEP SQL贪婪词量演示】(4)Flink|(4)Flink CEP SQL贪婪词量演示
文章图片

(4)Flink|(4)Flink CEP SQL贪婪词量演示
文章图片

(3)使用贪婪词量 {n,}(n或者更多行(n≥O))
(4)Flink|(4)Flink CEP SQL贪婪词量演示
文章图片

(4)Flink|(4)Flink CEP SQL贪婪词量演示
文章图片

(4)Flink|(4)Flink CEP SQL贪婪词量演示
文章图片

    推荐阅读