Data Lake (17): Integrating Flink with Iceberg Using the DataStream API

This article covers how to integrate Flink with Iceberg using the DataStream API.
Flink and Iceberg Integration: DataStream API Operations

Flink currently supports reading and writing Iceberg tables in real time with both the DataStream API and the SQL API; the SQL API is the recommended way to read and write Iceberg tables in real time.
Iceberg requires Flink 1.11.x or later. The tested correspondence between Iceberg and Flink versions is as follows:

  • Flink 1.11.x works with Iceberg 0.11.1.
  • Flink 1.12.x–1.13.x works with Iceberg 0.12.1; the SQL API has some bugs.
  • Flink 1.14.x can be integrated with Iceberg 0.12.1, but with some minor bugs, for example in real-time reads of Iceberg data.
The Flink and Iceberg integration below uses Flink 1.13.5 and Iceberg 0.12.1. Later, when working with the SQL API, Flink 1.11.6 and Iceberg 0.11.1 are used.
I. Writing to an Iceberg Table in Real Time with the DataStream API

Operating on Iceberg through the DataStream API currently only supports the Java API. The steps for writing to an Iceberg table in real time with the DataStream API are as follows:
1. First, add the following dependencies in Maven
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <!-- Flink 1.12.x - 1.13.x is compatible with Iceberg 0.12.1; it is not compatible with Flink 1.14 -->
    <flink.version>1.13.5</flink.version>
    <!--<flink.version>1.12.1</flink.version>-->
    <!--<flink.version>1.14.2</flink.version>-->
    <!-- Flink 1.11.x matches Iceberg 0.11.1 -->
    <!--<flink.version>1.11.6</flink.version>-->
    <hadoop.version>3.2.2</hadoop.version>
</properties>

<dependencies>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-iceberg</artifactId>
        <version>1.13-vvr-4.0.7</version>
    </dependency>
    <!-- Iceberg dependency required for Flink to operate on Iceberg -->
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-flink-runtime</artifactId>
        <version>0.12.1</version>
        <!--<version>0.11.1</version>-->
    </dependency>

    <!-- Dependencies required for developing Flink jobs in Java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Flink Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Required for reading files from HDFS -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>

    <!-- Flink SQL & Table -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>

    <!-- log4j and slf4j; comment these out if you do not want log output in the console -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-nop</artifactId>
        <version>1.7.25</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.5</version>
    </dependency>
</dependencies>


2. Write code that uses the DataStream API to write Kafka data into an Iceberg table
import com.google.common.collect.ImmutableMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
import java.util.Map;

/**
 * Write data into an Iceberg table using the DataStream API.
 */
public class StreamAPIWriteIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1. Checkpointing is mandatory: when Flink writes to Iceberg, data is only committed after a checkpoint completes.
        env.enableCheckpointing(5000);

        //2. Read data from the Kafka topic.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("node1:9092,node2:9092,node3:9092")
                .setTopics("flink-iceberg-topic")
                .setGroupId("my-group-id")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        //3. Transform each record into a RowData object so it can be stored in the Iceberg table.
        SingleOutputStreamOperator<RowData> dataStream = kafkaSource.map(new MapFunction<String, RowData>() {
            @Override
            public RowData map(String s) throws Exception {
                System.out.println("s = " + s);
                String[] split = s.split(",");
                GenericRowData row = new GenericRowData(4);
                row.setField(0, Integer.valueOf(split[0]));
                row.setField(1, StringData.fromString(split[1]));
                row.setField(2, Integer.valueOf(split[2]));
                row.setField(3, StringData.fromString(split[3]));
                return row;
            }
        });

        //4. Create the Hadoop configuration, catalog configuration and table schema so the table can be located later when writing to its path.
        Configuration hadoopConf = new Configuration();
        Catalog catalog = new HadoopCatalog(hadoopConf, "hdfs://mycluster/flink_iceberg/");

        //Configure the Iceberg database name and table name.
        TableIdentifier name = TableIdentifier.of("icebergdb", "flink_iceberg_tbl");

        //Create the Iceberg table schema.
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.required(2, "name", Types.StringType.get()),
                Types.NestedField.required(3, "age", Types.IntegerType.get()),
                Types.NestedField.required(4, "loc", Types.StringType.get()));

        //If the table is partitioned, specify the partition; here the "loc" column is the partition column.
        //Use the unpartitioned() method to create a table without partitions.
        //PartitionSpec spec = PartitionSpec.unpartitioned();
        PartitionSpec spec = PartitionSpec.builderFor(schema).identity("loc").build();

        //Store the Iceberg table data in Parquet format.
        Map<String, String> props =
                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
        Table table = null;

        //Check through the catalog whether the table exists: create it if it does not, otherwise load it.
        if (!catalog.tableExists(name)) {
            table = catalog.createTable(name, schema, spec, props);
        } else {
            table = catalog.loadTable(name);
        }

        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);

        //5. Write the data into Iceberg through the DataStream API.
        FlinkSink.forRowData(dataStream)
                //.table() can be omitted; specifying the path through tableLoader is enough.
                .table(table)
                .tableLoader(tableLoader)
                //Defaults to false (append); set to true to overwrite existing data.
                .overwrite(false)
                .build();

        env.execute("DataStream Api Write Data To Iceberg");
    }
}

The code above has a few points worth noting:
  • Checkpointing must be enabled: Flink only commits data to Iceberg after a checkpoint succeeds; otherwise the data will not be visible later when querying from Hive (see the checkpoint configuration sketch after this list).
  • The records read from Kafka must be wrapped as RowData or Row objects before they can be written to the Iceberg table. Writes append by default; specifying overwrite replaces all existing data.
  • The corresponding catalog and table schema must be created before writing to the Iceberg table; otherwise writing with only a path specified fails because the Iceberg table cannot be found.
  • Writing to Iceberg through the DataStream API is not recommended; prefer the SQL API.
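As a minimal sketch of the first point, the checkpoint behaviour can be tuned beyond the bare enableCheckpointing(5000) call used above. The class name and the interval/timeout values below are illustrative assumptions, not values required by Iceberg:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical checkpoint tuning for the write job above; the values are examples only.
public class CheckpointConfigExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Commit to Iceberg roughly every 5 seconds with exactly-once checkpoints.
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        // Abort a checkpoint that takes longer than 60 seconds.
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Leave at least 500 ms between the end of one checkpoint and the start of the next.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    }
}

Whatever values are chosen, data written to Iceberg only becomes visible after the next successful checkpoint.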
3. Create the "flink-iceberg-topic" topic referenced in the code, start the job, and produce data
# Create the flink-iceberg-topic topic in Kafka
[root@node1 bin]# ./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic flink-iceberg-topic --partitions 3 --replication-factor 3


After creating the topic, start the job and then produce the following data into the topic:
[root@node1 bin]# ./kafka-console-producer.sh --topic flink-iceberg-topic --broker-list node1:9092,node2:9092,node3:9092
1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai


The corresponding data can now be seen under the matching path in HDFS:


4. View the data saved in Iceberg through Hive
  Start Hive and the Hive Metastore, then create an external table in Hive that maps to the Iceberg table:
CREATE TABLE flink_iceberg_tbl(
id int,
name string,
age int,
loc string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');


  Note: although loc is the partition column, it is simply listed as a regular column when creating the mapping table; there is no need to declare a partition. In addition, the mapping table's LOCATION must match the path where the Iceberg data is stored.
Query the Iceberg table through Hive; the result is as follows:
hive> select * from flink_iceberg_tbl;
OK
2	ls	19	shanghai
...


II. Batch/Streaming Reads from an Iceberg Table with the DataStream API

The DataStream API supports both batch reads and real-time (streaming) reads of an Iceberg table, controlled by the streaming(true/false) method.
1. Batch/full read
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

/**
 * Read Iceberg data in batch or streaming mode using the DataStream API.
 */
public class StreamAPIReadIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //1. Configure the TableLoader.
        Configuration hadoopConf = new Configuration();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);

        //2. Read data from Iceberg (full or incremental).
        DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
                .tableLoader(tableLoader)
                //false (default) reads the table in one batch; true reads it as a stream.
                .streaming(false)
                .build();

        batchData.map(new MapFunction<RowData, String>() {
            @Override
            public String map(RowData rowData) throws Exception {
                int id = rowData.getInt(0);
                String name = rowData.getString(1).toString();
                int age = rowData.getInt(2);
                String loc = rowData.getString(3).toString();
                return id + "," + name + "," + age + "," + loc;
            }
        }).print();

        env.execute("DataStream Api Read Data From Iceberg");
    }
}

The result is as follows:


2. Streaming (real-time) read
//Setting the streaming parameter to true enables real-time reads.
DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
        .tableLoader(tableLoader)
        //false (default) reads the table in one batch; true reads it as a stream.
        .streaming(true)
        .build();


Modify the code above accordingly and start it, then insert 2 rows into the Hive-mapped Iceberg table "flink_iceberg_tbl":
Before inserting data into the Iceberg table through Hive, the following two jars must be added:
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;


Insert two rows into the Iceberg table through Hive:
hive> insert into flink_iceberg_tbl values (5,'s1',30,'guangzhou'),(6,'s2',31,'tianjin');


Once the insert completes, the Flink console can be seen reading the corresponding data in real time.



III. Incremental Streaming Reads from a Specified Snapshot

In the cases above, Flink reads all of the data in the table. A specific snapshot-id can also be supplied to decide from which point to read data incrementally.
DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
        .tableLoader(tableLoader)
        //Read data incrementally starting from a given snapshot; the snapshot id must be obtained from the table metadata.
        .startSnapshotId(4226332606322964975L)
        //false (default) reads the table in one batch; true reads it as a stream.
        .streaming(true)
        .build();
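
To obtain a snapshot id for startSnapshotId, the table metadata can be inspected with the Iceberg Java API. The following is a minimal sketch, assuming the same HadoopCatalog warehouse path and table used above; the class name ListIcebergSnapshots is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

/**
 * Illustrative helper: list the snapshot ids of the Iceberg table so one of them
 * can be used as the startSnapshotId for an incremental read.
 */
public class ListIcebergSnapshots {
    public static void main(String[] args) {
        Configuration hadoopConf = new Configuration();
        HadoopCatalog catalog = new HadoopCatalog(hadoopConf, "hdfs://mycluster/flink_iceberg/");
        Table table = catalog.loadTable(TableIdentifier.of("icebergdb", "flink_iceberg_tbl"));

        // Print every snapshot id together with its commit timestamp.
        for (Snapshot snapshot : table.snapshots()) {
            System.out.println(snapshot.snapshotId() + " @ " + snapshot.timestampMillis());
        }
        // The latest snapshot, if any.
        if (table.currentSnapshot() != null) {
            System.out.println("current: " + table.currentSnapshot().snapshotId());
        }
    }
}

One of the printed ids can then be passed to startSnapshotId for the incremental read.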


The result contains only the data after the specified snapshot, as follows:



IV. Compacting Data Files

Iceberg provides an API that merges small files into larger ones, and it can be run as a Flink batch job. Small-file compaction in Flink works exactly the same way as in Spark.
The code is as follows:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFilesActionResult;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.actions.Actions;
import org.apache.iceberg.hadoop.HadoopCatalog;

/**
 * Data files can be compacted by submitting a Flink batch job.
 */
public class RewrietDataFiles {
    public static void main(String[] args) {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //1. Create the Hadoop configuration.
        Configuration hadoopConf = new Configuration();

        //2. Create the catalog so the table can be located.
        Catalog catalog = new HadoopCatalog(hadoopConf, "hdfs://mycluster/flink_iceberg/");

        //3. Configure the Iceberg database name and table name and load the table.
        TableIdentifier name = TableIdentifier.of("icebergdb", "flink_iceberg_tbl");
        Table table = catalog.loadTable(name);

        //4. Compact the small data files.
        RewriteDataFilesActionResult result = Actions.forTable(table)
                .rewriteDataFiles()
                //The default target size is 512 MB; it can be set explicitly as below, the same as in Spark.
                .targetSizeInBytes(536870912L)
                .execute();
    }
}
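
Optionally, the RewriteDataFilesActionResult returned by execute() can be inspected. The two lines below are a hedged addition (not part of the original code) meant to sit directly after the .execute() call above; they print how many data files the compaction removed and created:

// Hedged addition: place directly after the .execute() call in the job above.
System.out.println("deleted data files: " + result.deletedDataFiles().size());
System.out.println("added data files: " + result.addedDataFiles().size());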



