Building a Big Data Platform (6): Kafka Cluster Setup

I. Prerequisites:
(1) The Kafka service depends on ZooKeeper (a quick liveness check is sketched after this list).
(2) We use the stable Kafka release: kafka_2.11-1.0.0.tgz.
(3) Kafka can be installed on master and then copied to the other slave machines with scp.
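Before installing Kafka, it is worth confirming that ZooKeeper is actually running on every node. A minimal check, assuming the ensemble set up in the earlier parts of this series (the zkServer.sh path below is an assumption; adjust it to your install):

# The ZooKeeper process shows up in jps as QuorumPeerMain
jps | grep QuorumPeerMain
# Or ask ZooKeeper directly (path is an assumption)
/home/zookeeper/bin/zkServer.sh status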
II. Configure Kafka:
First, note that in a production environment the directory structure should be planned up front, so that projects remain easy to find as their number grows.

  1. All directories live under /home; first create the kafka project directory, then the message directory:
cd /home
# Create the project directory
mkdir kafka
cd /home/kafka
# Create the kafka message directory, which stores Kafka's message logs
mkdir kafkalogs

  2. Put kafka_2.11-1.0.0.tgz under /home/kafka and extract it; the archive can be deleted afterwards:
# Extract
tar -zxvf kafka_2.11-1.0.0.tgz
# Delete the archive when done
rm -f kafka_2.11-1.0.0.tgz

  3. Edit the configuration files
(1) Enter the config directory:
cd /home/kafka/kafka_2.11-1.0.0/config/

The file to focus on is server.properties. Looking at the directory you will notice quite a few files, including ZooKeeper ones: Kafka could be started against the zk it ships with, but using an independent zk cluster is recommended (the bundled variant is shown below only for reference).
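For reference, this is how the bundled ZooKeeper would be started; we do not use it in this setup, since the external ensemble from the earlier parts of this series is assumed to be running:

# Start the ZooKeeper that ships with Kafka (not used here)
cd /home/kafka/kafka_2.11-1.0.0/bin
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties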

(2) Edit server.properties:
broker.id=, host.name=, and listeners= must be different on every server; remember to change them after scp-ing to the slaves.
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://master:9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces.
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().

num.network.threads=3
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

# A comma separated list of directories under which to store log files
log.dirs=/home/kafka/kafkalogs

num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has
# accumulated. Deletion always happens from the end of the log.
message.max.bytes=5242880
replica.fetch.max.bytes=5242880

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
default.replication.factor=2

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
log.cleaner.enable=false

############################# Zookeeper #############################

# Zookeeper connection string: comma separated host:port pairs, optionally with a
# chroot string appended to specify the root directory for all kafka znodes.

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator
# will delay the initial consumer rebalance. The rebalance will be further delayed by the value
# of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of
# max.poll.interval.ms. The default value for this is 3 seconds. We override it to 0 here as it
# makes for a better out-of-the-box experience for development and testing. However, in
# production environments the default value of 3 seconds is more suitable, as it helps to avoid
# unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
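The ZooKeeper section above is cut off in the original; the zookeeper.connect setting that normally closes it still has to point at the external ensemble. A plausible completion, assuming a three-node ensemble on master, slave1, and slave2 (the slave hostnames are an assumption; only slave1 is named elsewhere in this article):

# Assumed: external ZooKeeper ensemble hosts; adjust to your cluster
zookeeper.connect=master:2181,slave1:2181,slave2:2181
# Timeout in ms for connecting to zookeeper (default from the shipped file)
zookeeper.connection.timeout.ms=6000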

(3) Copy the installed and configured kafka directory to all slave nodes:
scp -r /home/kafka/ root@slave1:/home/
# Repeat for each remaining slave node

(4) On every slave node, edit broker.id=, host.name=, and listeners= in server.properties (see the sketch below).
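The per-node edits can also be scripted; a sketch for slave1, under the assumption that the copied file still carries master's values (the broker id and hostname must be adapted to each node):

# Run on slave1: give it a unique broker.id and its own listener address
cd /home/kafka/kafka_2.11-1.0.0/config
sed -i 's/^broker.id=0/broker.id=1/' server.properties
sed -i 's#^listeners=PLAINTEXT://master:9092#listeners=PLAINTEXT://slave1:9092#' server.properties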
III. Start Kafka:
  1. Start the Kafka cluster with the following commands:
# Start Kafka in the background (must be done on every machine)
# Enter kafka's bin directory
cd /home/kafka/kafka_2.11-1.0.0/bin
# Start the service
./kafka-server-start.sh -daemon ../config/server.properties
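If a broker started with -daemon dies quietly, the reason is usually in the server log; a quick check before moving on (the log path follows from the install directory used above):

# A healthy broker logs 'started (kafka.server.KafkaServer)' near the end
tail -n 20 /home/kafka/kafka_2.11-1.0.0/logs/server.log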

  2. Run jps to check that the Kafka process is up on every node.
  3. Create a topic to verify that the cluster works.
(1) Create a topic:
cd /home/kafka/kafka_2.11-1.0.0/bin
./kafka-topics.sh --create --zookeeper master:2181 --replication-factor 3 --partitions 1 --topic test1
# Explanation:
# --replication-factor 3   keep 3 replicas (must not exceed the number of brokers)
# --partitions 1           create 1 partition
# --topic test1            the topic name is test1

(2) Start a producer on one of the servers; here we use slave1:
# Start a console producer
./kafka-console-producer.sh --broker-list slave1:9092 --topic test1

(3) Start a consumer on another server; here we choose master:
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 --from-beginning
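The command above uses the old ZooKeeper-based console consumer, which Kafka 1.0 marks as deprecated. The newer consumer connects to a broker instead; either form should work for this test (the broker address follows from the listeners setting above):

# New-consumer variant: bootstrap from a broker rather than ZooKeeper
./kafka-console-consumer.sh --bootstrap-server master:9092 --topic test1 --from-beginning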

(4) Test
Publish a few messages from the producer and check whether the consumer receives them.
If it does, the Kafka cluster setup is complete!
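The same round trip can be scripted instead of typed: the console producer reads stdin, so a single message can be pushed from a pipe. A small sketch:

# Publish one message non-interactively (run from kafka's bin directory)
echo "hello kafka" | ./kafka-console-producer.sh --broker-list slave1:9092 --topic test1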
IV. Additional notes:
Most of these commands can be looked up in the official documentation.
  1. List topics
# Lists every topic we have created
./kafka-topics.sh --list --zookeeper localhost:2181

  2. Check a topic's status
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1
# Output:
Topic:test1	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: test1	Partition: 0	Leader: 0	Replicas: 0,1,2	Isr: 0,1,2
# One partition (partition 0) with replication factor 3
# Replicas: 0,1,2 means the partition is replicated on brokers 0, 1, and 2; Isr lists the in-sync replicas

  3. Once the above works, you can log into zk to inspect its directory tree:
# Enter zk with the client
./zkCli.sh -server 127.0.0.1:2181
# Normally the -server argument is not needed; we add it because the port was changed
# One important znode to look at:
[zk: 127.0.0.1:2181(CONNECTED) 1] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://master:9092"],"jmx_port":-1,"host":"master","timestamp":"1519797571108","port":9092,"version":4}
cZxid = 0x700000055
ctime = Wed Feb 28 13:59:30 CST 2018
mZxid = 0x700000055
mtime = Wed Feb 28 13:59:30 CST 2018
pZxid = 0x700000055
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x361dafd39070000
dataLength = 182
numChildren = 0
# And check a partition:
[zk: 127.0.0.1:2181(CONNECTED) 3] get /brokers/topics/test1/partitions/0
null
cZxid = 0x700000066
ctime = Wed Feb 28 14:33:35 CST 2018
mZxid = 0x700000066
mtime = Wed Feb 28 14:33:35 CST 2018
pZxid = 0x700000067
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
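It is also easy to confirm that every broker registered with ZooKeeper by listing the ids znode; with the three brokers configured above, ids 0, 1, and 2 should all appear (the broker ids are assumed from the earlier config):

[zk: 127.0.0.1:2181(CONNECTED) 4] ls /brokers/ids
# Expected with a healthy three-broker cluster: [0, 1, 2]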
