Flink Streaming API Collection: to master every Flink stream-processing API, this one article is all you need.

==Note: this article is dense and fairly long; consider bookmarking it and working through it at your own pace!==
Preface

As mentioned in earlier articles, developing a Flink application takes roughly five steps: build the execution environment, obtain a data source, transform the data, output to an external system, and trigger execution. These five modules make up a Flink job, and the rest of this article walks through the APIs behind each of them.
All code samples below are collected in my Gitee repository; grab them from the link if you need them:

Gitee: https://gitee.com/xiaoZcode/flink_test
1. Building the Stream Execution Environment (Environment)

getExecutionEnvironment()

Creates an execution environment that represents the context of the current program. If the program is invoked standalone, this method returns a local environment; if it is invoked from the command-line client to be submitted to a cluster, it returns that cluster's environment. In other words, it decides which kind of environment to return based on how the program is run, which makes it the most commonly used way to create one.

Code (batch and streaming variants):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
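To see the environment in action end to end, here is a minimal runnable sketch (the class name, parallelism, and element values are purely illustrative and not from the original article):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvDemo {
    public static void main(String[] args) throws Exception {
        // Returns a local or cluster environment depending on how the job is launched
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // override the environment's default parallelism
        env.fromElements("a", "b", "c").print();
        env.execute("env-demo");
    }
}
```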
createLocalEnvironment()

Returns a local execution environment; the default parallelism is specified at call time.

Code:

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
createRemoteEnvironment()

Returns a cluster execution environment, submitting a Jar to a remote server. You must specify the JobManager's IP and port at call time, along with the Jar package to run on the cluster.

Code:

StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123, "YOURPATH//xxx.jar");
2. Loading Data Sources (Source)

Example scenario: the examples in this article revolve around the following sensor-reading POJO:
public class SensorReading {
    private String id;
    private Long timestamp;
    private Double temperature;

    public SensorReading() {}

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() { return id; }

    public void setId(String id) { this.id = id; }

    public Long getTimestamp() { return timestamp; }

    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    public Double getTemperature() { return temperature; }

    public void setTemperature(Double temperature) { this.temperature = temperature; }

    @Override
    public String toString() {
        return "SensorReading{" +
                "id='" + id + '\'' +
                ", timestamp=" + timestamp +
                ", temperature=" + temperature +
                '}';
    }
}
Reading from a collection

public class SourceTest1_Collection {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set parallelism to 1
        env.setParallelism(1);
        // Read data from a collection
        DataStream<SensorReading> dataStream = env.fromCollection(Arrays.asList(
                new SensorReading("sensor_1", 1547718199L, 35.8),
                new SensorReading("sensor_2", 1547718199L, 35.0),
                new SensorReading("sensor_3", 1547718199L, 38.8),
                new SensorReading("sensor_4", 1547718199L, 39.8)
        ));
        DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 4, 5, 789);
        // Print the streams
        dataStream.print("data");
        integerDataStream.print("int");
        // Trigger execution
        env.execute();
    }
}
Reading from a file

The core line for reading a data source from a file:

DataStream<String> dataStream = env.readTextFile("xxx");

public class SourceTest2_File {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> dataStream = env.readTextFile("sensor.txt");
        dataStream.print();
        env.execute();
    }
}
Reading from Kafka

First add the Kafka connector dependency to the project:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
public class SourceTest3_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        DataStream<String> dataStream = env.addSource(
                new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
        dataStream.print();
        env.execute();
    }
}
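FlinkKafkaConsumer011 matches the 0.11 connector version used above. A note not in the original article: on newer Flink releases (1.14+) the connector has moved to a KafkaSource builder. A sketch of the equivalent setup (same broker, topic, and group as above), dropped into the main method in place of addSource:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("sensor")
        .setGroupId("consumer-group")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<String> dataStream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");
```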
Custom source

Besides reading from collections, files, and Kafka, Flink also lets you define your own source by passing a SourceFunction to addSource(). The core line:

DataStream<SensorReading> dataStream = env.addSource(new MySensor());
public class SourceTest4_UDF {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from the custom source
        DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());
        dataStream.print();
        env.execute();
    }

    // Custom source implementation
    public static class MySensorSource implements SourceFunction<SensorReading> {
        // Flag that controls whether data keeps being produced
        private boolean running = true;

        @Override
        public void run(SourceContext<SensorReading> ctx) throws Exception {
            Random random = new Random();
            // Initialize 10 sensors with Gaussian-distributed base temperatures
            HashMap<String, Double> sensorTempMap = new HashMap<>();
            for (int i = 0; i < 10; i++) {
                sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
            }
            while (running) {
                for (String sensorId : sensorTempMap.keySet()) {
                    // Random walk around the previous temperature
                    Double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                    sensorTempMap.put(sensorId, newTemp);
                    ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(), newTemp));
                }
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
3. Transform Operators (Transform)

Once a data source has been obtained, the data still needs to be analyzed and computed over; that is the job of the transform operators.

Basic transform operators: map, flatMap, filter
public class TransformTest1_Base {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // 1. map: convert each String to its length
        DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return value.length();
            }
        });
        // 2. flatMap: split each line on commas
        DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] fields = value.split(",");
                for (String field : fields) {
                    out.collect(field);
                }
            }
        });
        // 3. filter: keep only records whose id starts with sensor_1
        DataStream<String> filterStream = inputStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("sensor_1");
            }
        });
        // Print the results
        mapStream.print("map");
        flatMapStream.print("flatMap");
        filterStream.print("filter");
        // Trigger execution
        env.execute();
    }
}
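For comparison, the same three operators can be written with lambdas. One caveat worth knowing (a sketch, not from the original article): because of Java type erasure, the lambda form of flatMap needs an explicit returns(...) type hint:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.Collector;

DataStream<Integer> mapStream = inputStream.map(String::length);

DataStream<String> flatMapStream = inputStream
        .flatMap((String value, Collector<String> out) -> {
            for (String field : value.split(",")) {
                out.collect(field);
            }
        })
        .returns(Types.STRING); // required: erasure hides the Collector's element type

DataStream<String> filterStream = inputStream.filter(value -> value.startsWith("sensor_1"));
```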
keyBy and the rolling aggregation operators: sum(), min(), max(), minBy(), maxBy()

- keyBy (DataStream → KeyedStream): logically splits a stream into disjoint partitions, each containing the elements that share the same key; internally this is implemented with hashing.
- The rolling aggregation operators above work on each sub-stream of a KeyedStream (see the max()/maxBy() sketch below).
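Before the full example, a quick sketch of the difference between max() and maxBy(), since it often trips people up (the input values are made up for illustration; keyedStream is the keyed stream built in the example that follows). max("temperature") rolls up only the aggregated field and keeps the remaining fields from the first record seen, while maxBy("temperature") returns the entire record that holds the maximum:

```java
// Hypothetical arrival order for key "sensor_1":
//   sensor_1,1547718199,35.8
//   sensor_1,1547718201,32.0
//   sensor_1,1547718205,36.5

// max: only temperature is aggregated; the timestamp stays at the first record's value
//   last emitted result -> SensorReading{id='sensor_1', timestamp=1547718199, temperature=36.5}
DataStream<SensorReading> maxStream = keyedStream.max("temperature");

// maxBy: the whole record holding the maximum temperature is kept
//   last emitted result -> SensorReading{id='sensor_1', timestamp=1547718205, temperature=36.5}
DataStream<SensorReading> maxByStream = keyedStream.maxBy("temperature");
```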
public class TransformTest2_RollingAggregation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        // Equivalent lambda version:
        // DataStream<SensorReading> dataStream = inputStream.map(line -> {
        //     String[] fields = line.split(",");
        //     return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        // });

        // Group by id
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
        // KeyedStream<SensorReading, String> keyedStream1 = dataStream.keyBy(SensorReading::getId);

        // Rolling aggregation: keep the record with the current maximum temperature
        DataStream<SensorReading> resultStream = keyedStream.maxBy("temperature");
        resultStream.print();
        env.execute();
    }
}
Reduce

KeyedStream → DataStream: an aggregation over a keyed stream that combines the current element with the last aggregated value to produce a new one. The returned stream contains the result of every aggregation step, not just the final result of the last one.
public class TransformTest3_Reduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        // Group by id
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
        // reduce: keep the max temperature seen so far, together with the latest timestamp
        DataStream<SensorReading> resultStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
                return new SensorReading(value1.getId(), value2.getTimestamp(),
                        Math.max(value1.getTemperature(), value2.getTemperature()));
            }
        });
        resultStream.print();
        env.execute();
    }
}
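The same reduce can be written more compactly as a lambda (a sketch equivalent to the anonymous class above; value1 is the accumulated state, value2 the incoming record):

```java
DataStream<SensorReading> resultStream = keyedStream.reduce((curState, newData) ->
        new SensorReading(curState.getId(), newData.getTimestamp(),
                Math.max(curState.getTemperature(), newData.getTemperature())));
```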
Splitting streams (split, select) and merging streams (connect, CoMap, union)

Split (DataStream → SplitStream): stamps the elements of a stream with one or more tags, so that tagged sub-streams can later be pulled out with select().

CoMap, CoFlatMap (ConnectedStreams → DataStream): apply a separate map/flatMap function to each of the two streams inside a ConnectedStreams, producing a single result stream.
Union (DataStream → DataStream): merges two or more streams of the same element type:

DataStream<SensorReading> unionStream = xxxstream.union(xxx);

==Differences between connect and union:==

- Before a union, the streams must have the same element type; connect allows different types, which can then be unified in the subsequent coMap.
- connect can only operate on two streams; union can operate on many.
public class TransformTest4_MultipleStreams {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });

        // 1. Split the stream at the 30-degree threshold
        SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });
        // Select the sub-streams by tag
        DataStream<SensorReading> highTempStream = splitStream.select("high");
        DataStream<SensorReading> lowTempStream = splitStream.select("low");
        DataStream<SensorReading> allTempStream = splitStream.select("high", "low");
        highTempStream.print("high");
        lowTempStream.print("low");
        allTempStream.print("all");

        // 2. connect: map the high-temperature stream to tuples first, then merge it
        //    with the low-temperature stream and emit status information
        DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), value.getTemperature());
            }
        });
        // connect can only merge two streams, but their element types may differ
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectStream = warningStream.connect(lowTempStream);
        DataStream<Object> resultStream = connectStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "high temp warning");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "normal");
            }
        });
        resultStream.print();

        // 3. union: merge any number of streams, all of the same element type
        DataStream<SensorReading> union = highTempStream.union(lowTempStream, allTempStream);
        union.print("union stream");
        env.execute();
    }
}
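A note not in the original article: split/select was deprecated in later Flink releases and eventually removed; the recommended replacement is side outputs via a ProcessFunction. A sketch of the same high/low split (the tag name is illustrative):

```java
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Anonymous subclass so Flink can capture the element type of the tag
final OutputTag<SensorReading> lowTag = new OutputTag<SensorReading>("low") {};

SingleOutputStreamOperator<SensorReading> highStream = dataStream
        .process(new ProcessFunction<SensorReading, SensorReading>() {
            @Override
            public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) {
                if (value.getTemperature() > 30) {
                    out.collect(value);        // main output: high temperatures
                } else {
                    ctx.output(lowTag, value); // side output: low temperatures
                }
            }
        });

DataStream<SensorReading> lowStream = highStream.getSideOutput(lowTag);
```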
4. Outputting Data (Sink)

Flink officially provides sinks for a number of frameworks, and you can also implement your own. The core call for writing a job's output to an external system:

stream.addSink(new MySink(xxxx));

Kafka

Add the Kafka connector dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
public class SinkTest1_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");
        // Convert to SensorReading, then back to a String for the Kafka producer
        DataStream<String> dataStream = inputStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])).toString();
            }
        });
        // Write to the external system
        dataStream.addSink(new FlinkKafkaProducer011<String>("localhost:9092", "sinktest", new SimpleStringSchema()));
        env.execute();
    }
}
Redis

Add the Redis connector dependency:
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
public class SinkTest2_Redis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");
        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        // Jedis connection configuration
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();
        dataStream.addSink(new RedisSink<>(config, new MyRedisMapper()));
        env.execute();
    }

    // Custom RedisMapper
    public static class MyRedisMapper implements RedisMapper<SensorReading> {
        // Define the Redis command used to save data: store readings in a hash via HSET
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
        }

        @Override
        public String getKeyFromData(SensorReading data) {
            return data.getId();
        }

        @Override
        public String getValueFromData(SensorReading data) {
            return data.getTemperature().toString();
        }
    }
}
Elasticsearch

Add the Elasticsearch connector dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
public class SinkTest3_ES {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");
        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        // Define the Elasticsearch connection configuration
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost", 9200));
        dataStream.addSink(new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());
        env.execute();
    }

    // Custom Elasticsearch write logic
    public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
        @Override
        public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
            // Build the document source to write
            HashMap<String, String> dataSource = new HashMap<>();
            dataSource.put("id", element.getId());
            dataSource.put("temp", element.getTemperature().toString());
            dataSource.put("ts", element.getTimestamp().toString());
            // Create an index request as the write command sent to ES
            IndexRequest indexRequest = Requests.indexRequest()
                    .index("sensor")
                    .type("readingdata")
                    .source(dataSource);
            // Send the request via the indexer
            indexer.add(indexRequest);
        }
    }
}
Custom sink (JDBC)

Add the MySQL driver dependency:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
public class SinkTest4_JDBC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        dataStream.addSink(new MyJDBCSink());
        env.execute();
    }

    // Custom SinkFunction
    public static class MyJDBCSink extends RichSinkFunction<SensorReading> {
        // Connection and prepared statements
        Connection connection = null;
        PreparedStatement insert = null;
        PreparedStatement update = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");
            insert = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
            update = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
        }

        // Called once per record: run the SQL over the open connection
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            // Try the update first
            update.setDouble(1, value.getTemperature());
            update.setString(2, value.getId());
            update.execute();
            // If no row was updated, the id is new: insert it
            if (update.getUpdateCount() == 0) {
                insert.setString(1, value.getId());
                insert.setDouble(2, value.getTemperature());
                insert.execute();
            }
        }

        // Close the statements, then the connection
        @Override
        public void close() throws Exception {
            insert.close();
            update.close();
            connection.close();
        }
    }
}
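If sensor_temp has a primary key on id, the update-then-insert dance above can be folded into a single MySQL upsert statement. A sketch (this assumes the primary-key constraint, which the original article does not state):

```java
// Assumes: create table sensor_temp (id varchar(32) primary key, temp double);
PreparedStatement upsert = connection.prepareStatement(
        "insert into sensor_temp (id, temp) values (?, ?) " +
        "on duplicate key update temp = values(temp)");

// Then, in invoke():
upsert.setString(1, value.getId());
upsert.setDouble(2, value.getTemperature());
upsert.execute();
```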
5. Data Types, UDF Functions, and Rich Functions

Data types supported by Flink:

- Flink supports all Java and Scala basic data types: Int, Double, Long, String, and so on.

DataStream<Integer> numberStream = env.fromElements(1, 2, 3, 4);

- Java and Scala tuples (Tuples):

DataStream<Tuple2<String, Integer>> personStream = env.fromElements(
        new Tuple2<>("Adam", 17),
        new Tuple2<>("Sarah", 23));
personStream.filter(p -> p.f1 > 18);

- Flink also supports some special-purpose Java and Scala types, such as Java's ArrayList, HashMap, Enum, and so on, as well as POJOs (see the sketch below).
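The SensorReading class used throughout this article is itself handled by Flink as a POJO type, because it has a public no-argument constructor plus public getters and setters. A short sketch of what that buys you (not from the original article):

```java
DataStream<SensorReading> sensorStream = env.fromElements(
        new SensorReading("sensor_1", 1547718199L, 35.8),
        new SensorReading("sensor_2", 1547718201L, 15.4));

// Field-expression keys like "id" work precisely because SensorReading is a valid POJO
sensorStream.keyBy("id").maxBy("temperature").print();
```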
Every regular Flink function interface (MapFunction, FilterFunction, and so on) has a "rich" variant (RichMapFunction, RichFilterFunction, ...) that can additionally access the runtime context.

==Rich functions have a notion of lifecycle. The typical lifecycle methods are:==

- open() is the initialization method of a rich function; it is called before an operator such as map or filter starts processing.
- close() is the last lifecycle method to be called and is used for cleanup work.
- getRuntimeContext() provides information from the function's RuntimeContext, such as the parallelism the function is executed with, the task's name, and its state.
public class TransformTest5_RichFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        DataStream<Tuple2<String, Integer>> resultStream = dataStream.map(new MyMapper());
        resultStream.print();
        env.execute();
    }

    // A plain MapFunction, for comparison
    public static class MyMapper0 implements MapFunction<SensorReading, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
            return new Tuple2<>(value.getId(), value.getId().length());
        }
    }

    // The rich version: extends RichMapFunction
    public static class MyMapper extends RichMapFunction<SensorReading, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
            // getRuntimeContext().getState(...) would also be available here
            return new Tuple2<>(value.getId(), getRuntimeContext().getIndexOfThisSubtask());
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // Initialization work, typically defining state or opening a database connection
            System.out.println("open");
        }

        @Override
        public void close() throws Exception {
            // Cleanup work, e.g. closing connections or clearing state
            System.out.println("close");
        }
    }
}