rocketmq2

Broker处理Topic创建

1.更改本地topic配置缓存topicConfigTable 2.将缓存topicConfigTable配置信息写入磁盘 3.向NameServer上报变更信息 4.主从同步变更信息源码入口 broker端AdminBrokerProcessor#processRequest

broker定时任务
ScheduleMessageService:每隔10S持久化每个延时队列的投递进度ConsumerOffsetManager:Broker每隔5S持久化消费进度,将 ConsumerOffsetManager#offsetTable 属性序列化到consumerOffset.json 文件,以覆盖的形式重新写入,offsetTable 是一个Map类型的属性,key 是:topic@consumeGroup ,value是每个ConsumeQueue的消费进度,也是一个集合,key是id,value是offsetFlushConsumeQueueService: 每隔1S执行刷新ConsumeQueue,当某个ConsumeQueue新写入的数据超过2页(8kb),强制Flush数据至磁盘;同时每隔60S对所有的ConsumeQueue执行一次flush,不管新写入数据量ReputMessageService:每隔1ms进行一次Reput工作,将新消息的位置信息存入ConsumeQueue,key信息存入IndexFile,同时唤醒那些订阅了新消息所属队列的消费者请求,让它们执行消息的拉取工作CommitRealTimeService(开启写入缓冲池):将缓冲池中的数据Commit到CommitLog的FileChannel中FlushRealTimeService(异步写):每500ms对CommitLog进行一次Flush,当新写入数据超过16KB,或者距离上次Flush的时间间隔超过10S,将CommitLog位于内存中的数据同步到磁盘文件CleanCommitLogService:每隔10S执行一次清理失效CommitLog日志文件,默认清理72h之前的CleanConsumeQueueService:每隔10S执行一次清理失效ConsumeQueue和IndexFile文件PullRequestHoldService:持有针对每个ConsumeQueue的消息PullRequest,每隔5S,根据条件:maxOffset > pullFromOffset 来确定是否要唤醒订阅相应ConsumeQueue的PullRequestClientHousekeepingService:每隔10S扫描持有的ProducerChannel,ConsumerChannel,FilterChannel,将那些超过2m没有发送心跳的连接关闭掉每隔30S向指定的一个或多个Namesrc注册 Broker信息

consumeQueue生成
ReputMessageService继承ServiceThread是一个线程服务,服务启动后每间隔1毫秒调用一次doReput方法,doReput方法会调用CommitLogDispatcher进行消息分发。 步骤: 获取index文件的mapFile mapFile放入msgconsumerQueue是帮助消费者找到消息的索引文件。里面存放的就是每条消息的起始位点和消息的大小还有tag的hashCode。

index生成及结构
index文件根据key可以快速的检索到对应的消息,key分为两种,一种是UniqKey,系统自动生成的(createUniqID()函数生成,类似uuid)。还有一种是自定义的key,放在message的property中的,生产者指定的。总的来说就是解决一个文件中查找一条消息。 入口: ReputMessageService继承ServiceThread是一个线程服务,服务启动后每间隔1毫秒调用一次doReput方法,doReput方法会调用CommitLogDispatcher进行消息分发。 步骤: 获取index文件的mapFile mapFile放入msg

MappedFileQueue
CommitLog消息存储、ConsumeQueue等通常会记录大量的数据,一个MappedFile具有固定大小(默认1G),所以一个MappedFile不能记录所有的内容,于是CommitLog、ConsumeQueue通常会使用多个MappedFile记录数据,RocketMQ则使用MappedFileQueue组织一系列的MappedFile,处在MappedFileQueue队尾的通常是刚写满的或者还有部分空间或者刚分配的MappedFile,每次写操作时,都会从队尾拿到最后一个MappedFile进行写。如果启动时配置了启用transientStorePoolEnable,那么在DefaultMessageStore构造函数中会调用TransientStorePool.init方法,预分配ByteBuffer并放入队列中,并且会锁住这些内存,防止操作系统进行置换。只有主Broker、刷盘方式为异步刷盘且transientStorePoolEnable为true才会启用暂存池。没有开启transientStorePoolEnable,创建的时候就会生成一个文件,然后对文件进行映射。 如果transientStorePoolEnable,那么每次在创建的时候,不但会生成一个文件,然后对文件进行映射,还会从TransientStorePool里面"借"一块堆外内存(堆外内存已经申请好了)作为writeBuffer,然后每次写数据的时候,优先写入writeBuffer。每次写数据时,都会从MappedFileQueue中获取最后一个MappedFile,如果MappedFileQueue为空,或者最后一个MappedFile已经写满,则会重新分配一个新的MappedFile。如果写数据的时候,剩余的空间不够写入(写入的数据大小+结束标志>剩余容量),就会把剩余的空间写入文件结束标志,然后返回 END_OF_FILE。然后重新申请一个mappedFile,重新写入。MappedFile刷盘操作根据具体配置分为同步和异步刷盘两种方式,这里不管同步异步,其操作类似,都是通过MappedFile.commit和MappedFile.flush,如果启用了暂存池TransientStorePool则会先调用MappedFile.commit把writeBuffer中的数据写入fileChannel中,然后再调用MappedFile.flush;而MappedFile.flush通过fileChannel.force或者mappedByteBuffer.force()进行实际的刷盘动作。如果writeBuffer写满了,就会归还给TransientStorePool,writeBuffer置为null。

mq储存对比
https://www.zhihu.com/question/346540432

mq事务
服务端 1:收到事务 修改消息topic为RMQ_SYS_TRANS_HALF_TOPIC,并备份消息原有topic,供后续commit消息时还原消息topic使用 修改消息queueId为0,并备份消息原有queueId,供后续commit消息时还原消息queueId使用 正常的存储信息。因为topic被改变,所以无法消费 2:事务响应 判断来自事务检查 是则 进行执行操作 判断事务状态 是提交 还是回滚 还是pending状态 构造OperationResult 根据提交事务还是回滚事务进行提交或者回滚消息 提交事务 检查准备消息 返回remotingCommand 结束消息事务 endMessageTransaction使用之前存储的真实topic和queueId重新构建一个新的消息 刷盘处理 将新的消息写入到commitLog 刷盘成功 进行删除 deletePrepareMessage,将消息的offset当作消息提放入opQueue 回滚事务 检查准备消息 返回remotingCommand 返回成功状态 不需要做什么 因为真实的消息一直还没落盘 故也不需要删除 进行删除 deletePrepareMessage,将消息的offset当作消息提放入opQueue 返回OperationResult结果 3:回查 开了一个定时任务定时回查。不断地消费halfQueue里面的消息, 如果发现opQueue中含有这个消息的offset,代表已经处理过了。 没有发现,就检测重插次数、事务超时时间、立马检测事务的时间等,把这条消息重新放回halfQueue,发送回查请求给客户端。客户端根据本地事务状态发送提交、回滚。https://blog.csdn.net/hosaos/article/details/90240260

nameserver
nameServer管理broker。在broker注册的时候,会通过registerBrokerAll把topic信息(包含queue的信息)、地址、brokerId、brokerName上报到nameServer。broker在启动的时候,定时任务调用registerBrokerAll。在更新broker配置、创建topic的时候,也会调用registerBrokerAll。brokerName->broker地址 topic->queueData cluster->brokerName brokerAddr->brokerl心跳信息 brokerAddr->filterhttps://www.jianshu.com/p/3d8d594d9161

pagecache的读写
读: 加pagecache锁,从hash表中查找page, 如果存在 page引用计数加一,放锁(我们只在对PageCache相关的数据结构进行读写时需要加pagecache锁,当我们将page取出来,并且引用计数加了一以保证page不会被回收,之后的page读写,就会以page为粒度进行并发控制) 如果page是最新的:读它!然后引用减少一 如果page不是最新的 对page加独占锁(可能会sleep) 拿到锁后,查看page是否是最新的(因此sleep时,其他进程可能已经读了此page) 调用mapping->a_ops->readpage来读入page 如果出错:释放引用,并返回。注意,如果出错,page锁会被直接放掉 否则wait_on_page:这个读是由底层完成的,读期间,page是锁着的,当读操作完成,page会被放锁,于是wait_on_page会醒来 现在page是最新的了,化归成之前的情况了 如果不存在 放pagecache锁 使用page_cache_alloc分配一个page 加pagecache锁 再次查找hash表,如果存在,则可以化归成之前的情况了。注意,为什么这里要再次查找呢?因为分配page期间放开了pagecache锁,此时,其他线程可能已经分配了需要的page,于是,我们需要进行一次双重检查 将page加入到pagecache 放pagecache锁 将page加入lru,注意,lru使用单独的锁 于是,问题又化归为上面的情况(代码中用了goto)写: 计算要写的page在文件地址空间中的index __grab_cache_page(mapping, index, &cached_page):从PageCache中取出需要的page,并加锁,如果page不存在,会分配一个,并且加入到PageCache中 注意,此时,当前线程持有page的引用,并且page处于加锁状态,此过程和读过程类似,只不过还没有放锁。 mapping->a_ops->prepare_write(file, page, offset, offset+bytes) __copy_from_user mapping->a_ops->commit_write(file, page, offset, offset+bytes) 对page放锁 释放page的引用计数冲突: 读和写在寻找对应page的时候都需要加pagecache锁。 在写请求对page加锁后,此一页的读请求需要等到写请求锁释放才能读。//https://zhuanlan.zhihu.com/p/42364591

put消息时加锁
RocketMQ在写入消息到CommitLog中时,使用了锁机制,即同一时刻只有一个线程可以写CommitLog文件同步刷盘时,刷盘一次需要的时间相对较长,锁竞争激烈,会有较多的线程处于等待阻塞等待锁的状态,如果采用自旋锁会浪费很多的CPU时间,所以“同步刷盘建议使用重入锁”。异步刷盘是间隔一定的时间刷一次盘,写入隐射内存就返回,对锁的持有不久,所以锁竞争不激烈,不会存在大量阻塞等待锁的线程,偶尔锁等待就自旋等待一下很短的时间,不要进行上下文切换了,所以采用自旋锁更合适。

request_reply
Producer发送request时创建一个RequestResponseFuture,以correlationId为key,RequestResponseFuture为value存入map,同时请求中带上RequestResponseFuture中的correlationId已经客户端的clientId,收到回包后根据correlationId拿到对应的RequestResponseFuture,并设置回包内容。Producer端还启动了一个定时任务扫描map,检测request是否超时。request就是用send发送。 同步request:每个RequestResponseFuture里面有一个闭锁countDownLatch,当收到此条消息的reply后解锁。producer在发送消息的时候,会给每条消息生成唯一的标识符,同时还带上了producer的clientId。当consumer收到并消费消息后,从消息中取出消息的标识符correlationId和producer的标识符clientId,放入响应消息,用来确定此响应消息是哪条请求消息的回包,以及此响应消息应该发给哪个producer。同时响应消息中设置了消息的类型以及响应消息的topic,然后consumer将消息发给broker。服务端接受request和接受send没区别。服务端有处理reply请求的逻辑,根据配置还可以存储reply消息。broker收到响应消息后,需要将消息发回给指定的producer。Broker如何知道发回给哪个producer?因为消息中包含了producer的标识符clientId,在ProducerManager中,维护了标识符和channel信息的对应关系,通过这个对应关系,就能把回包发给对应的producer。

retry消息
Retry消息即consumer消费失败,要求broker重发的消息。失败的原因有两种,一种是业务端代码处理失败;还有一种是消息在consumer的缓存队列中待的时间超时,consumer会将消息从队列中移除,然后退回给Broker重发消息会重新发往%RETRY%+consumerGroup topic,这个topic每个broker上有一个,读和写队列数量为1。消息到达broker上,会放入SCHEDULE_TOPIC_XXXX下,然后定时任务会读取对应的consumerQueue,从commitlog中读出消息放入retry topic的队列中,客户端都订阅了retry topic,此时就能将消息拉取下来了。retry topic是一个consumer一个。每个broker上会有一个。

tag使用
同一个group 使用的tag必须一致,不然后注册的会覆盖前面注册的,导致消息丢失,tag过滤在服务端通过hash过滤,客户端通过equal过滤。会导致tag1的消息在服务端被过滤了,tag2的部分消息负载到了consumer1,导致consumer1过滤了。consumer2只能够消费部分消息。

rebalance时机
触发条件这个rebalance的条件有:每20s定时刷新(准确说上次刷新后等20s, @see RebalanceService#run 收到Broker告知的Consume变化通知时@see ClientRemotingProcessor#notifyConsumerIdsChanged 每次Client启动时@see DefaultMQPushConsumerImpl#start。每次client上报心跳,服务端会判断是否有配置变化或者是新的id加入。如果是,那么会通知这个group下所有的client进行rebalance。client下线也会通知这个group下所有的client进行rebalance

【rocketmq2】磁盘指标
rrqm/s:每秒进行merge的读操作数目。即delta(rmerge)/s wrqm/s:每秒进行merge的写操作数目。即delta(wmerge)/s r/s:每秒完成的读I/O设备次数。即delta(rio)/s w/s:每秒完成的写I/0设备次数。即delta(wio)/s rsec/s:每秒读扇区数。即delta(rsect)/s wsec/s:每秒写扇区数。即delta(wsect)/s rKB/s:每秒读K字节数。是rsec/s的一半,因为每扇区大小为512字节 wKB/s:每秒写K字节数。是wsec/s的一半 avgrq-sz:平均每次设备I/O操作的数据大小(扇区)。即delta(rsect+wsect)/delta(rio+wio) avgqu-sz:平均I/O队列长度。即delta(aveq)/s/1000(因为aveq的单位为毫秒) duiw await:平均每次设备I/O操作的等待时间(毫秒)。即delta(ruse+wuse)/delta(rio+wio) svctm:平均每次设备I/O操作的服务时间(毫秒)。即delta(use)/delta(rio+wio)!!!svctm越接近于await则说明等待时间少 %util:一秒中有百分之多少的时间用于I/O操作,或者说一秒中有多少时间I/O队列是非空的。80%表示设备已经很忙了,即delta(usr)/s/1000(因为use的单位为毫秒)rqsize:The average size (in sectors) of the requests that were issued to the device. qusize:The average queue length of the requests that were issued to the device. qusize = (rs+ws)*await/1000 rocketmq如果磁盘压力大,表现为: 1:io util 几乎达到100% 2:r/s(每秒完成的读I/O设备次数),上升 解释:rocketmq如果正常情况下,都是读热数据,基本不需要去磁盘读,如果读历史消息,会导致io指向磁盘,r/s读的次数增加,此时因为读写都用的是pagecache,所以写请求的处理会变慢,w/s写的次数会变多。

    推荐阅读