Kafka -- 元数据的拉取流程
上一篇已经知道元数据在什么时候拉取的,我们这里看看他整个拉取的流程。
topics
元素的拉取,是根据生产者的发送的topic来拉取的,并不是拉取所有的元数据,所以在发送消息的时候,就会把这个topic存放在topics中,这个topics就是记录了当前已有的topic,数据类型是map,他的key就是topic,value刚开始默认为-1。
所以topic1发送的时候,就会在topics新增一个topic1->-1的键值对。
文章图片
版本号和更新标识
由于元数据的拉取,是由Sender线程来执行的(这个后面流程会讲到),所以需要一个更新标识needUpdate,来告知Sender线程,我需要拉取元数据。
同时,Sender线程在成功拉取元数据后,也需要告知其他线程,这里就需要一个版本号version,每次成功拉取,就进行累加,所以其他线程就会拿自己的版本号跟内存里的版本号进行对比,发现内存的版本号比自己大,那就是拉取元数据成功了。
【Kafka -- 元数据的拉取流程】在拉取的时候,此时version=0,当前线程保存的version也是0,needUpdate为true,说明需要拉取元数据。
文章图片
sender线程
sender线程有一个死循环,他会一直运行,元数据的发送以及消息的发送,都要通过ender线程。
版本号和更新标识准备好后,就开始唤醒sender线程,实际上sender线程最终会唤醒NIO的Selector上的线程。
这里涉及到网络传输部分,我们就简要的讲一下,网络传输等后面再讲。简单的说,就是sender线程会把请求交给NIO的Selector,然后再处理Selector收到的请求。
文章图片
sender线程处理完消息后,就会更新元数据里的信息,然后版本号累加,needUpdate改为false。
上面topics中,topic1对应的value是1,此时已经拉取到元数据了,这里的value就要改为当前时间+5 60 1000,即当前时间+5分钟。意思是当某个topic超过5分钟没有发送消息,就会从topics中移除,下次更新的时候,就不会拉取这个topic的元数据信息。
文章图片
休眠
sender线程辛辛苦苦拉取元数据的时候,发送消息的线程在干什么呢?
他进入了休眠,也就是wait,被唤醒有两种情况,一个是sender线程更新完元数据再唤醒他,另外一个是休眠时间到了,他会有一个最大等待时间,默认60s。
被唤醒后,就会检查当前拉取的时间是不是超过了,如果超过60s还没有拉取到元数据,此时就要抛异常的。
另外还要判断,当前保存的版本号是不是小于内存的版本号,如果小于,他就知道更新成功了,反之,说明还没更新成功,他就会继续休眠,等待下一次唤醒。
文章图片
缓存
Sender拉取元数据后,是保存在内存中的,这样下次发送消息的时候,就直接从内存拿了,并不会每次都向上面的流程一样,一次次的从Kafka拉取元数据,既降低了生产者发送消息的效率,也加大了Kafka的压力。
如果某个topic在5分钟内会至少发一条消息,那个这个topic就会保留在topics中。生产者客户端每5分钟就会更新元数据,所以持续发送的消息,他在缓存中的元数据都会一直的更新。
推荐阅读
- Docker应用:容器间通信与Mariadb数据库主从复制
- 标签、语法规范、内联框架、超链接、CSS的编写位置、CSS语法、开发工具、块和内联、常用选择器、后代元素选择器、伪类、伪元素。
- 我用芋圆和芋饺祝大家元宵节快乐
- 使用协程爬取网页,计算网页数据大小
- Java|Java基础——数组
- Python数据分析(一)(Matplotlib使用)
- Jsr303做前端数据校验
- Spark|Spark 数据倾斜及其解决方案
- 数据库设计与优化
- 默示录【二】