Apache|Apache Flink 零基础入门(二)(开发环境搭建和应用的配置、部署及运行)
作者:成阳
前言
【Apache|Apache Flink 零基础入门(二)(开发环境搭建和应用的配置、部署及运行)】本文主要面向于初次接触 Flink、或者对 Flink 有了解但是没有实际操作过的同学。希望帮助大家更顺利地上手使用 Flink,并着手相关开发调试工作。
课程内容包括:
- Flink 开发环境的部署和配置
- 运行 Flink 应用(包括:单机 Standalone 模式、多机 Standalone 模式和 Yarn 集群模式)
关于开发测试环境,Mac OS、Linux 系统或者 Windows 都可以。如果使用的是 Windows 10 系统,建议使用 Windows 10 系统的 Linux 子系统来编译和运行。
工具 | 注释 |
---|---|
Java | Java 版本至少是Java 8,且最好选用 Java 8u51 及以上版本 |
Maven | 必须使用 Maven 3,建议使用 Maven 3.2.5。Maven 3.3.x 能够编译成功,但是在 Shade 一些 Dependencies 的过程中有些问题 |
Git | Flink 的代码仓库是: https://github.com/apache/flink |
1. 编译 Flink 代码
在我们配置好之前的几个工具后,编译 Flink 就非常简单了,执行如下命令即可:
mvn clean install -DskipTests
# 或者
mvn clean package -DskipTests
常用编译参数:
-Dfast主要是忽略QA plugins和JavaDocs的编译
-Dhadoop.version=2.6.1指定hadoop版本
--settings=${maven_file_path}显式指定maven settings.xml配置文件
当成功编译完成后,能在当前 Flink 代码目录下的 flink-dist/target/子目录 中看到如下文件(不同的 Flink 代码分支编译出的版本号不同,这里的版本号是 Flink 1.5.1):
文章图片
1.jpg 其中有三个文件可以留意一下:
版本 | 注释 |
---|---|
flink-1.5.1.tar.gz | Binary 的压缩包 |
flink-1.5.1-bin/flink-1.5.1 | 解压后的 Flink binary 目录 |
flink-dist_2.11-1.5.1.jar | 包含 Flink 核心功能的 jar 包 |
注意:2. 开发环境准备
国内用户在编译时可能遇到编译失败“Build Failure”(且有 MapR 相关报错),一般都和 MapR 相关依赖的下载失败有关,即使使用了推荐的 settings.xml 配置(其中 Aliyun Maven 源专门为 MapR 相关依赖做了代理),还是可能出现下载失败的情况。问题主要和 MapR 的 Jar 包比较大有关。遇到这些问题时,重试即可。在重试之前,要先根据失败信息删除 Maven local repository 中对应的目录,否则需要等待 Maven 下载的超时时间才能再次出发下载依赖到本地。
推荐使用 IntelliJ IDEA IDE 作为 Flink 的 IDE 工具。官方不建议使用 Eclipse IDE,主要原因是 Eclipse 的 Scala IDE 和 Flink 用 Scala 的不兼容。
如果你需要做一些 Flink 代码的开发工作,则需要根据 Flink 代码的 tools/maven/目录 下的配置文件来配置 Checkstyle ,因为 Flink 在编译时会强制代码风格的检查,如果代码风格不符合规范,可能会直接编译失败。
二、运行 Flink 应用 1. 基本概念
运行 Flink 应用其实非常简单,但是在运行 Flink 应用之前,还是有必要了解 Flink 运行时的各个组件,因为这涉及到 Flink 应用的配置问题。图 1 所示,这是用户用 DataStream API 写的一个数据处理程序。可以看到,在一个 DAG 图中不能被 Chain 在一起的 Operator 会被分隔到不同的 Task 中,也就是说 Task 是 Flink 中资源调度的最小单位。
文章图片
2.jpg 图 1 Parallel Dataflows
图 2 所示,Flink 实际运行时包括两类进程:
- JobManager(又称为 JobMaster):协调 Task 的分布式执行,包括调度 Task、协调创 Checkpoint 以及当 Job failover 时协调各个 Task 从 Checkpoint 恢复等。
- TaskManager(又称为 Worker):执行 Dataflow 中的 Tasks,包括内存 Buffer 的分配、Data Stream 的传递等。
文章图片
3.jpg 图 2 Flink Runtime 架构图
图 3 所示,Task Slot 是一个 TaskManager 中的最小资源分配单位,一个 TaskManager 中有多少个 Task Slot 就意味着能支持多少并发的 Task 处理。需要注意的是,一个 Task Slot 中可以执行多个 Operator,一般这些 Operator 是能被 Chain 在一起处理的。
文章图片
4.jpg 图 3 Process
2. 运行环境准备
- 准备 Flink binary
○ 直接从 Flink 官网上下载 Flink binary 的压缩包
○ 或者从 Flink 源码编译而来 - 安装 Java,并配置 JAVA_HOME 环境变量
(1)基本的启动流程 最简单的运行 Flink 应用的方法就是以单机 Standalone 的方式运行。
启动集群:
./bin/start-cluster.sh
打开 http://127.0.0.1:8081/ 就能看到 Flink 的 Web 界面。尝试提交 Word Count 任务:
./bin/flink run examples/streaming/WordCount.jar
大家可以自行探索 Web 界面中展示的信息,比如,我们可以看看 TaskManager 的 stdout 日志,就可以看到 Word Count 的计算结果。
我们还可以尝试通过“--input”参数指定我们自己的本地文件作为输入,然后执行:
./bin/flink run examples/streaming/WordCount.jar --input ${your_source_file}
停止集群:
./bin/stop-cluster.sh
(2)常用配置介绍
- conf / slaves
也可以直接通过“ ./bin/taskmanager.sh start ”这个命令来追加一个新的 TaskManager:
./bin/taskmanager.sh start|start-foreground|stop|stop-all
- conf/flink-conf.yaml
# The heap size for the JobManager JVM
jobmanager.heap.mb: 1024# The heap size for the TaskManager JVM
taskmanager.heap.mb: 1024# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 4# the managed memory size for each task manager.
taskmanager.managed.memory.size: 256
Standalone 集群启动后,我们可以尝试分析一下 Flink 相关进程的运行情况。执行 jps 命令,可以看到 Flink 相关的进程主要有两个,一个是 JobManager 进程,另一个是 TaskManager 进程。我们可以进一步用 ps 命令看看进程的启动参数中“-Xmx”和“-Xms”的配置。然后我们可以尝试修改 flink-conf.yaml 中若干配置,然后重启 Standalone 集群看看发生了什么变化。
需要补充的是,在 Blink 开源分支上,TaskManager 的内存计算上相对于现在的社区版本要更精细化,TaskManager 进程的堆内存限制(-Xmx)一般的计算方法是:
TotalHeapMemory = taskmanager.heap.mb + taskmanager.managed.memory.size + taskmanager.process.heap.memory.mb(默认值为128MB)
而最新的 Flink 社区版本 Release-1.7 中 JobManager 和 TaskManager 默认内存配置方式为:
# The heap size for the JobManager JVM
jobmanager.heap.size: 1024m# The heap size for the TaskManager JVM
taskmanager.heap.size: 1024m
Flink 社区 Release-1.7 版本中的“taskmanager.heap.size”配置实际上指的不是 Java heap 的内存限制,而是 TaskManager 进程总的内存限制。我们可以同样用上述方法查看 Release-1.7 版本的 Flink binary 启动的 TaskManager 进程的 -Xmx 配置,会发现实际进程上的 -Xmx 要小于配置的“taskmanager.heap.size”的值,原因在于从中扣除了 Network buffer 用的内存,因为 Network buffer 用的内存一定是 Direct memory,所以不应该算在堆内存限制中。
(3)日志的查看和配置 JobManager 和 TaskManager 的启动日志可以在 Flink binary 目录下的 Log 子目录中找到。Log 目录中以“flink-{id}-${hostname}”为前缀的文件对应的是 JobManager 的输出,其中有三个文件:
- flink-{id}-${hostname}.log:代码中的日志输出
- flink-{id}-${hostname}.out:进程执行时的stdout输出
- flink-{id}-${hostname}-gc.log:JVM的GC的日志
日志的配置文件在 Flink binary 目录的 conf 子目录下,其中:
- log4j-cli.properties:用 Flink 命令行时用的 log 配置,比如执行“ flink run”命令
- log4j-yarn-session.properties:用 yarn-session.sh 启动时命令行执行时用的 log 配置
- log4j.properties:无论是 Standalone 还是 Yarn 模式,JobManager 和 TaskManager 上用的 log 配置都是 log4j.properties
- log4j-cli.properties -> logback-console.xml
- log4j-yarn-session.properties -> logback-yarn.xml
- log4j.properties -> logback.xml
(4)进一步探索 尝试重复执行“./bin/start-cluster.sh”命令,然后看看 Web 页面(或者执行jps命令),看看会发生什么?可以尝试看看启动脚本,分析一下原因。接着可以重复执行“./bin/stop-cluster.sh”,每次执行完后,看看会发生什么。
4. 多机部署 Flink Standalone 集群
部署前要注意的要点:
- 每台机器上配置好 Java 以及 JAVA_HOME 环境变量
- 每台机器上部署的 Flink binary 的目录要保证是同一个目录
- 如果需要用 HDFS,需要配置 HADOOP_CONF_DIR 环境变量配置
修改 conf/flink-conf.yaml 配置,注意要确保和 Masters 文件中的地址一致:
jobmanager.rpc.address: z05f06378.sqa.zth.tbsite.net
确保所有机器的 Flink binary 目录中 conf 中的配置文件相同,特别是以下三个:
conf/masters
conf/slaves
conf/flink-conf.yaml
然后启动 Flink 集群:
./bin/start-cluster.sh
提交 WordCount 作业:
./bin/flink run examples/streaming/WordCount.jar
上传 WordCount 的 Input 文件:
hdfs dfs -copyFromLocal story /test_dir/input_dir/story
提交读写 HDFS 的 WordCount 作业:
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
增加 WordCount 作业的并发度(注意输出文件重名会提交失败):
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output --parallelism 20
5. Standalone 模式的 HighAvailability(HA)部署和配置
通过图 2 Flink Runtime 架构图,我们可以看到 JobManager 是整个系统中最可能导致系统不可用的角色。如果一个 TaskManager 挂了,在资源足够的情况下,只需要把相关 Task 调度到其他空闲 TaskSlot 上,然后 Job 从 Checkpoint 中恢复即可。而如果当前集群中只配置了一个 JobManager,则一旦 JobManager 挂了,就必须等待这个 JobManager 重新恢复,如果恢复时间过长,就可能导致整个 Job 失败。
因此如果在生产业务使用 Standalone 模式,则需要部署配置 HighAvailability,这样同时可以有多个 JobManager 待命,从而使得 JobManager 能够持续服务。
文章图片
5.jpg 图 4 Flink JobManager HA 示意图
注意:
- 如果想使用 Flink standalone HA 模式,需要确保基于 Flink Release-1.6.1 及以上版本,因为这里社区有个 bug 会导致这个模式下主 JobManager 不能正常工作。
- 接下来的实验中需要用到 HDFS,所以需要下载带有 Hadoop 支持的 Flink Binary 包。
# The port at which the clients will connect
clientPort=3181server.1=z05f06378.sqa.zth.tbsite.net:4888:5888
server.2=z05c19426.sqa.zth.tbsite.net:4888:5888
server.3=z05f10219.sqa.zth.tbsite.net:4888:5888
然后启动 Zookeeper:
./bin/start-zookeeper-quorum.sh
jps 命令看到 ZK 进程已经启动:
文章图片
6.jpg 停掉 Zookeeper 集群的命令:
./bin/stop-zookeeper-quorum.sh
(2)修改 Flink Standalone 集群的配置 修改 conf/masters 文件,增加一个 JobManager:
$cat conf/masters
z05f06378.sqa.zth.tbsite.net:8081
z05c19426.sqa.zth.tbsite.net:8081
之前修改过的 conf/slaves 文件保持不变:
$cat conf/slaves
z05f06378.sqa.zth.tbsite.net
z05c19426.sqa.zth.tbsite.net
z05f10219.sqa.zth.tbsite.net
修改 conf/flink-conf.yaml 文件:
# 配置high-availability mode
high-availability: zookeeper# 配置zookeeper quorum(hostname和端口需要依据对应zk的实际配置)
high-availability.zookeeper.quorum: z05f02321.sqa.zth.tbsite.net:2181,z05f10215.sqa.zth.tbsite.net:2181# (可选)设置zookeeper的root目录
high-availability.zookeeper.path.root: /test_dir/test_standalone2_root# (可选)相当于是这个standalone集群中创建的zk node的namespace
high-availability.cluster-id: /test_dir/test_standalone2# JobManager的meta信息放在dfs,在zk上主要会保存一个指向dfs路径的指针
high-availability.storageDir: hdfs:///test_dir/recovery2/
需要注意的是,在 HA 模式下 conf/flink-conf.yaml 中的这两个配置都失效了(想想为什么)。
jobmanager.rpc.address
jobmanager.rpc.port
修改完成后,确保配置同步到其他机器。
启动 Zookeeper 集群:
./bin/start-zookeeper-quorum.sh
再启动 Standalone 集群(要确保之前的 Standalone 集群已经停掉):
./bin/start-cluster.sh
分别打开两个 Master 节点上的 JobManager Web 页面:
http://z05f06378.sqa.zth.tbsite.net:8081
http://z05c19426.sqa.zth.tbsite.net:8081
可以看到两个页面最后都转到了同一个地址上,这个地址就是当前主 JobManager 所在机器,另一个就是 Standby JobManager。以上我们就完成了 Standalone 模式下 HA 的配置。
接下来我们可以测试验证 HA 的有效性。当我们知道主 JobManager 的机器后,我们可以把主 JobManager 进程 Kill 掉,比如当前主 JobManager 在 z05c19426.sqa.zth.tbsite.net 这个机器上,就把这个进程杀掉。
接着,再打开这两个链接:
http://z05f06378.sqa.zth.tbsite.net:8081
http://z05c19426.sqa.zth.tbsite.net:8081
可以发现后一个链接已经不能展示了,而前一个链接可以展示,说明发生主备切换。
然后我们再重启前一次的主 JobManager:
./bin/jobmanager.sh start z05c19426.sqa.zth.tbsite.net 8081
再打开 http://z05c19426.sqa.zth.tbsite.net:8081 这个链接,会发现现在这个链接可以转到 http://z05f06378.sqa.zth.tbsite.net:8081 这个页面上了。说明这个 JobManager 完成了一个 Failover Recovery。
6. 使用 Yarn 模式跑 Flink job
文章图片
7.jpg 图 5 Flink Yarn 部署流程图
相对于 Standalone 模式,Yarn 模式允许 Flink job 的好处有:
- 资源按需使用,提高集群的资源利用率
- 任务有优先级,根据优先级运行作业
- 基于 Yarn 调度系统,能够自动化地处理各个角色的 Failover
○ JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控
○ 如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器
○ 如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn ResourceManager 申请资源,重新启动 TaskManager
./bin/yarn-session.sh -h
创建一个 Yarn 模式的 Flink 集群:
./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m
其中用到的参数是:
- -n,--containerNumber of TaskManagers
- -jm,--jobManagerMemoryMemory for JobManager Container with optional unit (default: MB)
- -tm,--taskManagerMemoryMemory per TaskManager Container with optional unit (default: MB)
- -qu,--queueSpecify YARN queue.
- -s,--slotsNumber of slots per TaskManager
- -t,--shipShip files in the specified directory (t for transfer)
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
这次提交 Flink job,虽然没有指定对应 Yarn application 的信息,却可以提交到对应的 Flink 集群,原因在于“/tmp/.yarn-properties-${user}”文件中保存了上一次创建 Yarn session 的集群信息。所以如果同一用户在同一机器上再次创建一个 Yarn session,则这个文件会被覆盖掉。
- 如果删掉“/tmp/.yarn-properties-${user}”或者在另一个机器上提交作业能否提交到预期到yarn session中呢?
可以配置了“high-availability.cluster-id”参数,据此从 Zookeeper 上获取到 JobManager 的地址和端口,从而提交作业。 - 如果 Yarn session 没有配置 HA,又该如何提交呢?
/bin/flink run -yid application_1548056325049_0048 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
我们可以发现,每次跑完任务不久,TaskManager 就被释放了,下次在提交任务的时候,TaskManager 又会重新拉起来。如果希望延长空闲 TaskManager 的超时时间,可以在 conf/flink-conf.yaml 文件中配置下面这个参数,单位是 milliseconds:
slotmanager.taskmanager-timeout: 30000L# deprecated, used in release-1.5
resourcemanager.taskmanager-timeout: 30000L
(2)在 Yarn 上运行单个 Flink job(Job Cluster 模式) 如果你只想运行单个 Flink Job 后就退出,那么可以用下面这个命令:
./bin/flink run -m yarn-cluster -yn 2 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
常用的配置有:
- -yn,--yarncontainerNumber of Task Managers
- -yqu,--yarnqueueSpecify YARN queue.
- -ys,--yarnslotsNumber of slots per TaskManager
- -yqu,--yarnqueueSpecify YARN queue.
./bin/flink run -h
我们可以看到,“./bin/flink run -h”看到的“Options for yarn-cluster mode”中的“-y”和“--yarn”为前缀的参数其实和“./bin/yarn-session.sh -h”命令是一一对应的,语义上也基本一致。
关于“-n”(在yarn session模式下)、“-yn”在(yarn single job模式下)与“-p”参数的关系:
- “-n”和“-yn”在社区版本中(Release-1.5 ~ Release-1.7)中没有实际的控制作用,实际的资源是根据“-p”参数来申请的,并且 TM 使用完后就会归还
- 在 Blink 的开源版本中,“-n”(在 Yarn Session 模式下)的作用就是一开始启动指定数量的 TaskManager,之后即使 Job 需要更多的 Slot,也不会申请新的 TaskManager
- 在 Blink 的开源版本中,Yarn single job 模式“-yn”表示的是初始 TaskManager 的数量,不设置 TaskManager 的上限。(需要特别注意的是,只有加上“-yd”参数才能用 Single job 模式(例如:命令“./bin/flink run -yd -m yarn-cluster xxx”)
首先要确保启动 Yarn 集群用的“yarn-site.xml”文件中的这个配置,这个是 Yarn 集群级别 AM 重启的上限。
yarn.resourcemanager.am.max-attempts
100
然后在 conf/flink-conf.yaml 文件中配置这个 Flink job 的 JobManager 能够重启的次数。
yarn.application-attempts: 10# 1+ 9 retries
最后再在 conf/flink-conf.yaml 文件中配置上 ZK 相关配置,这几个配置的配置方法和 Standalone 的 HA 配置方法基本一致,如下所示。
# 配置high-availability mode
high-availability: zookeeper# 配置zookeeper quorum(hostname和端口需要依据对应zk的实际配置)
high-availability.zookeeper.quorum: z05f02321.sqa.zth.tbsite.net:2181,z05f10215.sqa.zth.tbsite.net:2181# (可选)设置zookeeper的root目录
high-availability.zookeeper.path.root: /test_dir/test_standalone2_root# 删除这个配置
# high-availability.cluster-id: /test_dir/test_standalone2# JobManager的meta信息放在dfs,在zk上主要会保存一个指向dfs路径的指针
high-availability.storageDir: hdfs:///test_dir/recovery2/
需要特别注意的是:“high-availability.cluster-id”这个配置最好去掉,因为在 Yarn(以及Mesos)模式下,cluster-id 如果不配置的话,会配置成 Yarn 上的 Application ID ,从而可以保证唯一性。
推荐阅读
- 日志打卡
- Apache多路复用模块(MPMs)介绍
- 八零后也已经老了
- 织网布局,社群营销走进山东玖零落地企业
- 零基础学习Python作业本(13)
- 浅析(成人情趣用品智能无人自动售货机是新零售的下一个风口吗())
- 依赖注入模块
- 零言碎语
- 如果有一百个理由放弃,请给自己一百零一个理由坚持
- 零长度数组与柔性数组