Flink DataStream Operator Types and Basic Usage



  • map
  • flatMap
  • filter
  • keyBy
    • keyBy with a tuple
    • keyBy with a custom Bean
    • keyBy with multiple fields
  • reduce
  • max/min

Official examples and descriptions:
Official examples
map: Takes one element and produces one element. For example, a map function that doubles the values of the input stream.

```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<Integer> dataStreamSource = environment.fromElements(1, 9, 4, 2, 8);

    // map takes one element and returns a new element; internally it implements MapFunction
    SingleOutputStreamOperator<Integer> dataStream = dataStreamSource.map(i -> i * 2);

    // A more powerful map: RichMapFunction, with three commonly used methods
    SingleOutputStreamOperator<Integer> dataStream2 = dataStreamSource.map(new RichMapFunction<Integer, Integer>() {
        // open runs once, after the constructor and before any map call;
        // typically used to initialize a resource, e.g. open a database connection
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
        }

        // process each record, e.g. operating on database data
        @Override
        public Integer map(Integer integer) throws Exception {
            return integer * 2;
        }

        // typically used to release resources, e.g. close the database connection
        @Override
        public void close() throws Exception {
            super.close();
        }
    });

    dataStream.print();
    environment.execute("MapDemo");
}
```

Result: every value in the input stream is doubled.
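The open/map/close lifecycle described in the comments above can be sketched in plain Java (an illustration of the call order only, not the Flink API; the class and method names here are made up):

```java
import java.util.ArrayList;
import java.util.List;

public class RichMapLifecycleSketch {
    // Sketch of the RichMapFunction lifecycle: open() runs once before any
    // map() call, map() runs once per element, close() runs once at the end.
    static List<String> run(int[] inputs) {
        List<String> calls = new ArrayList<>();
        calls.add("open");                 // e.g. open a database connection
        for (int i : inputs) {
            calls.add("map:" + (i * 2));   // transform each element
        }
        calls.add("close");                // e.g. release the connection
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(run(new int[]{1, 9, 4})); // [open, map:2, map:18, map:8, close]
    }
}
```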

flatMap: Takes one element and produces zero, one, or more elements. For example, a flatMap function that splits sentences into words.

```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> dataSource = env.fromElements("spark flink hadoop", "spark hello flink flink");

    // Option 1: anonymous inner class implementing FlatMapFunction
    SingleOutputStreamOperator<String> dataStream = dataSource.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {
            // Arrays.stream(line.split(" ")).forEach(out::collect);
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(word);
            }
        }
    });

    // Option 2: lambda expression (returns() is required because type erasure hides the output type)
    SingleOutputStreamOperator<String> dataStream2 = dataSource.flatMap(
            (String line, Collector<String> out) -> Arrays.stream(line.split(" ")).forEach(out::collect))
            .returns(Types.STRING);

    dataStream.print();
    env.execute("FlatMapDemo");
}
```

Result: each sentence is split into individual words.

filter: Evaluates a boolean function for each element and keeps those for which the function returns true. For example, a filter that filters out even values.

```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<Integer> dataSource = environment.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);

    // Option 1: anonymous inner class implementing FilterFunction
    SingleOutputStreamOperator<Integer> dataStream = dataSource.filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
            return value % 2 != 0;
        }
    });

    // Option 2: lambda expression
    SingleOutputStreamOperator<Integer> dataStream2 = dataSource.filter(i -> i % 2 != 0);

    dataStream2.print();
    environment.execute("FilterDemo");
}
```

Result: the even numbers are filtered out.

keyBy: Logically partitions the stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning.
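Conceptually, hash partitioning routes a record by hashing its key, so equal keys always land in the same partition. The following is a simplified plain-Java sketch of the idea (Flink's real implementation hashes keys into key groups, not a plain modulo, and the names below are made up for illustration):

```java
public class HashPartitionSketch {
    // Route a key to one of numPartitions partitions. Equal keys hash
    // identically, so they always map to the same partition index.
    static int partitionFor(Object key, int numPartitions) {
        // mask off the sign bit so the index is always non-negative
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int p = 4;
        // same key, same partition, every time
        System.out.println(partitionFor("spark", p) == partitionFor("spark", p)); // true
    }
}
```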
keyBy with a tuple
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> dataSource = env.fromElements("spark", "hello", "spark", "flink", "hive", "flink", "hello");

    // each word becomes a (word, 1) tuple
    SingleOutputStreamOperator<Tuple2<String, Integer>> dataStream = dataSource.map(word -> Tuple2.of(word, 1))
            .returns(Types.TUPLE(Types.STRING, Types.INT));

    // Java indices start at 0, so keying on field 0 groups records by word,
    // which is what lets us count occurrences of the same word.
    // Tuple2 holds two fields; for groups of three fields use Tuple3, and so on
    KeyedStream<Tuple2<String, Integer>, Tuple> dataStream2 = dataStream.keyBy(0);

    dataStream2.print();
    env.execute("KeyByDemo");
}
```

Result: each word becomes a key-value pair, grouped by key.

keyBy with a custom Bean: 1. Create an entity class WordCounts
```java
public class WordCounts {
    private String word;
    private Long counts;

    // a no-arg constructor, an all-args constructor, and getters/setters are also required
    public WordCounts() {}

    public WordCounts(String word, Long counts) {
        this.word = word;
        this.counts = counts;
    }

    // static factory used by the driver code below
    public static WordCounts of(String word, Long counts) {
        return new WordCounts(word, counts);
    }

    public String getWord() { return word; }
    public void setWord(String word) { this.word = word; }
    public Long getCounts() { return counts; }
    public void setCounts(Long counts) { this.counts = counts; }
}
```

2. The driver code:
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> dataSource = env.fromElements("spark", "hello", "spark", "flink", "hive", "flink", "hello");

    SingleOutputStreamOperator<WordCounts> dataStream = dataSource.map(new MapFunction<String, WordCounts>() {
        @Override
        public WordCounts map(String value) throws Exception {
            return WordCounts.of(value, 1L);
        }
    });

    // group by the word field; the name must match the property name in the Bean, otherwise an error is thrown
    KeyedStream<WordCounts, Tuple> keyedStream = dataStream.keyBy("word");

    // aggregate (accumulate) on the counts field
    SingleOutputStreamOperator<WordCounts> counts = keyedStream.sum("counts");

    counts.print();
    env.execute("KeyByDemo2");
}
```

keyBy with multiple fields
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> dataSource = env.fromElements("浙江,杭州,3000", "浙江,杭州,666", "山东,济南,555");

    SingleOutputStreamOperator<Tuple3<String, String, Double>> mapStream =
            dataSource.map(new MapFunction<String, Tuple3<String, String, Double>>() {
        @Override
        public Tuple3<String, String, Double> map(String line) throws Exception {
            String[] fields = line.split(",");
            String province = fields[0];
            String city = fields[1];
            Double money = Double.parseDouble(fields[2]);
            return Tuple3.of(province, city, money);
        }
    });

    // key by the composite (province, city) and sum the third field
    SingleOutputStreamOperator<Tuple3<String, String, Double>> sumStream = mapStream.keyBy(0, 1).sum(2);

    sumStream.print();
    env.execute("KeyByDemo3");
}
```

Result: amounts with the same (province, city) key are summed.
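What `keyBy(0, 1).sum(2)` computes can be checked with a plain-Java sketch using a composite (province, city) key (an illustration of the grouping semantics only, not the Flink API; the class name is made up):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MultiFieldKeySketch {
    // Emulate keyBy(0, 1).sum(2): group by a composite (province, city) key
    // and accumulate the third field per key.
    static Map<List<String>, Double> sumByProvinceCity(String[] lines) {
        Map<List<String>, Double> sums = new LinkedHashMap<>();
        for (String line : lines) {
            String[] f = line.split(",");
            sums.merge(List.of(f[0], f[1]), Double.parseDouble(f[2]), Double::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        String[] lines = {"浙江,杭州,3000", "浙江,杭州,666", "山东,济南,555"};
        System.out.println(sumByProvinceCity(lines)); // {[浙江, 杭州]=3666.0, [山东, 济南]=555.0}
    }
}
```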

reduce: A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> dataSource = env.fromElements("spark", "hello", "spark", "spark", "hive", "flink", "hello");

    // each word becomes a (word, 1) tuple
    SingleOutputStreamOperator<Tuple2<String, Integer>> dataStream = dataSource.map(word -> Tuple2.of(word, 1))
            .returns(Types.TUPLE(Types.STRING, Types.INT));

    // key by field 0 (the word), so counts accumulate per word
    KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = dataStream.keyBy(0);

    SingleOutputStreamOperator<Tuple2<String, Integer>> reduce =
            keyedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
            // f0 is the first field (the word), f1 is the second field (the count)
            String key = value1.f0;
            Integer counts1 = value1.f1;
            Integer counts2 = value2.f1;
            // merge the two counts
            return Tuple2.of(key, counts1 + counts2);
        }
    });

    reduce.print();
    env.execute("ReduceDemo");
}
```

Result: for each key, every incoming element emits the updated running count.
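The rolling behavior can be illustrated in plain Java (an illustration of the semantics only, not the Flink API; the class and method names are made up): each new element for a key is merged with the last reduced value, and every intermediate result is emitted downstream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingReduceSketch {
    // Emulate a keyed rolling reduce over a word stream and return every
    // intermediate count emitted for the given key.
    static List<Integer> rollingCounts(List<String> words, String key) {
        Map<String, Integer> state = new HashMap<>();  // last reduced value per key
        List<Integer> emitted = new ArrayList<>();
        for (String w : words) {
            // merge the current element (count 1) with the last reduced value
            int updated = state.merge(w, 1, Integer::sum);
            if (w.equals(key)) {
                emitted.add(updated);  // each update is emitted, not just the final one
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("spark", "hello", "spark", "spark", "hive", "flink", "hello");
        System.out.println(rollingCounts(input, "spark")); // [1, 2, 3]
    }
}
```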

max/min: Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in that field.

```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> dataSource = env.fromElements("a,1", "a,2", "b,10", "b,3", "b,6");

    SingleOutputStreamOperator<Tuple2<String, Integer>> map =
            dataSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String line) throws Exception {
            String[] s = line.split(",");
            String word = s[0];
            int nums = Integer.parseInt(s[1]);
            return Tuple2.of(word, nums);
        }
    });

    SingleOutputStreamOperator<Tuple2<String, Integer>> min = map.keyBy(0).min(1);

    min.print();
    env.execute("MaxMinDemo");
}
```

Result: on every output, the incoming element's second field is compared with the current minimum for that key, and the smaller value is emitted. max works the same way, keeping the running maximum.
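The min vs. minBy distinction only shows up when the records carry fields besides the key and the aggregated value. A plain-Java sketch of the commonly described behavior (not the Flink API; the record and method names are hypothetical): min updates only the aggregated field and keeps the other fields from the earlier element, while minBy replaces the whole element.

```java
import java.util.List;

public class MinVsMinBySketch {
    // A record with a key, an extra tag field, and the aggregated value
    record Event(String key, String tag, int value) {}

    // min-style rolling aggregate: only the value field is updated;
    // the other non-key fields stay from the element seen earlier
    static Event rollingMin(List<Event> events) {
        Event acc = events.get(0);
        for (Event e : events.subList(1, events.size())) {
            if (e.value() < acc.value()) {
                acc = new Event(acc.key(), acc.tag(), e.value()); // keep old tag, take new min value
            }
        }
        return acc;
    }

    // minBy-style rolling aggregate: the whole element with the minimum value wins
    static Event rollingMinBy(List<Event> events) {
        Event acc = events.get(0);
        for (Event e : events.subList(1, events.size())) {
            if (e.value() < acc.value()) {
                acc = e;
            }
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(new Event("b", "first", 10), new Event("b", "second", 3));
        System.out.println(rollingMin(events));   // Event[key=b, tag=first, value=3]
        System.out.println(rollingMinBy(events)); // Event[key=b, tag=second, value=3]
    }
}
```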
