kafka原理及设计思想
1.基本介绍 kafka是一个分布式消息系统中间件,主要在分布式环境下为各个系统提供消息传递服务。其最令人印象深刻的特点是高吞吐量、超强消息堆积、持久化能力、快速的消息get、put。 基本介绍环节对kafka的主要组成部分以及一些名字做一些解释。 PS: 我们这里部分设置细节以kafka.0.10.x为例 1.1 consumer,producer,broker
文章图片
文章图片
从上图可以看到kafka的核心组件是Producer、broker和consumer。其名次定义如下:
- Consumer:用于从Broker中取出/消费Message。
- Producer:用于往Broker中发送/生产Message。
- Broker:Kafka中使用Broker来接受Producer和Consumer的请求,并把Message持久化到本地磁盘。每个Cluster当中会选举出一个Broker来担任Controller,负责处理Partition的Leader选举,协调Partition迁移等工作。
文章图片
文章图片
文章图片
文章图片
该图可以看到,消息是按照主题来提交到Partition当中的。Partition当中的消息是有序的,consumer从一个有序的分区消息队列中顺序获取消息。相关名次定义如下: 1.Topic:用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上。 2.Partition:是Kafka中横向扩展和一切并行化的基础,每个Topic都至少被切分为1个Partition。 3.offset:消息在Partition中的编号,编号顺序不跨Partition。
文章图片
文章图片
总结如下: 1.分区目的:Kafka中采用分区的设计有几个目的。一是可以处理更多的消息,不受单台服务器的限制。Topic拥有多个分区意味着它可以不受限的处理更多的数据。第二,分区可以作为并行处理的单元。 2.offset:由消费者控制offset,因此分区本身所在broker是无状态的。消费者可以自由控制offset,很灵活 3.同个分区内有序消费:每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。分区中的消息都被分配了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的。 1.3 partition复制 每个分区都有自己的镜像分区,来保证分区的高可用。其中一个称为leader。如果leader挂掉了,也会有相应的选举算法来选新的leader。 1.Replication:Kafka支持以Partition为单位对Message进行冗余备份,每个Partition都可以配置至少1个Replication(当仅1个Replication时即仅该Partition本身)。 2.Leader:每个Replication集合中的Partition都会选出一个唯一的Leader,所有的读写请求都由Leader处理。其他Replicas从Leader处把数据更新同步到本地,过程类似大家熟悉的MySQL中的Binlog同步。 3.ISR(In-Sync Replica):是Replicas的一个子集,表示目前Alive且与Leader能够“Catch-up”的Replicas集合。由于读写都是首先落到Leader上,所以一般来说通过同步机制从Leader上拉取数据的Replica都会和Leader有一些延迟(包括了延迟时间和延迟条数两个维度),任意一个超过阈值都会把该Replica踢出ISR。每个Partition都有它自己独立的ISR。
文章图片
1.4 消费者组 消费者组提供两种消费TOPIC的方式:
- 只有一个消费者组:保证消费者组内负载均衡的读取消息
- 多个消费者组:每个消费者组理解为一个独立的个体看成订阅了tipic。下图可以看到分区上的消息会完整、负载均衡地广播给一个消费者组内的消费者
文章图片
1.5 消息分发语义(消息可靠性问题) 对于kafka来说,以下两个方面来保障消息分发的可靠性: 消息发送的可靠性保障(producer) 消息消费的可靠性保障(consumer) 1.5.1 消息发送可靠性保证
- acks=0 --- producer不等待broker的acks。发送的消息可能丢失,但永远不会重发。
- acks=1 --- leader不等待其他follower同步,leader直接写log然后发送acks给producer。这种情况下会有重发现象,可靠性比only once好点,但是仍然会丢消息。例如leader挂了了,但是其他replication还没同步完成。
- acks=all --- leader等待所有follower同步完成才返回acks。消息可靠不丢失(丢了会重发),没收到ack会重发。
文章图片
acks在producer配置文件中配置 0.10版的kafka参数配置具体见 apache kafka官网 1.5.2 消息消费可靠性保障 消费者的可靠性保障(关键是保存offset的时机):
- 至多一次(at most once):读取消息->保存offset->处理消息。处理消息时崩溃则会丢失消息,因为此时offset已经改变了。
- 至少一次(at least once):读取消息->处理消息->保存offset。保存offset失败,会造成重复消费,但是不会丢消息。如果重读消费时幂等操作,那就不会出现重复消息了。前面2个步骤失败可以在offset位置重新消费。
- 有且仅有一次(exactly once):保存offset和处理消息这两个环节采用two-phase commit(2PC)。但是,在Kafka中,一种更简单的方法就是可以把offset和处理后的结果一起存储。有点把处理结果和offset做成原子性的感觉。这样可以避免重复消费。
文章图片
也就是说broker在分发消息的时候暂还不支持exactly once的分发语义。 2. 组件实现和设计原理 2.1 分区 之所以设计分区的概念是从以下几个角度来考虑的: 2.1.1 灵活性(负载均衡控制、灵活消费)
- Kafka允许Partition在集群内的Broker之间任意移动,以此来均衡可能存在的数据倾斜问题。
- Partition支持自定义的分区算法,例如可以将同一个Key的所有消息都路由到同一个Partition上去。
- 同时Leader也可以在In-Sync的Replica中迁移。由于针对某一个Partition的所有读写请求都是只由Leader来处理,所以Kafka会尽量把Leader均匀的分散到集群的各个节点上,以免造成网络流量过于集中。
- 分区有偏移量的概念。消费者通过控制偏移量,可以灵活的消费消息。
- 数据重排序、MessageSet等手段来使得消息批量顺序写入
- 数据压缩
- 异步发送
- 负载均衡
- acks=0:关闭ack,以全速发送
- acks=1:消息只需要被Leader接收并确认即可,其他的Replica可以进行异步拉取无需立即进行确认,在保证可靠性的同时又不会把效率拉得很低
- acks=all:消息要Commit到该Partition的ISR集合中的所有Replica后,才可以返回ack,消息的发送会更安全,而整个过程的延迟会随着Replica的数量正比增长,这里就需要根据不同的需求做相应的优化。
- 如果在Heap内管理缓存,JVM的GC线程会频繁扫描Heap空间,带来不必要的开销。如果Heap过大,执行一次Full GC对系统的可用性来说将是极大的挑战。
- 所有在在JVM内的对象都不免带有一个Object Overhead(千万不可小视),内存的有效空间利用率会因此降低。
- 所有的In-Process Cache在OS中都有一份同样的PageCache。所以通过将缓存只放在PageCache,可以至少让可用缓存空间翻倍。
- 如果Kafka重启,所有的In-Process Cache都会失效,而OS管理的PageCache依然可以继续使用。
- OS 从硬盘把数据读到内核区的PageCache。
- 用户进程把数据从内核区Copy到用户区。
- 然后用户进程再把数据写入到Socket,数据流入内核区的Socket Buffer上。
- OS 再把数据从Buffer中Copy到网卡的Buffer上,这样完成一次发送。
文章图片
整个过程共经历两次Context Switch,四次System Call。同一份数据在内核Buffer与用户Buffer之间重复拷贝,效率低下。其中2、3两步没有必要,完全可以直接在内核区完成数据拷贝。这也正是Sendfile所解决的问题,经过Sendfile优化后,整个I/O过程就变成了下面这个样子。
文章图片
3.5 消息压缩 Producer支持End-to-End的压缩。数据在本地压缩后放到网络上传输,在Broker一般不解压(除非指定要Deep-Iteration),直至消息被Consume之后在客户端解压。 当然用户也可以选择自己在应用层上做压缩和解压的工作(毕竟Kafka目前支持的压缩算法有限,只有GZIP和Snappy),不过这样做反而会意外的降低效率!!!! Kafka的End-to-End压缩与MessageSet配合在一起工作效果最佳,上面的做法直接割裂了两者间联系。至于道理其实很简单,压缩算法中一条基本的原理“重复的数据量越多,压缩比越高”。无关于消息体的内容,无关于消息体的数量,大多数情况下输入数据量大一些会取得更好的压缩比。 4. kafka分区repelication实现高可用原理 kafka每个主题分区的复制日志跨多个可配置的服务器(可设置 topic-by-topic 的复制因子),允许自动故障转到这些副本,当集群服务器发生故障时,消息仍可用。 kafka通过分区的复制,来实现高可用。当leader挂了,可以重新选举新的leader来保证消费的高可用. 4.1 选举算法(选日志最完整的作为新leader) 和ZK不同,不采用议员投票(Quorum)的方式,而是选取复制日志最完整的节点作为leader。这里相比Quorum就需要一些额外的操作,比如判断到底怎样才算是“日志最完整”,这样就需要一些额外的开销。 kafka采用了一种稍微不同的方法选择quorum集,而不是多数投票,kafka动态维护一组同步副本(ISR),就是以后的leader,只有这个组的成员才又资格当选leader,kafka副本写入不被认为是已提交,直到所有的同步副本已经接收才认为。这组ISR保存在zookeeper,正因为如此,在ISR中的任何副本都有资格当选leader,这是kafka的使用模型,有多个分区和确保leader平衡是很重要的一个重要因素。有了这个模型,ISR和f+1副本,kafka的主题可以容忍f失败而不会丢失已提交的消息。 这种投票表决的方式有一个非常好的特性:仅依赖速度最快的服务器,也就是说,如果复制因子为三个,由最快的一个来确定。 如何定义一个节点是活着: 与大多数分布式系统自动处理失败需要精确的定义一个节点什么是“活着”,对于kafka的节点活着有2个条件:
- 一个节点必须能维持与zookeeper的会话(通过zookeeper的心跳机制)
- 如果它是一个slave,它必须复制写入的leader并且不能落后"太多"
- Partition的数量尽量提前预分配,虽然可以在后期动态增加Partition,但是会冒着可能破坏Message Key和Partition之间对应关系的风险。
- Replica的数量不要过多,如果条件允许尽量把Replica集合内的Partition分别调整到不同的Rack。
- 尽一切努力保证每次停Broker时都可以Clean Shutdown,否则问题就不仅仅是恢复服务所需时间长,还可能出现数据损坏或其他很诡异的问题。
- 单机分区数不宜过多,否则会造成发端到端延迟变长。如果比较重视延迟,建议分区数的值小于100乘以broker数量再乘以复制因子。该公式来自于confluent的文章:How to choose the number of topics/partitions in a Kafka cluster?
- 虽然跨分区不能保证全局有序消费,但是一般只要按照消息有序的KEY散列到不同的分区上,然后由多个不同的消费者并发消费。最后做排序也很简单。因为每个分区的消费都是有序的。如果一定要一开始就做到全局严格有序,可以只用一个分,当然效率会低不少。
文章图片
> bin/kafka-mirror-maker.sh --consumer.config consumer-1.properties --consumer.config consumer-2.properties --producer.config producer.properties --whitelist my-topic PS:白名单支持JAVA STYLE的正则 6.2 硬件 CPU和内存:Linkdin采用双路四核Intel Xeon,24GB内存。足够的内存来缓冲活跃的读和写。 磁盘吞吐量:磁盘越多越好。Linkdin采用8*7200 rpm SATA驱动器(经常强制刷新建议上SAS) 6.3 JAVA相关 最佳实践:建议采用最新的JDK8(7当然也支持) Linkedin配置参考: -Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 Linkedin一个kafka商用集群,峰值时的数据: 60 brokers50k partitions (replication factor 2)800k messages/sec in300 MB/sec inbound, 1 GB/sec+ outbound90%的broker GC暂停时间为21ms左右。每秒进行的young GC小于1次 6.4 操作系统 最佳实践:
- 使用LINUX,WINDOWS的支持不是最好。
- 提升文件描述符的数量从而支持大量会话和连接
- 增大socket buffer保证数据中心之间高性能数据传输
- 负载失衡:如果你配置多个数据目录分区,将会被循环分配数据目录,每个分区将完全在一个数据目录。如果数据没有被分区之间很好的平衡,可能导致磁盘之间负载失衡
- 为较大的写入吞吐量做优化,会减少可用磁盘空间
- 容忍磁盘故障,有磁盘冗余
- 使用SSL或SASL(Kerberos),验证连接到broker上的客户端(生产者或消费者)、其他的broker和工具。也可以在最新的0.10.0.0中使用SASL/PLAIN.
- 从broker连接到Zookeeper的身份验证。
- broker和client之间的数据传输,broker之间,或使用SSL的broker和工具之间的数据加密(注意,当SSL时,性能会降低,其幅度取决于CPU类型和JVM)。
- client的read/write操作验证。
- 验证是插拔的,支持外部认证服务集成。 值得注意的是,安全是可选的 - 支持非安全集群,以及混合认证,未经认证,加密和非加密的客户端。下面的指南介绍如何配置和使用client和broker的安全特性。
推荐阅读
- JS中的各种宽高度定义及其应用
- 做一件事情的基本原理是什么()
- 参保人员因患病来不及到指定的医疗机构就医,能否报销医疗费用()
- MybatisPlus|MybatisPlus LambdaQueryWrapper使用int默认值的坑及解决
- 【读书笔记】贝叶斯原理
- 【Hadoop踩雷】Mac下安装Hadoop3以及Java版本问题
- 经历了人生,才知道人生的艰难!及精彩!
- SG平滑轨迹算法的原理和实现
- 罗塞塔石碑的意义(古埃及文字的起源,圣书体文字是如何被破解的)
- 以太坊中的计量单位及相互转换