快速安装|快速安装 kafka 集群

前言 最近因为工作原因,需要安装一个 kafka 集群,目前网络上有很多相关的教程,按着步骤来也能完成安装,只是这些教程都显得略微繁琐。因此,我写了这篇文章帮助大家快速完成 kafka 集群安装。 安装步骤 准备多台服务器,数量建议为奇数(如:3,5,7 等),操作系统为 CentOS 7+。 这里使用 3 台服务器作为例子,IP 分别为 192.168.1.1、192.168.1.2、192.168.1.3,修改下述脚本文件的 IP 地址,并拷贝到 3 台服务器上分别执行即可完成安装。 快速安装|快速安装 kafka 集群
文章图片
快速安装|快速安装 kafka 集群
文章图片

#!/bin/bash# Modify the link if you want to download other version KAFKA_DOWNLOAD_URL="https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz"# Please use your own server ip SERVERS=("192.168.1.1" "192.168.1.2" "192.168.1.3")ID=0MECHINE_IP=$(hostname -i) echo "Mechine IP: "${MECHINE_IP}LENGTH=${#SERVERS[@]}for (( i=0; i<${LENGTH}; i++ )); do if [ "${SERVERS[$i]}" = "${MECHINE_IP}" ]; then ID=$((i+1)) fi doneecho "ID: "${ID}if [ "${ID}" -eq "0" ]; then echo "Mechine IP is not matched to server list" exit 1 fiZOOKEEPER_CONNECT=$(printf ",%s:2181" "${SERVERS[@]}") ZOOKEEPER_CONNECT=${ZOOKEEPER_CONNECT:1} echo "Zookeeper Connect: "${ZOOKEEPER_CONNECT}echo "---------- Update yum ----------" yum update -y yum install -y wgetecho "---------- Install java ----------" yum -y install java-1.8.0-openjdk java -versionecho "---------- Create kafka user & group ----------" groupadd -r kafka useradd -g kafka -r kafka -s /bin/falseecho "---------- Download kafka ----------" cd /opt wget ${KAFKA_DOWNLOAD_URL} -O kafka.tgz mkdir -p kafka tar -xzf kafka.tgz -C kafka --strip-components=1 chown -R kafka:kafka /opt/kafkaecho "---------- Install and start zookeeper ----------" mkdir -p /data/zookeeper chown -R kafka:kafka /data/zookeeper echo "${ID}" > /data/zookeeper/myid# zookeeper config # https://zookeeper.apache.org/doc/r3.1.2/zookeeperAdmin.html#sc_configuration cat < /opt/kafka/config/zookeeper-cluster.properties # the directory where the snapshot is stored. dataDir=/data/zookeeper# the port at which the clients will connect clientPort=2181# setting number of connections to unlimited maxClientCnxns=0# keeps a heartbeat of zookeeper in milliseconds tickTime=2000# time for initial synchronization initLimit=10# how many ticks can pass before timeout syncLimit=5# define servers ip and internal ports to zookeeper EOFfor (( i=0; i<${LENGTH}; i++ )); do INDEX=$((i+1)) echo "server.${INDEX}=${SERVERS[$i]}:2888:3888" >> /opt/kafka/config/zookeeper-cluster.properties done# zookeeper.service cat < /usr/lib/systemd/system/zookeeper.service [Unit] Description=Apache Zookeeper server (Kafka) Documentation=http://zookeeper.apache.org Requires=network.target remote-fs.target After=network.target remote-fs.target[Service] Type=simple User=kafka Group=kafka ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper-cluster.properties ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh Restart=on-failure[Install] WantedBy=multi-user.target EOFsystemctl daemon-reload systemctl start zookeeper && systemctl enable zookeeperecho "---------- Install and start kafka ----------" mkdir -p /data/kafka chown -R kafka:kafka /data/kafka# kafka config # https://kafka.apache.org/documentation/#configuration cat < /opt/kafka/config/server-cluster.properties # The id of the broker. This must be set to a unique integer for each broker. broker.id=${ID}# Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured.Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). advertised.listeners=PLAINTEXT://${MECHINE_IP}:9092# A comma separated list of directories under which to store log files log.dirs=/data/kafka# The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=1# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1# The minimum age of a log file to be eligible for deletion due to age log.retention.hours=168# The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000# Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=${ZOOKEEPER_CONNECT}/kafka# Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=60000 EOF# kafka.service cat < /usr/lib/systemd/system/kafka.service [Unit] Description=Apache Kafka server (broker) Documentation=http://kafka.apache.org/documentation.html Requires=network.target remote-fs.target After=network.target remote-fs.target kafka-zookeeper.service [Service] Type=simple User=kafka Group=kafka ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server-cluster.properties ExecStop=/opt/kafka/bin/kafka-server-stop.sh Restart=on-failure [Install] WantedBy=multi-user.target EOFsystemctl daemon-reload systemctl start kafka && systemctl enable kafka

setup.sh
基本操作
# 启动 zookeeper systemctl start zookeeper# 停止 zookeeper systemctl stop zookeeper# 重启 zookeeper systemctl restart zookeeper# 查看 zookeeper 日志 systemctl status zookeeper -l# 启动 kafka systemctl start kafka# 停止 kafka systemctl stop kafka# 重启 kafka systemctl restart kafka# 查看 kafka 日志 systemctl status kafka -l


简单测试
# 进入 kafka bin 目录 cd /opt/kafka/bin/# 创建一个 topic kafka-topics.sh --create --topic test --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092# 查看 topic 描述 kafka-topics.sh --topic test --describe --bootstrap-server localhost:9092# 启动生产者然后输入消息 kafka-console-producer.sh --topic test --bootstrap-server localhost:9092# 启动消费者消费消息 kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092# 删除 topic kafka-topics.sh --topic test --delete --bootstrap-server localhost:9092


脚本说明 【快速安装|快速安装 kafka 集群】1. 以下代码主要指定下载 kafka 的版本以及服务器 IP 列表,可根据实际情况进行调整。
# Modify the link if you want to download other version KAFKA_DOWNLOAD_URL="https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz"# Please use your own server ip SERVERS=("192.168.1.1" "192.168.1.2" "192.168.1.3")


2. 以下代码主要用于生成 zookeeper id 和 kafka broker id 以及拼接 kafka 配置中的 zookeeper 连接串,通过本机 IP 与填写的 IP 列表进行匹配,如果本机 IP 等于第一个服务器 IP,则 ID为 1,等于第二个服务器 IP,则 ID为 2,等于第二个服务器 IP,则 ID为 3,以此类推;本机 IP 不在填写的 IP 列表中,则会退出安装。
ID=0MECHINE_IP=$(hostname -i) echo "Mechine IP: "${MECHINE_IP}LENGTH=${#SERVERS[@]}for (( i=0; i<${LENGTH}; i++ )); do if [ "${SERVERS[$i]}" = "${MECHINE_IP}" ]; then ID=$((i+1)) fi doneecho "ID: "${ID}if [ "${ID}" -eq "0" ]; then echo "Mechine IP is not matched to server list" exit 1 fiZOOKEEPER_CONNECT=$(printf ",%s:2181" "${SERVERS[@]}") ZOOKEEPER_CONNECT=${ZOOKEEPER_CONNECT:1}


3. 更新 yum 源,并安装 wget 下载工具
yum update -y yum install -y wget


4. 安装 java 8
yum -y install java-1.8.0-openjdk java -version


5. 创建 kafka 用户及组
groupadd -r kafka useradd -g kafka -r kafka -s /bin/false


6. 下载并解压 kafka 可执行程序
cd /opt wget ${KAFKA_DOWNLOAD_URL} -O kafka.tgz mkdir -p kafka tar -xzf kafka.tgz -C kafka --strip-components=1 chown -R kafka:kafka /opt/kafka


7. 创建 zookeeper 目录,创建 zookeeper id
mkdir -p /data/zookeeper chown -R kafka:kafka /data/zookeeper echo "${ID}" > /data/zookeeper/myid


8. 生成 zookeeper 配置文件,详细说明可参考:https://zookeeper.apache.org/doc/r3.1.2/zookeeperAdmin.html#sc_configuration

cat < /opt/kafka/config/zookeeper-cluster.properties # the directory where the snapshot is stored. dataDir=/data/zookeeper# the port at which the clients will connect clientPort=2181# setting number of connections to unlimited maxClientCnxns=0# keeps a heartbeat of zookeeper in milliseconds tickTime=2000# time for initial synchronization initLimit=10# how many ticks can pass before timeout syncLimit=5# define servers ip and internal ports to zookeeper EOFfor (( i=0; i<${LENGTH}; i++ )); do INDEX=$((i+1)) echo "server.${INDEX}=${SERVERS[$i]}:2888:3888" >> /opt/kafka/config/zookeeper-cluster.properties done


9. 创建 zookeeper systemd 管理文件,启动并设置开机启动 zookeeper
cat < /usr/lib/systemd/system/zookeeper.service [Unit] Description=Apache Zookeeper server (Kafka) Documentation=http://zookeeper.apache.org Requires=network.target remote-fs.target After=network.target remote-fs.target[Service] Type=simple User=kafka Group=kafka ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper-cluster.properties ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh Restart=on-failure[Install] WantedBy=multi-user.target EOFsystemctl daemon-reload systemctl start zookeeper && systemctl enable zookeeper


10. 创建 kafka 目录
mkdir -p /data/kafka chown -R kafka:kafka /data/kafka


11. 生成 kafka 配置文件,详细说明可参考:https://kafka.apache.org/documentation/#configuration
cat < /opt/kafka/config/server-cluster.properties # The id of the broker. This must be set to a unique integer for each broker. broker.id=${ID}# Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured.Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). advertised.listeners=PLAINTEXT://${MECHINE_IP}:9092# A comma separated list of directories under which to store log files log.dirs=/data/kafka# The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=1# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1# The minimum age of a log file to be eligible for deletion due to age log.retention.hours=168# The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000# Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=${ZOOKEEPER_CONNECT}/kafka# Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=60000 EOF


12. 创建 kafka systemd 管理文件,启动并设置开机启动 kafka
cat < /usr/lib/systemd/system/kafka.service [Unit] Description=Apache Kafka server (broker) Documentation=http://kafka.apache.org/documentation.html Requires=network.target remote-fs.target After=network.target remote-fs.target kafka-zookeeper.service [Service] Type=simple User=kafka Group=kafka ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server-cluster.properties ExecStop=/opt/kafka/bin/kafka-server-stop.sh Restart=on-failure [Install] WantedBy=multi-user.target EOFsystemctl daemon-reload systemctl start kafka && systemctl enable kafka


总结 按照上述的操作,你将快速完成 kafka 集群安装,如有问题可以在文章留言。

    推荐阅读