kafka原理剖析(3)-producer消息发送之缓冲区
1 整体发消息流程
文章图片
(1)第一步,等元数据拉取,上一回说过。
(2)元数据到位,对topic和key进行序列化。
(3)选取partition,3种情况
a 如果消息里指定了patition的序号,先用指定的。但一般不会这么
b 没指定key,就用个原子int自增,和size取模选择partition,相当于轮询。
C 指定了key,那就把序列化后的key+topic,转化成hash,再取模。
文章图片
(4)对消息做大小校验,包括消息本身大小是否超过单条限制,是否超过缓冲区大小。
(5)消息进入缓冲区,Acumulator,重点,后面
sender线程会操作缓冲区,进行网络发送/接收
。(6)如果batch满了,或者有新batch了,唤醒send,准备发老batch。
2 缓冲区RecordAcumulator里加入消息-概览
文章图片
(1)RecordAccumulator的数据结构,重点是batches这个map,
topic->deque队列->batchs的三层结构
(2)发消息的第一步,就是把消息放入这个accumulator里,里面的deque,batch,batch对应的buffer,都需要初始化过程,初始化完成以后,只需要把对应消息放到某个topic的batch里就行了。
(3)每个组件的初始化,都保证了线程安全。
3 RecordAccumulator加入消息细节解析
根据上图,看看实现细节:
(1)getOrCreateDeque, 第一次来是空的,建立一个加入batches,上图看到batches是个concurrentHashMap,key是topicPartition,对应的hash值是topic + "-" + partition,就算是并发send,这里也能保证线程安全。
文章图片
(2)tryappend, 尝试写入消息,
文章图片
a 可以看到,tryappend是拿到topic+partition的最近一个recordbatch,加入消息。
b RecordBatch里面有MemoryRecords,封装了底层buffer。
c 写入数据的过程,其实是通过compressor组件,把消息格式解析成offset|size|crc|magic|attributes|timestamp|key size|key| value size | value 固定的kafka报文格式,然后通过输入流写入对应的buffer。
对于一般的异步写消息过程,其实这里写入成功了send就返回了。
由此可见,一个topic+patition,对应一个发送队列,对应n个发送批次,同时也对应n个发送缓冲区。
(3)第一次发这个topic-partition的消息,是没有batch的,也就没有对应的内存缓冲区,对应上图右上角返回null的时候,就要去
申请内存缓冲buffer
。(4)得到buffer以后,buffer封装到MemoryRecords里面,然后封装到RecordBatch代表一个批次,然后调用该batch的tryAppend,这次就可以写入了,最后把batch放入acummulator对应的topic-partition队列里。
4 申请内存缓冲区的过程
文章图片
(1)第一次准备往某个topic的partition发消息的时候,RecordAccumulator里的对应的deque里肯定没有它的缓冲队列batch,那么就需要申请batch批次。
bach批次的本质是封装的ByteBuffer
,这又需要从机器内存申请。(2)申请内存,需要从RecordAccumulator的BufferPool来申请,这是个buffer池,记录了buffer队列Deque
(3)申请缓存,如果剩余缓存够,那么分两种情况
a如果来申请的内存就是批次大小,并且buffer池里有现成的,那么直接poll一个返回
文章图片
b如果申请的大小大于一个批次大小,并且剩余缓存够,那么就从buffer池里释放一些缓存出来,分配一个大的缓存buffer出去。freeup就是while循环释放buffer池的buffer,直到剩余缓存够分配buffer为止。
【kafka原理剖析(3)-producer消息发送之缓冲区】
文章图片
文章图片
(4)如果剩余缓存不够申请的,那就阻塞,等到别的线程释放出内存资源再来唤醒本线程,这里的唤醒可能是发送完一个batch的时候,batch空了就把占用的buffer还回队列里。
文章图片
假设唤醒了,这里要检查两个方面,一个是唤醒后剩余缓存或buffer池有没有空余了,一个是等待时间有没有超时(这里时间指的是producer的send的超时时间)。然后从
buffer池里分配一个或者内存池里分配一份buffer出来
。文章图片
5 发消息缓冲区的数据结构
由此可见,简化版的发送缓冲区的数据结构,
集成了消息批次划分,缓冲区暂存和系统内存分配三个功能
文章图片
推荐阅读
- 做一件事情的基本原理是什么()
- 【读书笔记】贝叶斯原理
- SG平滑轨迹算法的原理和实现
- “写作宝典”《金字塔原理》之读书笔记
- 深入浅出谈一下有关分布式消息技术(Kafka)
- Spring|Spring 框架之 AOP 原理剖析已经出炉!!!预定的童鞋可以识别下发二维码去看了
- Spring|Spring Boot 自动配置的原理、核心注解以及利用自动配置实现了自定义 Starter 组件
- Vue源码分析—响应式原理(二)
- MYSQL主从同步的实现
- (1)redis集群原理及搭建与使用(1)