Converting Between Flink DataStream and Table



    • 1. Required Maven dependencies
    • 2. Reading Kafka data into a DataStream
    • 3. Registering a DataStream as a Table
    • 4. Table operations
    • 5. Converting a Table to a DataStream

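Flink offers three APIs: DataStream, Table, and SQL. Up to and including version 1.7, SQL support is not yet complete. Still, SQL's flexibility makes it a good fit for many scenarios, so applications often convert between these APIs. This article shows how to convert between a Flink DataStream and a Table, and covers basic usage of Flink SQL and the Flink Table API.
1. Required Maven dependencies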
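<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.7.1</version>
</dependency>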

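2. Reading Kafka data into a DataStream
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);
// checkpoint every 60 s with exactly-once semantics
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);

// broker, groupId and topic are defined elsewhere in the job
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", broker);
properties.setProperty("group.id", groupId);
properties.setProperty("max.partition.fetch.bytes", "10485760");
properties.setProperty("request.timeout.ms", "120000");
properties.setProperty("session.timeout.ms", "60000");
properties.setProperty("heartbeat.interval.ms", "10000");

// each Kafka record value is deserialized as a String
FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>(topic, new SimpleStringSchema(), properties);
DataStream<String> sourceStream = env.addSource(myConsumer);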
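The consumer timeouts above are related to each other. Kafka's consumer documentation says heartbeat.interval.ms must be lower than session.timeout.ms and typically no higher than one third of it. The following is a minimal plain-Java sketch that builds the same properties and checks that rule. The class KafkaConsumerProps, its methods, and the sample broker and group values are hypothetical and not part of Flink or Kafka.

```java
import java.util.Properties;

// Hypothetical helper (not a Flink or Kafka API): builds the consumer
// properties used in the job above and checks the heartbeat/session rule.
public class KafkaConsumerProps {

    public static Properties build(String broker, String groupId) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", broker);
        p.setProperty("group.id", groupId);
        p.setProperty("max.partition.fetch.bytes", "10485760"); // 10 MiB per partition per fetch
        p.setProperty("request.timeout.ms", "120000");
        p.setProperty("session.timeout.ms", "60000");
        p.setProperty("heartbeat.interval.ms", "10000");
        return p;
    }

    // Kafka guidance: heartbeat.interval.ms < session.timeout.ms,
    // and typically no more than one third of it.
    public static boolean heartbeatLooksSane(Properties p) {
        long heartbeat = Long.parseLong(p.getProperty("heartbeat.interval.ms"));
        long session = Long.parseLong(p.getProperty("session.timeout.ms"));
        return heartbeat < session && heartbeat * 3 <= session;
    }

    public static void main(String[] args) {
        // sample values for illustration only
        Properties p = build("localhost:9092", "demo-group");
        System.out.println(heartbeatLooksSane(p));
    }
}
```

With the values from this article (10 s heartbeat, 60 s session timeout), the check passes. A 30 s heartbeat would fail it.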

3. Registering a DataStream as a Table
Obtain a StreamTableEnvironment:
StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);

Ways to register a Table:
// registerDataStreamInternal is an internal method; prefer the public registerDataStream below
tableEnv.registerDataStreamInternal("tableName", sourceStream);
Table table = tableEnv.scan("tableName");

tableEnv.registerDataStream("tableName", sourceStream);
Table table = tableEnv.scan("tableName");

tableEnv.registerDataStream("tableName", sourceStream, "fieldName");
Table table = tableEnv.scan("tableName");

Table table = tableEnv.fromDataStream(sourceStream, "fieldName");

Table table = tableEnv.fromDataStream(sourceStream);

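In this example the DataStream's element type is String, so only one field name is given when registering the table.
If the element type is a POJO with multiple fields, pass the field names as a single comma-separated string.
If the element type information cannot be extracted (for example, a DataStream<Row>), registering the DataStream directly as a Table fails because no TypeInformation is available. For a workaround, see this post:
https://blog.csdn.net/weixin_44056920/article/details/104797904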
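For the POJO case, Flink can refer to fields by name only if the class follows its POJO rules:
- a public class;
- a public no-argument constructor;
- fields that are public or exposed through getters and setters.

The following is a plain-Java sketch of such a class. The name ClickEvent and its fields are hypothetical. With a DataStream<ClickEvent> (here called clickStream, also hypothetical), the fields would be passed as tableEnv.fromDataStream(clickStream, "userId, url, ts").

```java
// Hypothetical event type that follows Flink's POJO rules, so a Table
// can reference its fields by name ("userId, url, ts").
public class ClickEvent {
    public String userId;
    public String url;
    public long ts; // event time in milliseconds (illustrative)

    // Flink requires a public no-argument constructor for POJO types.
    public ClickEvent() {}

    public ClickEvent(String userId, String url, long ts) {
        this.userId = userId;
        this.url = url;
        this.ts = ts;
    }

    public static void main(String[] args) {
        ClickEvent e = new ClickEvent("u1", "/home", 1000L);
        System.out.println(e.userId + " " + e.url + " " + e.ts);
    }
}
```

Dropping the no-argument constructor or making the fields private without getters and setters would typically make Flink treat the type as a generic type. Its fields could then no longer be addressed individually.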
4. Table operations
Table API:
table.select(...).filter(...);

SQL API
Table result = tableEnv.sqlQuery("SELECT * FROM tableName WHERE ...");

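5. Converting a Table to a DataStream
Convert a Table to an append-only DataStream. toAppendStream works only when the table is changed by insertions alone, as with select and filter. An updating result, such as a GROUP BY aggregation, requires toRetractStream.
DataStream<String> sinkStream = tableEnv.toAppendStream(table, String.class); // the table must have a single String field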

Convert a Table to a DataStream with a flag (flag=true means the record is an insertion, flag=false means the record is retracted):
DataStream<Tuple2<Boolean, String>> sinkStream = tableEnv.toRetractStream(table, String.class);
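To see what the flag means, consider an updating query such as SELECT word, COUNT(*) FROM tableName GROUP BY word. Each time a word's count changes, the previously emitted row becomes stale. It is first retracted (flag=false), and then the new row is inserted (flag=true). The following is a plain-Java simulation of that message pattern, not Flink code. The Message class stands in for Flink's Tuple2<Boolean, String>, and the names RetractSimulation and countWords are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (not Flink) of the (flag, row) messages a retract
// stream carries for: SELECT word, COUNT(*) FROM tableName GROUP BY word
public class RetractSimulation {

    // Stand-in for Flink's Tuple2<Boolean, String>: true = insert, false = retract.
    public static final class Message {
        public final boolean flag;
        public final String row;

        Message(boolean flag, String row) {
            this.flag = flag;
            this.row = row;
        }

        @Override
        public String toString() {
            return "(" + flag + "," + row + ")";
        }
    }

    public static List<Message> countWords(List<String> words) {
        Map<String, Long> counts = new HashMap<>();
        List<Message> out = new ArrayList<>();
        for (String w : words) {
            Long old = counts.get(w);
            if (old != null) {
                // The earlier result row for this word is now stale: retract it.
                out.add(new Message(false, w + "," + old));
            }
            long updated = (old == null) ? 1L : old + 1;
            counts.put(w, updated);
            // Emit the updated result row as an insertion.
            out.add(new Message(true, w + "," + updated));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(countWords(Arrays.asList("a", "b", "a")));
    }
}
```

For the input a, b, a this prints [(true,a,1), (true,b,1), (false,a,1), (true,a,2)]. The third message is a retraction, which is why such a table cannot be converted with toAppendStream.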

Finally, here is the link to the official Table API documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/tableApi.html#operations
