This article walks through installing Apache Flink on macOS and testing it with a WordCount job.
1. Install Java 1.8
steven@wangyuxiangdeMacBook-Pro ~ java -version
java version "1.8.0_211"
Java(TM) SE Runtime Environment (build 1.8.0_211-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode)
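Flink 1.13 runs on Java 8 or 11. If you want to confirm the version the JVM itself reports rather than trust the shell's `PATH`, a one-line check works (the class name here is illustrative):

```java
public class JavaVersionCheck {
    public static void main(String[] args) {
        // Prints the same version string that `java -version` shows, e.g. 1.8.0_211
        System.out.println(System.getProperty("java.version"));
    }
}
```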
2. Install Flink
Install Flink with Homebrew:
brew install apache-flink
3. Verify the installation
steven@wangyuxiangdeMacBook-Pro ~ flink -v
Version: 1.13.2, Commit ID: 5f007ff
4. Locate the Flink install directory
steven@wangyuxiangdeMacBook-Pro ~ brew info apache-flink
apache-flink: stable 1.13.2 (bottled), HEAD
Scalable batch and stream data processing
https://flink.apache.org/
/usr/local/Cellar/apache-flink/1.13.2 (164 files, 325.3MB) *
Poured from bottle on 2022-05-13 at 15:52:56
From: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/apache-flink.rb
License: Apache-2.0
==> Dependencies
Required: openjdk@11 ✔
==> Options
--HEAD
Install HEAD version
==> Analytics
install: 449 (30 days), 1,388 (90 days), 6,005 (365 days)
install-on-request: 451 (30 days), 1,392 (90 days), 5,997 (365 days)
build-error: 0 (30 days)
5. Go to the Flink install directory and start the cluster
cd /usr/local/Cellar/apache-flink/1.13.2/
./libexec/bin/start-cluster.sh
steven@wangyuxiangdeMacBook-Pro /usr/local/Cellar/apache-flink/1.13.2 ./libexec/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host wangyuxiangdeMacBook-Pro.local.
Starting taskexecutor daemon on host wangyuxiangdeMacBook-Pro.local.
6. Open the web UI at http://localhost:8081/ to confirm the cluster started successfully.
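The web UI on port 8081 is backed by Flink's monitoring REST API; for example, GET http://localhost:8081/overview returns a JSON summary of the cluster. As a minimal sketch of reading such a response from plain Java, the payload below is illustrative (not captured from a real cluster), and `extractInt` is a naive hypothetical helper, not a Flink API:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OverviewCheck {
    public static void main(String[] args) {
        // Illustrative sample of the JSON shape GET /overview returns
        String sample = "{\"taskmanagers\":1,\"slots-total\":1,\"slots-available\":1,"
                + "\"jobs-running\":0,\"flink-version\":\"1.13.2\"}";
        System.out.println("taskmanagers = " + extractInt(sample, "taskmanagers"));
    }

    // Naive extraction of an integer field from a flat JSON object;
    // a real client would use a JSON library instead
    static int extractInt(String json, String field) {
        Matcher m = Pattern.compile("\"" + field + "\":(\\d+)").matcher(json);
        if (!m.find()) throw new IllegalStateException("field not found: " + field);
        return Integer.parseInt(m.group(1));
    }
}
```

With a live cluster you would fetch the JSON over HTTP from localhost:8081 instead of using a hardcoded sample.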
7. Stop the cluster
cd /usr/local/Cellar/apache-flink/1.13.2/
./libexec/bin/stop-cluster.sh
8. Write a real-time streaming Flink job in Java; the code is as follows:
package com.dangbei.flink_test.wordcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Test_WordCount {
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Host and port of the machine to read the stream from
        String host = "localhost";
        int port = 9000;
        // Read the real-time stream from the socket
        DataStream<String> inputLineDataStream = env.socketTextStream(host, port);
        // Split each line on whitespace and turn every word into a (word, 1) tuple
        DataStream<Tuple2<String, Integer>> resultStream = inputLineDataStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // Split on whitespace
                        String[] wordArray = line.split("\\s");
                        // Emit every word as a (word, 1) tuple
                        for (String word : wordArray) {
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                })
                // The stream is unbounded (the data is never complete), so key the
                // (word, 1) tuples by the word at position 0 with keyBy rather
                // than a batch-style group by
                .keyBy(0)
                // Sum the counts at position 1
                .sum(1);
        // Print the running (word, freq) pairs
        resultStream.print();
        // Start the streaming job
        env.execute();
    }
}
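The keyBy(0).sum(1) pair above maintains a running count per word. The same (word, 1) → sum semantics can be sketched with the plain JDK on a bounded input (class name and sample lines here are illustrative, not part of the Flink job):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {
    public static void main(String[] args) {
        // Bounded stand-in for the socket stream
        String[] lines = {"hello flink", "hello world"};
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            // Split on whitespace, exactly as the flatMap does
            for (String word : line.split("\\s")) {
                // Equivalent of keyBy(word) + sum(1): merge into a per-word total
                counts.merge(word, 1, Integer::sum);
            }
        }
        System.out.println(counts); // → {hello=2, flink=1, world=1}
    }
}
```

To exercise the real job, open a socket source first with `nc -lk 9000`, run Test_WordCount (e.g. from the IDE), and type lines into the netcat terminal; the running counts are printed by resultStream.print().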
8.1 The pom.xml is configured as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.dangbei</groupId>
    <artifactId>flink_test</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>
    <name>Flink Quickstart Job</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.2</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <hadoop.version>3.0.0</hadoop.version>
        <flink.shaded.version>9.0</flink.shaded.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>