青春须早为,岂能长少年。这篇文章主要讲述Flink Streaming-Sink相关的知识,希望能为你提供帮助。
大数据技术AI
Flink/Spark/Hadoop/数仓,数据分析、面试,源码解读等干货学习资料
100篇原创内容
该连接器提供了对 Flink 文件系统抽象 支持的文件系统中的分区文件的访问.
文件系统连接器本身就被包括在 Flink 中,不需要任何额外的依赖。当从文件系统中读取或向文件系统写入记录时,需要指定相应的记录格式。
文件系统连接器支持对本地文件系统或分布式文件系统的读取和写入。可以通过如下方式定义文件系统表:
CREATE TABLE MyUserTable (
column_name1 INT,
column_name2 STRING,
...
part_name1 INT,
part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
connector = filesystem,-- 必选: 指定连接器类型
path = file:///path/to/whatever,-- 必选: 指向目录的路径
format = ...,-- 必选: 文件系统连接器需要指定格式
partition.default-name = ...,-- 可选: 动态分区模式下分区字段值是 null 或空字符串时,默认的分区名。
sink.shuffle-by-partition.enable = ...,-- 可选: 该选项开启了在 sink 阶段通过动态分区字段来 shuffle 数据,该功能可以大大减少文件系统 sink 的文件数,但可能会导致数据倾斜,默认值是 false.
...
)
针对流的文件系统 sources 目前还在开发中。将来,社区会不断添加对常见的流处理场景的支持, 比如对分区和目录的检测等。
新版的文件系统连接器和旧版的文件系统连接器有很大不同:path 参数指定的是一个目录而不是一个文件,该目录下文件的格式也不是肉眼可读的。
1、分区文件Flink 的文件系统连接器在对分区的支持上,使用了标准的 hive 格式。不过,它不需要预先注册分区,而是基于目录结构自动做了分区发现。比如,以下目录结构的表, 会被自动推导为包含 ??datetime?
?? 和 ??hour?
? 分区的分区表。
path
└── datetime=2019-08-25
└── hour=11
├── part-0.parquet
├── part-1.parquet
└── hour=12
├── part-0.parquet
└── datetime=2019-08-26
└── hour=6
├── part-0.parquet
文件系统连接器支持分区新增插入和分区覆盖插入。
2、流式 Sink-滚动策略 文件系统连接器支持流式的写, 它基于 Flink 的 Streaming File Sink 将记录写入文件。按行编码的格式支持 csv 和 json。按块编码的格式支持 parquet, orc 和 avro。
你可以直接编写 SQL,把流数据插入到非分区表。如果是分区表,可以配置分区操作相关的参数
分区目录下的数据被分割到分区文件中。每个分区对应的sink的每个接受到了数据的子任务都至少会为该分区生成一个分区文件。
根据可配置的滚动策略,当前正在写入的分区文件会被关闭,新的分区文件也会被生成。该策略基于大小,和指定的文件可被打开的最大 timeout 时长,来滚动分区文件。
Key
| Default
| Type
| Description
|
sink.rolling-policy.file-size
| 128MB
| MemorySize
| 滚动前,分区文件最大大小.
|
sink.rolling-policy.rollover-interval
| 30 min
| Duration
| 滚动前,分区文件处于打开状态的最大时长 (默认值是30分钟,以避免产生大量小文件)。检查该选项的频率由参数 sink.rolling-policy.check-interval 控制。
|
sink.rolling-policy.check-interval
| 1 min
| Duration
| 基于时间的滚动策略的检查间隔。该参数控制了基于参数 sink.rolling-policy.rollover-interval 检查分区文件是否该被滚动的检查频率 .
|
【Flink Streaming-Sink】注意: 对于 bulk 格式 (parquet, orc, avro), 滚动策略和检查点间隔控制了分区文件的大小和个数 (未完成的文件会在下个检查点完成).
注意: 对于行格式 (csv, json), 如果想使得分区文件更快地在文件系统中可见,可以设置连接器参数 ??sink.rolling-policy.file-size?
?? 或 ??sink.rolling-policy.rollover-interval?
?? ,以及 ??flink-conf.yaml?
?? 中的 ??execution.checkpointing.interval?
?? 。对于其他格式 (avro, orc), 可以只设置 ??flink-conf.yaml?
?? 中的 ??execution.checkpointing.interval?
? 。
3、流式 Sink-文件合并file sink 支持文件合并,以允许应用程序可以使用较小的检查点间隔而不产生大量文件。
Key
| Default
| Type
| Description
|
auto-compaction
| false
| Boolean
| 在流式 sink 中是否开启自动合并功能。数据首先会被写入到临时文件,在检查点完成后,该检查点产生的临时文件会被合并。这些临时文件在合并前不可见.
|
compaction.file-size
| (none)
| MemorySize
| 合并目标文件大小,默认值是滚动文件大小.
|
启用该参数后,文件合并功能会根据设定的目标文件大小,合并多个小文件到大文件。当在生产环境使用文件合并功能时,需要
注意:
- 只有检查点内部的文件才会被合并,也就是说,至少会生成跟检查点个数一样多的文件。
- 合并前文件是可见的,所以文件的可见性是:检查点间隔 + 合并时长。
- 如果合并花费的时间很长,会对作业产生反压,延长检查点所需时间。
4、流式 Sink-分区提交分区数据写完毕后,经常需要通知下游应用。比如,在 Hive metastore 中新增分区或者在目录下新增 ??_SUCCESS?
?? 文件。分区提交策略是可定制的,具体的分区提交行为是基于 ??triggers?
?? 和 ??policies?
? 的组合.
- Trigger: 分区提交的时机,可以基于从分区中提取的时间对应的水印,或者基于处理时间。
- Policy: 分区提交策略,内置的策略包括提交 ?
?_SUCCESS?
? 文件和 ??hive metastore?
?, 也可以自己定制提交策略, 比如触发 hive 生成统计信息,合并小文件等。
注意: 分区提交只有在动态分区插入模式下才有效。
4.1 分区提交触发器通过配置分区提交的触发策略,来配置何时提交分区:
Key
| Default
| Type
| Description
|
sink.partition-commit.trigger
| process-time
| String
| 分区提交触发器类型。process-time: 基于机器时间,既不需要分区时间提取器也不需要水印生成器,一旦 ”当前系统时间“ 超过了 “分区创建系统时间” 和 sink.partition-commit.delay 之和,就提交分区;partition-time: 基于从分区字段提取的时间,需要水印生成器,一旦 “水印” 超过了 ”从分区字段提取的时间“ 和 sink.partition-commit.delay 之和,就提交分区.
|
sink.partition-commit.delay
| 0 s
| Duration
| 该延迟时间之前分区不会被提交。如果是按天的分区,应配置为 1 d, 如果是按小时的分区,应配置为 1 h.
|
sink.partition-commit.watermark-time-zone
| UTC
| String
| 解析 LONG 类型的水印到 TIMESTAMP 类型时所采用的时区,解析得到的水印的 TIMESTAMP 会被用来跟分区时间进行比较以判断分区是否该被提交。该参数只有在参数 ??sink.partition-commit.trigger? ? 被设置为 partition-time 时才生效。如果该参数设置的不正确,比如在 TIMESTAMP_LTZ 列上定义了 source rowtime, 但没有设置该参数,则用户可能在若干个小时后才看到分区的提交。该参数的默认值是 UTC, 代表水印是定义在 TIMESTAMP 列上或没有定义水印。如果水印定义在 TIMESTAMP_LTZ 列上,则水印的时区是会话的时区。该参数的可选值要么是完整的时区名比如 America/Los_Angeles,要么是自定义的时区 id 比如 GMT-08:00.
|
有两种类型的触发器:
- 第一种是根据分区的处理时间。该触发器不需要分区时间提取,也不需要生成水印。通过分区创建时间和当前系统时间来触发分区提交。该触发器更通用但不是很精确。比如,数据的延迟或故障转移都会导致分区的提前提交。
- 第二种是根据从分区字段提取的时间以及水印。这需要你的作业支持生成水印,分区是根据时间来切割的,比如按小时或按天分区。
如果想让下游系统尽快感知到分区,而不管分区数据是否完整:
- ‘sink.partition-commit.trigger’=‘process-time’ (默认值)
- ‘sink.partition-commit.delay’=‘0s’ (默认值) 一旦分区中有数据,分区立马就会被提交。注意:分区可能会被提交多次。
如果想让下游系统只有在分区数据完整时才感知到分区,且你的作业有水印生成的逻辑,也能从分区字段的值中提取到时间:
- ‘sink.partition-commit.trigger’=‘partition-time’
- ‘sink.partition-commit.delay’=‘1h’ (根据分区类型指定,如果是按小时的分区可配置为 ‘1h’) 该方式是最精确的提交分区的方式,该方式尽力确保提交的分区包含尽量完整的数据。
如果想让下游系统只有在数据完整时才感知到分区,但是没有水印,或者无法从分区字段的值中提取时间:
- ‘sink.partition-commit.trigger’=‘process-time’ (默认值)
- ‘sink.partition-commit.delay’=‘1h’ (根据分区类型指定,如果是按小时的分区可配置为 ‘1h’) 该方式尽量精确地提交分区,但是数据延迟或故障转移会导致分区的提前提交。
延迟数据的处理:延迟的记录会被写入到已经提交的对应分区中,且会再次触发该分区的提交。
4.2 分区时间提取器时间提取器定义了如何从分区字段值中提取时间.
Key
| Default
| Type
| Description
|
partition.time-extractor.kind
| default
| String
| 从分区字段提取时间的时间提取器。支持默认值和定制。对于默认值,可以配置时间戳模式。对于定制,应指定提取器类.
|
partition.time-extractor.class
| (none)
| String
| 实现了接口 PartitionTimeExtractor 的提取器类.
|
partition.time-extractor.timestamp-pattern
| (none)
| String
| default 时间提取器允许用户从分区字段中提取合法的时间戳模式。默认支持从第一个字段按 yyyy-mm-dd hh:mm:ss 时间戳模式提取。如果需要从一个分区字段比如 ‘dt’ 提取时间戳,可以配置为: year-day 如果需要从两字分区字段,比如和提取时间戳,可以配置为:dt $hour:00:00.
|
默认的提取器是基于由分区字段组合而成的时间戳模式。你也可以指定一个实现了 ??PartitionTimeExtractor?
? 接口的自定义的提取器。
public class HourPartTimeExtractor implements PartitionTimeExtractor
@Override
public LocalDateTime extract(List<
String>
keys, List<
String>
values)
String dt = values.get(0);
String hour = values.get(1);
return Timestamp.valueOf(dt + " " + hour + ":00:00").toLocalDateTime();
4.3 分区提交策略分区提交策略指定了提交分区时的具体操作.
- 第一种是 metastore, 只有 hive 表支持该策略, 该策略下文件系统通过目录层次结构来管理分区.
- 第二种是 success 文件, 该策略下会在分区对应的目录下写入一个名为 ?
?_SUCCESS?
?? 的空文件.
无法复制加载中的内容
你也可以实现自己的提交策略,如:
public class AnalysisCommitPolicy implements PartitionCommitPolicy
private HiveShell hiveShell;
@Override
public void commit(Context context) throws Exception
if (hiveShell == null)
hiveShell = createHiveShell(context.catalogName());
hiveShell.execute(String.format(
"ALTER TABLE %s ADD IF NOT EXISTS PARTITION (%s = %s) location %s",
context.tableName(),
context.partitionKeys().get(0),
context.partitionValues().get(0),
context.partitionPath()));
hiveShell.execute(String.format(
"ANALYZE TABLE %s PARTITION (%s = %s) COMPUTE STATISTICS FOR COLUMNS",
context.tableName(),
context.partitionKeys().get(0),
context.partitionValues().get(0)));
5、Sink 并行度向外部文件系统(包括 hive) 写文件时的并行度,在流处理模式和批处理模式下,都可以通过对应的 table 选项指定。默认情况下,该并行度跟上一个上游的 chained operator 的并行度一样。当配置了跟上一个上游的 chained operator 不一样的并行度时,写文件的算子和合并文件的算子(如果使用了的话)会使用指定的并行度。
Key
| Default
| Type
| Description
|
sink.parallelism
| (none)
| Integer
| 向外部文件系统写文件时的并行度。必须大于 0,否则会抛出异常.
|
注意: 当前,只有在上游的 changelog 模式是 INSERT-ONLY 时,才支持设置 sink 的并行度。否则的话,会抛出异常。
6、完整示例6.1 官网案例:
如下示例演示了如何使用文件系统连接器编写流查询语句查询 kafka 中的数据并写入到文件系统中,以及通过批查询把结果数据读取出来.
CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL 5 SECOND -- 在 TIMESTAMP 列上定义水印
) WITH (...);
CREATE TABLE fs_table (
user_id STRING,
order_amount DOUBLE,
dt STRING,
`hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
connector=filesystem,
path=...,
format=parquet,
sink.partition-commit.delay=1 h,
sink.partition-commit.policy.kind=success-file
);
-- streaming sql, 插入数据到文件系统表中
INSERT INTO fs_table
SELECT
user_id,
order_amount,
DATE_FORMAT(log_ts, yyyy-MM-dd),
DATE_FORMAT(log_ts, HH)
FROM kafka_table;
-- batch sql, 分区裁剪查询
SELECT * FROM fs_table WHERE dt=2020-05-20 and `hour`=12;
如果水印是定义在 TIMESTAMP_LTZ 列上,且使用了 ??partition-time?
?? 来提交分区, 则参数 ??sink.partition-commit.watermark-time-zone?
? 需要被设置为会话的时区,否则分区会在若干小时后才会被提交。?
CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
ts BIGINT, -- epoch 毫秒时间
ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL 5 SECOND -- 在 TIMESTAMP_LTZ 列上定义水印
) WITH (...);
CREATE TABLE fs_table (
user_id STRING,
order_amount DOUBLE,
dt STRING,
`hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
connector=filesystem,
path=...,
format=parquet,
partition.time-extractor.timestamp-pattern=$dt $hour:00:00,
sink.partition-commit.delay=1 h,
sink.partition-commit.trigger=partition-time,
sink.partition-commit.watermark-time-zone=Asia/Shanghai, -- 假定用户配置的时区是 Asia/Shanghai
sink.partition-commit.policy.kind=success-file
);
-- streaming sql, 插入数据到文件系统表中
INSERT INTO fs_table
SELECT
user_id,
order_amount,
DATE_FORMAT(ts_ltz, yyyy-MM-dd),
DATE_FORMAT(ts_ltz, HH)
FROM kafka_table;
-- batch sql, 分区裁剪查询
SELECT * FROM fs_table WHERE dt=2020-05-20 and `hour`=12;
6.2 案例2:
默认catalog
# 默认catalog
USE CATALOG default_catalog;
kafka连接器?
DROP TABLE student;
CREATE TABLE IF NOT EXISTS student
(
commonMAP<
STRING,STRING>
,
`data`STRING,
# 上报时间一
`report_time1` STRING,
# 上报时间二
`report_time2` STRING,
# row_time 计算列(用上报时间一和上报时间二结合)
row_time AS TO_TIMESTAMP(FROM_UNIXTIME(CAST (IF (report_time1 IS NULL,report_time2,report_time) AS BIGINT)/1000)),
# WATERMARK
WATERMARK FOR row_time AS row_time - INTERVAL 30 SECOND
) WITH (
connector = kafka,
topic = student,
properties.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092,
properties.group.id = student_groupid,
scan.startup.mode = earliest-offset,
format = json,
json.fail-on-missing-field = false,
json.ignore-parse-errors = true
)
;
创建hive的catalog,名字叫myhive
CREATE CATALOG myhive
WITH (
type = hive,
default-database = ods,
hive-conf-dir = /data/hive/conf
);
USE CATALOG myhive;
设置hive方言
SET table.sql-dialect=hive;
创建hive表
DROP TABLE myhive.ods.student;
CREATE TABLE IF NOT EXISTS myhive.ods.student
(
idSTRING,
nameSTRING,
ageSTRING,
countrySTRING,
citySTRING,
dataSTRING,
add_time STRING
)
PARTITIONED BY (dt STRING)
STORED AS ORC
TBLPROPERTIES
(
partition.time-extractor.timestamp-pattern = $dt 00:00:00,
sink.partition-commit.trigger = partition-time,
sink.partition-commit.delay = 30 min,
sink.partition-commit.policy.kind = metastore,success-file
)
;
Insert
INSERT INTO TABLE myhive.ods.student
SELECT id
, name
, age
, country
, city
, data
, cast(LOCALTIMESTAMP AS STRING) add_time
, SUBSTR(report_datetime, 1, 10) dt
FROM (
SELECT common[id]id
, common[name]name
, common[age]age
, common[country]country
, common[city]city
, DATE_FORMAT(TO_TIMESTAMP(
FROM_UNIXTIME(cast(if(report_time IS NULL, report_time2, report_time) AS BIGINT) / 1000)),
yyyy-MM-dd HH:mm:ss) report_datetime
, data
FROM default_catalog.default_database.student
) t
;
注意:加上catalog和db
推荐阅读