20190721_flink安装

查看系统版本号
cat /etc/redhat-release
CentOS Linux release 7.6.1810 (Core)
查看系统磁盘挂载情况
df -h
/home 目录下有 873G 磁盘存储空间
故把程序应该安装在 /home 目录下
一、Java JDK 安装
上传jdk安装包 jdk-8u191-linux-x64.tar.gz 至 /home/bigdata 目录下
cd /home/bigdata
ls
sudo tar -zxvf jdk-8u191-linux-x64.tar.gz
创建软链接
ln -s jdk1.8.0_191 jdk
配置 java 环境变量
在 vi 编辑中, 使用 Shift + g 快速定位至最后一行
sudo vi /etc/profile
export JAVA_HOME=/home/bigdata/jdk
export PATH=$JAVA_HOME/bin:$PATH
source /etc/profile
验证 java jdk 是否安装成功
java -version
二、Scala安装
上传scala安装包 scala-2.11.11.tgz 至 /home/bigdata 目录下
cd /home/bigdata
ls
sudo tar -zxvf scala-2.11.11.tgz
创建软链接
ln -s scala-2.11.11 scala
配置 scala 环境变量
在 vi 编辑中, 使用 Shift + g 快速定位至最后一行
sudo vi /etc/profile
export SCALA_HOME=/home/bigdata/scala
export PATH=$SCALA_HOME/bin:$PATH
source /etc/profile
验证 scala 是否安装成功
scala -version
测试scala环境
scala
1+1
:q退出
三、ssh 免密登录设置
cd /home/nifi
pwd
ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
sudo chmod 600 ~/.ssh/authorized_keys
验证 ssh 免密登录是否设置成功
ssh singlecluster
备注:配置 ssh 免密登录应注意
配置 ssh 免密登录时,应设置 /home/nifi 目录权限为 755,
/home/nifi/.ssh/authorized_keys 目录权限为 600,
否则可能 ssh 免密登录设置不成功!!
四、hadoop安装
4.1、上传hadoop安装包 hadoop-2.7.3.tar.gz 至 /home/bigdata 目录下
cd /home/bigdata
sudo tar -zxvf hadoop-2.7.3.tar.gz
创建软链接
ln -s hadoop-2.7.3 hadoop
ll
4.2、在/home/bigdata/hadoop目录下,建立tmp、hdfs/name、hdfs/data目录,执行如下命令
cd /home/bigdata/hadoop
sudo chmod -R 777 /home
mkdir -p tmp hdfs/name hdfs/data
4.3、配置 hadoop 环境变量
在 vi 编辑中, 使用 Shift + g 快速定位至最后一行
sudo vi /etc/profile
export HADOOP_HOME=/home/bigdata/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
source /etc/profile
验证 hadoop 变量是否配置成功
hadoop version
4.4、Hadoop配置文件配置
进入/home/bigdata/hadoop/etc/hadoop目录,配置 hadoop-env.sh等。涉及的配置文件如下:
hadoop-env.sh
yarn-env.sh
mapred-env.sh
core-site.xml
hdfs-site.xml
mapred-site.xml
yarn-site.xml
1)配置hadoop-env.sh
vi hadoop-env.sh
# The java implementation to use.
#export JAVA_HOME=${JAVA_HOME}
export JAVA_HOME=/home/bigdata/jdk
2)配置yarn-env.sh
vi yarn-env.sh
#export JAVA_HOME=/home/y/libexec/jdk1.7.0/
export JAVA_HOME=/home/bigdata/jdk
3)配置mapred-env.sh
vi mapred-env.sh
# export JAVA_HOME=/home/y/libexec/jdk1.6.0/
export JAVA_HOME=/home/bigdata/jdk
4)配置core-site.xml
cd /home/hadoop/etc/hadoop
vi core-site.xml
【20190721_flink安装】添加如下配置:


fs.default.name
hdfs://singlecluster:9000
HDFS的URI,文件系统://namenode标识:端口号


hadoop.tmp.dir
/home/bigdata/hadoop/tmp
namenode上本地的hadoop临时文件夹


5)配置hdfs-site.xml
vi hdfs-site.xml
添加如下配置


dfs.name.dir
/home/bigdata/hadoop/hdfs/name
namenode上存储hdfs名字空间元数据


dfs.data.dir
/home/bigdata/hadoop/hdfs/data
datanode上数据块的物理存储位置


dfs.replication
3
副本个数,配置默认是3,应小于datanode机器数量


dfs.permissions
false


6)配置mapred-site.xml
cp mapred-site.xml.template mapred-site.xml
vi mapred-site.xml
添加如下配置:


mapreduce.framework.name
yarn



mapreduce.job.ubertask.enable
true



mapreduce.job.ubertask.maxmaps
9



mapreduce.job.ubertask.maxreduces
1


mapred-site.xml文件全部内容






mapreduce.framework.name
yarn


7)配置yarn-site.xml
vi yarn-site.xml
添加如下配置:


yarn.nodemanager.aux-services
mapreduce_shuffle


8)修改masters
sudo vi masters
singlecluster
9)修改slaves
sudo vi slaves
singlecluster
10)修改 /etc/hosts
sudo vi /etc/hosts
192.168.1.26 singlecluster
4.5、进入/home/bigdata/hadoop目录,格式化namenode节点
bin/hadoop namenode -format
4.6、进入/home/bigdata/hadoop目录下启动hadoop集群
cd /home/bigdata/hadoop
ls
sbin/start-all.sh
使用jps命令查看进程,检验hadoop集群是否已经启动。
jps
3393 NameNode
4481 Jps
3907 ResourceManager
3543 DataNode
4023 NodeManager
3737 SecondaryNameNode
进入/home/bigdata/hadoop目录下关闭hadoop集群
cd /home/bigdata/hadoop
sbin/stop-all.sh
4.7、打开浏览器查看 HDFS、YARN 的运行状态
查看 HDFS 运行状态
http://192.168.1.26:50070/
查看 YARN 运行状态
http://192.168.1.26:8088
备注:若页面输入对应的ip端口号地址查看不了网页,需验证服务器防火墙是否关闭,应保持防火墙关闭
关闭Centos7防火墙,不然端口无法访问
查看防火墙状态
systemctl status firewalld.service
临时关闭防火墙(下次重启防火墙再次开启)
systemctl stop firewalld.service
永久关闭防火墙
systemctl disable firewalld.service
4.8、Hadoop程序实例 wordcount程序运行
hadoop fs -mkdir -p /user/nifi/jobs/wordcount/input
hadoop fs -put /home/bigdata/hadoop/etc/hadoop/*.xml /user/nifi/jobs/wordcount/input
hadoop jar /home/bigdata/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar wordcount /user/nifi/jobs/wordcount/input /user/nifi/jobs/wordcount/output
hadoop fs -ls -R /
hadoop fs -cat /user/nifi/jobs/wordcount/output/*
五、安装 kafka
5.1、上传kafka安装包 kafka_2.11-2.1.1.tgz 至 /home/bigdata 目录下
cd /home/bigdata
ls
sudo tar -zxvf kafka_2.11-2.1.1.tgz
创建软链接
ln -s kafka_2.11-2.1.1 kafka
5.2、修改 zookeeper、kafka 配置文件 zookeeper.properties,server.properties
cd /home/bigdata/kafka/config
sudo vi zookeeper.properties
dataDir=/home/bigdata/kafka/tmp/zookeeper
sudo vi server.properties
log.dirs=/home/bigdata/kafka/tmp/kafka-logs
5.3、启动zookeeper和kafka(这里,zookeeper集成于kafka中)
cd /home/bigdata/kafka
启动zookeeper(后台运行)
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
查看zookeeper是否运行
jps
6740 QuorumPeerMain
关闭zookeeper
bin/zookeeper-server-stop.sh config/zookeeper.properties &
启动kafka(后台运行)
nohup bin/kafka-server-start.sh config/server.properties &
或者
nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
查看kafka是否运行
jps
7587 Kafka
关闭kafka
bin/kafka-server-stop.sh config/server.properties
5.4、使用kafka
1)、创建topic:
/home/bigdata/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic apache-flink-test
2)、查看topic:
/home/bigdata/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
3)、生产者
/home/bigdata/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic apache-flink-test
4)、消费者
/home/bigdata/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic apache-flink-test --from-beginning
该命令中含有过时方法 --zookeeper,该方法在老版本kafka0.90之前使用
/home/bigdata/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:9092 --topic apache-flink-test --from-beginning
备注:消费kafka时遇到的问题:kafka 创建消费者报错 consumer zookeeper is not a recognized option
在做kafka测试的时候,使用命令bin/kafka-console-consumer.sh --zookeeper 192.168.0.140:2181,192.168.0.141:2181 --topic test --from-beginning启动消费者,发现一只报错consumer zookeeper is not a recognized option,搜索了半天,一只没有解决,最后,换了一个低版本的kakfa,发现在启动的时候说使用 --zookeeper是一个过时的方法,此时,才知道原来在最新的版本中,这种启动方式已经被删除了,
最后附上0.90版本之后启动消费者的方法: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
CSDN参考链接:
kafka 创建消费者报错 consumer zookeeper is not a recognized option
https://blog.csdn.net/csdn_sunlighting/article/details/81516646
kafka中消费 kafka topic 后应该关闭消费进程
(1)使用消费命令时,用 Ctrl + C 关闭消费进程
(2)jps -m 查看kafka消费进程号,之后杀死对应的进程
jps -m
kill -9 进程号
5)、删除topic
/home/bigdata/kafka/bin/kafka-topics --delete --zookeeper 【zookeeper server:port】 --topic 【topic name】
[server.properties需要 设置delete.topic.enable=true]
六、安装 flink
6.1、上传flink安装包 flink-1.7.2-bin-hadoop27-scala_2.11.tgz 至 /home/bigdata 目录下
cd /home/bigdata
ls
sudo tar -zxvf flink-1.7.2-bin-hadoop27-scala_2.11.tgz
创建软链接
ln -s flink-1.7.2 flink
6.2、配置 flink 环境变量
在 vi 编辑中, 使用 Shift + g 快速定位至最后一行
sudo vi /etc/profile
export FLINK_HOME=/home/bigdata/flink
export PATH=$FLINK_HOME/bin:$PATH
source /etc/profile
验证 flink 变量是否配置成功
flink --version
或者
flink --v
6.3、启动flink
cd /home/bigdata/flink
./bin/start-cluster.sh
查看flink是否成功启动(新增两个进程)
jps
18469 TaskManagerRunner
18025 StandaloneSessionClusterEntrypoint
6.4、打开浏览器查看Flink任务运行状态
http://192.168.1.26:8081
6.5、flink伪分布式验证
使用 start-scala-shell.sh 来验证
${FLINK_HOME}/bin/start-scala-shell.sh是flink提供的交互式clinet,可以用于代码片段的测试,方便开发工作,
它有两种启动方式,一种是工作在本地,另一种是工作到集群。
本例中因为机器连接非常方便,就直接使用集群进行测试,在开发中,如果集群连接不是非常方便,可以连接到本地,在本地开发测试通过后,再连接到集群进行部署工作。
如果程序有依赖的jar包,则可以使用 -a 或 --addclasspath 参数来添加依赖。
1)本地连接
${FLINK_HOME}/bin/start-scala-shell.sh local
2)集群连接
${FLINK_HOME}/bin/start-scala-shell.sh remote
3)带有依赖包的格式
${FLINK_HOME}/bin/start-scala-shell.sh [local|remote] --addclasspath
4)查看帮助
${FLINK_HOME}/bin/start-scala-shell.sh --help
cd /home/bigdata/flink
bin/start-scala-shell.sh --help
Flink Scala Shell
Usage: start-scala-shell.sh [local|remote|yarn] [options] ...
Command: local [options]
Starts Flink scala shell with a local Flink cluster
-a, --addclasspath
Specifies additional jars to be used in Flink
Command: remote [options]
Starts Flink scala shell connecting to a remote cluster
Remote host name as string
Remote port as integer
-a, --addclasspath
Specifies additional jars to be used in Flink
Command: yarn [options]
Starts Flink scala shell connecting to a yarn cluster
-n, --container argNumber of YARN container to allocate (= Number of TaskManagers)
-jm, --jobManagerMemory arg
Memory for JobManager container
-nm, --name Set a custom name for the application on YARN
-qu, --queueSpecifies YARN queue
-s, --slotsNumber of slots per TaskManager
-tm, --taskManagerMemory
Memory per TaskManager container
-a, --addclasspath
Specifies additional jars to be used in Flink
--configDir The configuration directory.
-h, --helpPrints this usage text


5)使用集群模式去验证
cd /home/bigdata/flink
bin/start-scala-shell.sh remote 192.168.1.26 8081
批处理验证:
val text = benv.fromElements("To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,")
val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
counts.print()
流处理验证:
val dataStream = senv.fromElements(1, 2, 3, 4)
dataStream.countWindowAll(2).sum(0).print()
senv.execute("My streaming program")
参考CSDN博客链接:
Flink部署-standalone模式 - kwame211的博客 - CSDN博客
https://blog.csdn.net/kwame211/article/details/89332391

    推荐阅读