ELK之kafka篇

与天地兮比寿,与日月兮齐光。这篇文章主要讲述ELK之kafka篇相关的知识,希望能为你提供帮助。
1. 异步通信原理
1.1、观察者模式

  • 观察者模式(observer),又叫发布-订阅模式(publish/subscribe)
  • 定义对象间一种一对多的依赖关系,使得每当一个对象(目标对象)改变状态,则所有依赖于它的对象(观察者对象)都会得到通知并自动更新。

现实生活中的引用场景:淘宝天猫的到货通知等


1.2、生产者和消费者模式
  • 传统模式
  • 生产者直接将消息传递给指定的消费者
  • 耦合性特别高,当生产者或消费者发送变化,都需要重新写业务逻辑
  • 生产者消费者模式
  • 通过一个容器来解决生产者和消费者的强耦合问题,生产者和消费者彼此之间不直接通信,而通过阻塞队列来进行通信。
【ELK之kafka篇】


  • 数据传递流程
  • 生产者消费者模式,即n个线程进行生产,同时n个线程进行消费,两种角色通过内存缓冲区进行通信。
  • 生产者负责向缓冲区里面添加数据
  • 消费者负责从缓冲区读取数据,一般遵循先进先出的原则。
1.3、缓冲区
  • 解耦
  • 假设生产者和消费者分别是两个类,如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖。
  • 支持并发
  • 生产者直接调用消费者的某个方法过程中,函数调用是同步的。
  • 万一消费者处理数据很慢,生产者就会糟蹋大好时光
  • 支持忙闲不均
  • 缓冲区还有一个好处是,如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中
  • 等生产者的制造速度慢下来,消费者再慢慢处理掉。
1.4、同步与异步区别
  1. 同步要求接收端时钟频率和发送端时钟频率一致,发送端发送连续的比特流;异步通信时不要求接收端时钟和发送端时钟同步,发送端发送完一个字节后,可经过任意长的时间间隔再发送下一个字节。这是个人理解。


2. 消息系统原理
一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无须关注数据在应用间是如何传递的。
2.1点对点消息传递
  • 在点对点消息系统中,消息持久化到一个队列中,此时,将有一个或多个消费者消费队列中的数据,但是一条消息只能被消费一次。
  • 当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。
  • 该模式有多个消费者同时消费数据,也能保证数据处理的顺序。
  • 基于推送模型的消息系统,有消息代理记录消费状态。
消息代理将消息推送到消费者后,标记这条消息为已经被消费,但是这种方式无法很好地保证消费的处理语义。 

2.2发布订阅消息传递
  • 在发布-订阅消息系统中,消息被持久化到一个topic中
  • 消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。
  • kafka采取拉取模型,由自己控制消费速度,以及消费的进度,消费者可以按照任意的偏移量进行消费

3.kafka介绍
Kafka是一种高吞吐量的分布式发布订阅消息系统,它是apahe基金会开源的软件。它是消息中间件的一种,由scala和java语言开发。

#特性s
高吞吐量 :即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
可靠性:消息被持久化到本地磁盘,并且支持数据备份防止丢失。
容错性:允许集群中节点失败。(若副本数量为n,则允许n-1个节点失败)
持久性:存放数据在磁盘,可以设置存放时间,设置永久都可以。
顺序保证:在大多数情况下,数据处理的顺序很重要,大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理,
kafka一个partition内的消息的有序性。注意是一个partition,如果数据分开保存在多个partition的话,
顺序是会不分先后可以读取到的。
异步通信: 很多时候,用户不想也不需要立即处理消息,消息队列提供了异步处理机制,允许用户把一个消息放入队列,
但并不立即处理它,想向队列中放入多少消息就放多少,然后在需要的时候再去处理。

4.kafka架构


ps:从上面看出每个partition分区有一个leader,不是所有的分区就1个。
      有很多partition就会有很多的leader和follower
5.术语
"Broker" Kafka角色被称为broker,它只负责消息传递。
"Topic" 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,
逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处);
类似于文件系统中的文件夹,事件是该文件夹中的文件。
Kafka 中的主题始终是多生产者和多订阅者:一个主题可以有零个、一个或多个向其写入事件的生产者,以及零个、
一个或多个订阅这些事件的消费者。主题中的事件可以根据需要随时读取——与传统的消息传递系统不同,事件在消费后不会被删除。
相反,您可以通过每个主题的配置设置来定义 Kafka 应该将您的事件保留多长时间,之后旧事件将被丢弃。Kafka 的性能在数据大小
方面实际上是恒定的,因此长时间存储数据是非常好的。
我个人把topic理解为一个表,每个事件是一行,在读取和写入都要指定这个表名字;其实官方理解为文件夹,这也是很直观的。
物理上不同topic的消息分开存储。

"Producer" 负责发布消息到Kafka broker,通常是应用服务。
"Consumer" 消息消费者,向Kafka broker读取消息的客户端,通常是logstash或es。
"Consumer Group" 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,
若不指定group name则属于默认的group)。
"ISR" (in sync replicas)加入同步队列的副本,可以理解为一个加入同步队列清单

  • partition
"Partition" Partition是物理上的概念,每个Topic包含一个或多个Partition,partition有的人理解为分片。
每个topic至少有一个partition,当生产者产生数据的时候,根据分配策略,选择分区然后将消息追加到指定分区的末尾队列。
#partation数据路由规划
1.指定了partition,则直接使用
2.未指定partition,但指定key,通过对key的value进行hash选出一个partition
3.partition和key都未指定,使用轮询选出一个partition
每条消息都会有一个自增的编号,用于标识顺序,标识消息的偏移量
每个partition中的数据使用多个segment文件存储
partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序;
严格保证消息的消费顺序场景下,需要将partition数目设置为1

#注:参数调整优化建议章节有调整正则


  • replication
数据会存放到topic的partition中,但是有可能partition会损坏,我们需要对partition数据进行备份,备份多少是有参数的。
我们将分区分为leader(1)和follower(n)
1.leader负责写入和读取数据,follower只负责备份,保证了数据的一致性
2.备份数设置为n,表示主+备=n,不是主1个,备1个(注意这里的理解)
3.通常副本数设置2,3,最高9,不建议设置太多,具体看你集群情况。永远不要在生产环境设置为1

  • leader
每个partition有多个副本,其中有且仅有一个作为leader,leader是当前负责数据的读写的partition(每个partition一个leader,
两个分区两个leader,架构图可以看出)
1.producer先中zookeeper的“/brokers/.../state” 节点找到该partition的leader
2.producer将消息发送给该leader
3.leader将消息写入本地log
4.followers从leader pull消息,写入本地log后给leader发送ack
5.leader收到所有isr的中replica的ack后,增加HW(high watermark,最后提交的offset),并向producer发送ack

  • follower
跟随leader,所有写请求都通过leader路由,数据变更会广播给所有follower,follower与leader保持数据同步。
如果leader失效,从follower重新选举出一个新的leader
当follower挂掉,卡住或者同步太慢超时,leader会把这个follower从ISR(in sync replicas)列表中删除,
重新创建一个新的follower。

  • producer
生产者即数据的发布者,该角色将消息发布到kafka的topic中
broker接受到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中,生产者发送的消息存储到一个
partition中,生产者也可以指定数据存储的partition。


  • consumer group
每个consumer属于一个特定的consumer group,若不指定则属于默认的group
将多个消费者集中到一起去处理某一个topic数据,可以更快的提高数据的消费能力。
整个消费者组共享一组偏移量防止数据被重复读取,因为一个topic有多个分区。
一个组里的不同consumer,不会消费到同一个数据。
消费者组的使用也提高了分区的访问速度,提高了并发量,下图所示,2个消费者同时消费一个topic,2个消费者不会消费到同一个
消息内容,因为他们对应不同的分区。它其实是增加并发的一种小方式。
group_id通过消费者的配置指定: group.id=xxxxx,消费组均衡地给消费者分配分区,每个分区只由消费组中一个消费者消费,
所以不要让一个组内消费者的数量超过主题分区的数量。



  1. 向消费组添加消费者是横向扩展消费能力的主要方式。必要时,需要为主题创建大量分区,在负载增长时可以加入更多的消费者。但是不要让消费者的数量超过主题分区的数量。
  2. 为每个需要获取一个或多个主题全部消息的应用创建一个消费组,然后向消费组添加消费者来横向扩展消费能力和应用的处理能力,则每个消费者只处理一部分消息。

  • offset偏移量
可以唯一的标识一条消息
偏移量决定读取数据的位置,不会有线程安全的问题,消费者通过偏移量来决定下次读取的消息。
消费被消费后,不会马上删除,默认保存一周(看你是什么版本kafka,不同版本可能区别),也可以修改参数。
我们某一个业务也可以通过修改偏移量达到重新读取消息的目的,偏移量由用户控制。

6.参数调整建议
  • Partition 数目
一般来说,每个partition能处理的吞吐为几MB/s (仍需要基于根据本地环境测试后获取准确指标),增加更多的partitions意味着:
。更高的并行度与吞吐
。可以扩展更多的(同一个consumer group中的)consumers
。若是集群中有较多的brokers,则可更大程度上利用闲置的brokers
。但是会造成Zookeeper的更多选举
。也会在Kafka中打开更多的文件(文件打开数,文件数,所以默认设置的时候把文件设置为65535,官网也说系统有几个重要设置
https://kafka.apache.org/documentation/#os)

  • 调整准则
。一般来说,若是集群较小(小于6个brokers) ,则配置2 x broker数的partition数(其实partition数=broker数也可以,因为后面还有个副本数,或者考虑后面可能集群扩容,配置partition数=broker数+1或者partition数=broker数+2).在这里主要考虑的是之后的扩展.若是集群扩展了一倍(例如12个),则不用担心会有partiiion不足的现象发生。注意分区数下次修改不可以减少,只可以增加。

。一般来说,若是集群较大(大于12个),则配置1 x broker数的partition。因为这里不需要再考虑集群的扩展情况,与broker数相同的partition数已经足够应付常规场景。若有必要,则再手动调整

。考虑最高峰吞吐需要的并行consumer数,调整pariilion的数目。若是应用场景需要有20个(同一个consumer group中的)
consumer并行消费,则据此设置为20个partition

。考虑producer所需的吞吐,调整partition数目(如果producer的吞吐非常高,或是在接下来两年内都比较高,则增加partition的数目)
  • Replication factor
  • 此参数决定的是records复制的数目,建议至少设置为2,一般是3,最高设置为9
  • 更高的replicalion facior (假设数目为N,简称RF) 意味者:
。系统更稳定(允许N-1个broker宕机)
。更多的副本(如果acks=all,则会造成较高的延迟)
。系统磁盘的使用率会更高(一般若是RF为3,则相对于RF为2时,会占据更多50%的磁盘空间)
。以3为起始(当然至少需要有3个brokers,同时也不建议一个Kafka集群中节点数少于3个节点)
。如果replication性能成为了瓶颈或是一个issue,则建议使用一个性能更好的broker,而不是降低RF的数目
。永远不要在生产环境中设置RF为1

  • 批量写入
为了大幅度提高producer写入吞吐,需要定期批量写文件
?每当producer写入10000条消息时,刷数据到磁盘
log.flush.interval.messages=10000

?每间隔1秒钟,刷数据到磁盘
log.flush.interval.ms=1000

7. 原理
#原理:
kafka是一个消息队列服务器,角色是broker,消息发送者称为producer,接收者称为consumer;通常我们部署多个broker以提供高
可用性的消息服务集群,典型的是3个broker;消息以topic的形式发送到broker,消费者订阅topic,实现按需取用的消费模式;
创建topic需要指定replication-factor(复制数目,副本数,通常等于broker数目);每个topic可能有多个分区(partition),
每个分区的消息内容不会重复。
事件: 事件记录了世界或您的业务中“发生了某事” 的事实。在文档中也称为记录或消息。当您向 Kafka 读取或写入数据时,
您以事件的形式执行此操作。从概念上讲,事件具有键、值、时间戳和可选的元数据标头。

主题是分区的,这意味着一个主题分布在位于不同 Kafka 代理上的多个“桶”中。数据的这种分布式放置对于可伸缩性非常重要,
因为它允许客户端应用程序同时从多个代理读取和写入数据。当一个新事件发布到一个主题时,它实际上是附加到主题的分区之一。
具有相同事件键(例如,客户或车辆 ID)的事件被写入同一个分区,并且 Kafka保证给定主题分区的任何消费者将始终以与写入
事件完全相同的顺序读取该分区的事件。

#个人理解
kafka是把数据存在topic中,每个topic包括1个或者多个partition,partition可以分布在各个broker(这个实现了kafka的分布式
特性),其中partition有副本数,副本支持读操作。
一个分区保证了数据的顺序性,多个分区数据顺序性就没有了;分区里面是切片,切片是一个顺序的存储数据。
集群是用zookeeper来管理,默认带的。看架构图,其实架构图已经全部解释了。

8. 部署
#步骤:
1、安装java运行环境jdk,因为kafka是java开发的。
2、下载kafka,解压。
3、先配置zookeeper,zookeeper管理kafka集群。
4、再配置kafka(有个连接zookeeper参数要配置),启动kafka
5、先启动zookeeper,再启动kafka。
6、测试

  • zookeeper配置
#安装java运行环境jdk
yum install -y epel-release
yum install -y java-1.8.0#java运行环境在epel仓库,updates仓库也可以

#下载kafka,解压,配置zookeeper,配置kafka,启动。
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
tar -xf kafka_2.12-2.8.0.tgz
cd /opt/elk/
mv kafka_2.12-2.8.0 kafka

#下面先进入zookeeper配置,kafka配置。
#第一台机器开始:
[root@elk3 config]# vi zookeeper.properties
dataDir=/opt/elk/kafka/data#zk数据存放目录
dataLogDir=/opt/elk/kafka/logs#zk日志存放目录
# the port at which the clients will connect
clientPort=2181#客户端连接zk服务的端口
tickTime=2000#表示心跳时间间隔。zk服务器之间,或客户端与服务器之间维持心跳的时间间隔。
initLimit=20#允许follower连接并同步到leader的初始化连接时间,当初始化连接超过该值,表示连接失败
syncLimit=10#leader和follower之间发送消息时候,两者不能相互通信,那么follower将会被丢弃。
server.1=192.168.68.129:2888:3888
server.2=192.168.68.22:2888:3888
server.3=192.168.68.19:2888:3888
#2888是follower与leader交换信息的端口;3888是当leader挂了用来执行选举时候相互通信的端口。是关于zk集群的配置,
index代表自定义的各服务器序号(对应写在各自机器的myid中),IP即各服务器IP,A表示集群服务器之间通讯组件的端口,
B表示选举组件的端口。如果配成集群,则zk状态(mode)为leader或follower;如果只配一个单机,mode为standalone。

# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
#zookeeper(3.5版本后)内置jetty管理控制台,可以通过http://localhost:8080/commands/stat访问。关闭该控制台或者重新
设置端口可以通过如下几种方式操作:
#1)启动脚本中修改 -Dzookeeper.admin.serverPort=xxx
#2)zoo.cfg配置文件中 修改配置admin.serverPort=xxx
#3)-Dzookeeper.admin.enableServer=false 在启动脚本中关闭管理控制台

[root@elk1 config]# mkdir /opt/elk/kafka/data,logs
[root@elk3 config]# echo 1 > /opt/elk/kafka/data/myid#表示这台机器在机器里的顺序,按id顺序排
[root@elk3 config]# cat /opt/elk/kafka/data/myid
1

#第二台机器开始
zookeeper.properties和机器1一样
[root@elk1 config]# mkdir /opt/elk/kafka/data,logs
[root@elk1 config]# echo 2 > /opt/elk/kafka/data/myid

#第三台机器开始
zookeeper.properties和机器1一样
[root@elk1 config]# mkdir /opt/elk/kafka/data,logs
[root@elk1 config]# echo 3 > /opt/elk/kafka/data/myid

  • 配置kafka
#server.properties配置文件分为一下几个部分:
Server Basics
Socket Server Settings
Log Basics
Internal Topic Settings
Log Flush Policy
Log Retention Policy
Zookeeper
Group Coordinator Settings

#第一台机器
[root@elk1 config]# cat server.properties
#### Server Basics #####
broker.id=1#改
##### Socket Server Settings ####
listeners=PLAINTEXT://192.168.68.129:9092#改,写具体的ip地址,不要写0.0.0.0否则,advertised.listeners参数不支
持0.0.0.0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/elk/kafka/logs
### Log Basics ####
num.partitions=1
num.recovery.threads.per.data.dir=1
### Internal Topic Settings###
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
##### Log Flush Policy ###
#### Log Retention Policy ####
log.retention.hours=168 #数据保留7*24小时
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
#### Zookeeper ####
zookeeper.connect=192.168.68.129:2181,192.168.68.22:2181,192.168.68.19:2181
zookeeper.connection.timeout.ms=12000
#### Group Coordinator Settings ###
group.initial.rebalance.delay.ms=3

#第二台,第三台修改broker.id和listeners就可以了。

  • 启动和测试
#先启动zookeeper,再启动kafka。
#1,先启动zookeeper,3台机器都启动,然后看下2181端口启动起来没。
[root@elk3 kafka]# nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
[root@elk3 kafka]# ss -ant|grep 2181
LISTEN050:::2181:::*

#2,启动kafka
nohup bin/kafka-server-start.sh config/server.properties &

    推荐阅读