文章目录
- 原理介绍
- Flink on Yarn模式
-
- Session-Cluster模式
- Per-Job-Cluster模式
Flink on Yarn 模式的原理是依靠 YARN 来调度 Flink 任务,目前在企业中使用较多。这种模式的好处是可以充分利用集群资源,提高集群机器的利用率,并且只需要 1 套 Hadoop集群,就可以执行 MapReduce 、Spark和Flink 任务,操作非常方便,运维方面也很轻松。Flink on Yarn 模式需要依赖 Hadoop 集群,并且Hadoop 的版本需要是 2.2 及以上。
原理介绍
文章图片
1)当启动一个新的 Flink YARN Client 会话时,客户端首先会检查所请求的资源(容器和内存)是否可用。之后,它会上传 Flink 配置和 JAR 文件到 HDFS。
2)客 户 端 请 求 一个 YARN 容 器 启动 ApplicationMaster 。 JobManager 和ApplicationMaster(AM)运行在同一个容器中,一旦它们成功地启动了,AM 就能够知道JobManager 的地址,它会为 TaskManager 生成一个新的 Flink 配置文件(这样它才能连上 JobManager),该文件也同样会被上传到 HDFS。另外,AM 容器还提供了 Flink 的Web 界面服务。Flink 用来提供服务的端口是由用户和应用程序 ID 作为偏移配置的,这使得用户能够并行执行多个 YARN 会话。
3)之后,AM 开始为 Flink 的 TaskManager 分配容器(Container),从 HDFS 下载 JAR 文件和修改过的配置文件。一旦这些步骤完成了,Flink 就安装完成并准备接受任务了
【Flink|Flink on Yarn 部署Session-Cluster和Per-Job-Cluster】Flink n on n Yarn 模式在使用的时候又可以分为两Session-Cluster和Per-Job-Cluster
Session-Cluster
这种模式是在 YARN 中提前初始化一个 Flink 集群(称为 Flinkyarn-session),开辟指定的资源,以后的 Flink 任务都提交到这里。这个 Flink 集群会常驻在 YARN 集群中,除非手工停止。这种方式创建的 Flink 集群会独占资源,不管有没有 Flink 任务在执行,YARN 上面的其他任务都无法使用这些资源
文章图片
Per-Job-Cluster
这种模式,每次提交 Flink 任务都会创建一个新的 Flink 集群,每个 Flink 任务之间相互独立、互不影响,管理方便。任务执行完成之后创建的 Flink集群也会消失,不会额外占用资源,按需使用,这使资源利用率达到最大,在工作中推荐使用这种模式。
Flink on Yarn模式 link on Yarn 模式需要如下条件:
1)安装配置好 hadoop集群 ,参见:https://blog.csdn.net/zhangxm_qz/article/details/106695347
2)配置好 HADOOP_HOME 环境变量
3)下载 Flink 提交到 Hadoop 的连接器(jar 包),并把 jar 拷贝到 Flink 的 lib 目录下
我这里hadoop版本是2.6.2 jar包下载地址:https://download.csdn.net/download/zhangxm_qz/12737715
Session-Cluster模式 启动hadoop集群
[root@server01 hadoop]# sbin/start-all.sh
通过flink命令启动flink集群
[root@server01 flink]# bin/yarn-session.sh -n 3 -s 3 -nm bjsxt1
yarn-session.sh 参数说明如下:
-n,--container 表示分配容器的数量(也就是 TaskManager 的数量)。
-D 动态属性。
-d,--detached 在后台独立运行。
-jm,--jobManagerMemory :设置 JobManager 的内存,单位是 MB。
-nm,--name:在 YARN 上为一个自定义的应用设置一个名字。
-q,--query:显示 YARN 中可用的资源(内存、cpu 核数)。
-qu,--queue :指定 YARN 队列。
-s,--slots :每个 TaskManager 使用的 Slot 数量。
-tm,--taskManagerMemory :每个 TaskManager 的内存,单位是 MB。
-z,--zookeeperNamespace :针对 HA 模式在 ZooKeeper 上创建 NameSpace。
-id,--applicationId :指定 YARN 集群上的任务 ID,附着到一个后台独立运行的 yarn session 中
问题
我这里启动过程中一直报一个错误:
Container [] is running beyond virtual memory limits. Current Usage: 200MB of 1GB physical memory used; 2.6GB of 2.1GB virtual memory used. Killing container.
提示虚拟内存使用超出范围
网上查找资料 找到原因:
yarn分配的物理内存为1GB,默认的虚拟内存与物理内存的比例为2.1。因此虚拟内存不够,增大虚拟内存与物理内存的比例即可。
修改hadoop 中yarn-site.xml 配置文件 ,增加如下内容,并重启hadoop集群和flink(我这里重启了好几次都不行,但是后来可以了,可以通过http://server01:8088/conf 查看 修改的配置是否生效)
yarn.nodemanager.vmem-pmem-ratio
5
访问FlinkwebUI
执行上述命令启动Flink集群,后日志中会显示 webUI地址,通过该地址可访问 flink webUI
JobManager Web Interface: http://server03:36941
文章图片
yarn界面中可以看到我们提交的application
文章图片
提交任务
flink成功启动后,可以看到本地文件系统中有一个临时文件。有了这个文件可以提交 job 到 Yarn
[root@server01 hadoop]# cat/tmp/.yarn-properties-root
#Generated YARN properties file
#Fri Aug 21 02:39:00 EDT 2020
parallelism=9
dynamicPropertiesString=
applicationID=application_1597991682418_0002
命令提交任务
[root@server01 flink]# bin/flink run -d -c com.test.flink.wc.StreamWordCount ./appjars/test-1.0-SNAPSHOT.jar
2020-08-21 05:40:25,763 INFOorg.apache.flink.yarn.cli.FlinkYarnSessionCli- Found Yarn properties file under /tmp/.yarn-properties-root.
2020-08-21 05:40:25,763 INFOorg.apache.flink.yarn.cli.FlinkYarnSessionCli- Found Yarn properties file under /tmp/.yarn-properties-root.
2020-08-21 05:40:27,148 INFOorg.apache.flink.yarn.cli.FlinkYarnSessionCli- YARN properties set default parallelism to 9
2020-08-21 05:40:27,148 INFOorg.apache.flink.yarn.cli.FlinkYarnSessionCli- YARN properties set default parallelism to 9
YARN properties set default parallelism to 9
2020-08-21 05:40:27,217 INFOorg.apache.hadoop.yarn.client.RMProxy- Connecting to ResourceManager at server01/192.168.204.10:8032
2020-08-21 05:40:27,445 INFOorg.apache.flink.yarn.cli.FlinkYarnSessionCli- No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-08-21 05:40:27,445 INFOorg.apache.flink.yarn.cli.FlinkYarnSessionCli- No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-08-21 05:40:27,454 WARNorg.apache.flink.yarn.AbstractYarnClusterDescriptor- Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2020-08-21 05:40:27,560 INFOorg.apache.flink.yarn.AbstractYarnClusterDescriptor- Found application JobManager host name 'server03' and port '36941' from supplied application id 'application_1597991682418_0002'
Starting execution of program
Job has been submitted with JobID 714f6e03bf0dd2dba7d23db45a5961ab
通过webUI 可以看到任务正常执行
文章图片
Per-Job-Cluster模式 通过命令停止Flink Session-Cluster
[root@server01 flink]# yarn application -kill application_1597991682418_0002
20/08/21 05:47:19 INFO client.RMProxy: Connecting to ResourceManager at server01/192.168.204.10:8032
Killing application application_1597991682418_0002
20/08/21 05:47:20 INFO impl.YarnClientImpl: Killed application application_1597991682418_0002
提交任务
bin/flink run -m yarn-cluster -yn 3 -ys 3 -ynm bjsxt02-c com.test.flink.wc.StreamWordCount ./appjars/test-1.0-SNAPSHOT.jar
任务提交参数说明:相对于 Yarn-Session 参数而言,只是前面加了 y,详情如下:
-yn,--container 表示分配容器的数量,也就是 TaskManager 的数量。
-d,--detached:设置在后台运行。
-yjm,--jobManagerMemory:设置 JobManager 的内存,单位是 MB。
-ytm,--taskManagerMemory:设置每个 TaskManager 的内存,单位是 MB。
-ynm,--name:给当前 Flink application 在 Yarn 上指定名称。
-yq,--query:显示 yarn 中可用的资源(内存、cpu 核数)
-yqu,--queue :指定 yarn 资源队列
-ys,--slots :每个 TaskManager 使用的 Slot 数量。
-yz,--zookeeperNamespace:针对 HA 模式在 Zookeeper 上创建 NameSpace
-yid,--applicationID : 指定 Yarn 集群上的任务 ID,附着到一个后台独立运行的 Yarn Session 中。
上传任务后可以看到application
文章图片
推荐阅读
- flink on yarn Per-job模式High Availability 配置
- 大数据|Hadooop-Yarn
- flink|Flink 的学习笔记
- 大数据|FLIP-147(支持包含结束任务的 Checkpoint 操作与作业结束流程修正)
- 大数据技术栈|网易互娱基于 Flink 的支付环境全关联分析实践
- StarRocks|使用StarRocks内置工具Routine Load同步Mysql/TiDB/PG等增量更新数据到StarRocks
- flink|Flink CDC 同步mysql数据
- java|阿里巴巴开源的 Blink 实时计算框架真香
- flink|【大数据面经系列】大数据开发(实时计算方向)面试题(一)