RocketMQ阅读笔记之消息存储

消息存储部分是RocketMQ的重要组成部分,良好的存储机制会有效降低延迟,提高整体效率。RocketMQ利用到了文件系统,将消息存放在磁盘中实现持久化。
提示: 上传图片失败,大家可以到我的个人网站阅读 https://spurstong.github.io/
首先先介绍消息存储的整体结构,先大体了解其处理机制。
[图片上传失败...(image-d5ad98-1573734040039)]
从上图中可以看出有几个重要的文件。

  • CommitLog: 消息存储文件,所有消息主题的消息都存储在CommitLog文件夹中,它以物理文件的方式存放,每台Broker上有一个CommitLog文件夹,里面有多个文件,单个文件的大小默认为1G,文件名表示的是该文件中的第1条信息的物理偏移量。每台Broker上的CommitLog被本机器所有ConsumeQueue共享。在CommitLog中,一个消息的存储长度是不固定的。每条消息的前4个字节存储该条消息的总长度,其余的信息就是在该长度下的消息内容。
  • ConsumerQueue: 消息队列文件,在ConsumerQueue文件夹中存在以topic(主题名)为文件夹名的多个文件夹。然后在每个文件夹里根据消息队列数创建了多个文件夹,文件夹名为0,1,2,..。然后在最里面的文件记录的是某主题下的某消息队列下的文件信息,但是它并没有存储消息的具体信息,它起到了类似于索引的功能,更够使消费者可以快速的在CommitLog文件中查找到所需的信息。
    单个ConsumerQueue文件中默认包含30万个条目,单个文件的长度为30W * 20字节,即每条信息占20字节。前8个字节为commitlog offset,为该条消息在commitlog的实际偏移量,中间4个字节为size,即该条消息的长度,最后8字节为tag hashcode,即存储该条消息的tag的哈希值,用于订阅时消息过滤。
  • IndexFile:索引文件。如果消息包含key值的话,会使用IndexFile来存储消息索引。具体的逻辑结构在后面讲解到。
内存映射技术 在Linux操作系统中分为“用户态”和“内核态”,在进行IO操作时,会涉及到这两种形态的切换及read()和write()操作。在执行read()操作时,是系统调用的,期间进行了数据拷贝,它首先将文件内容从硬盘拷贝到内核空间的一个缓冲区,然后再将这些数据拷贝到用户空间,在这个过程中实际上完成了两次的数据拷贝。这样会对访问速度有所影响。
针对这种情况,提出了mmap内存映射技术,mmap内存映射和普通标准IO操作的本质区别是它并不需要将文件中的数据先拷贝到OS的内核IO缓冲区,而是可以直接将用户进程私有地址空间的一块区域与文件对象建立映射关系,就好像可以直接从内存中完成对文件读写操作一样。只有当缺页中断发生时,直接将磁盘拷贝至用户态的进程空间内,只进行了一次数据拷贝。
[图片上传失败...(image-b07093-1573734137255)]
高可用机制 RocketMQ分布式集群是通过Master和Slave的配合达到高可用性的,在Broker的配置文件中,参数brokerId的值为0表示这个Broker是Master,大于0表示该Broker是Slave。Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是说,Producer只能和Master角色的Broker连接写入信息;Consumer可以连接Master角色和Slave角色的Broker来读取消息。
对于Consumer,当Master不可用或者繁忙的时候,Consumer会自动切换到Slave进行读取消息,实现了消费端的高可用性。
对于Producer,在创建topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同BrokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用候,其他组的Master依然可用,Producer仍然可以发送消息。
内存映射相关类 由上面可知,RocketMQ通过使用内存映射文件来提高IO访问性能,现在介绍几个相关的类。
MappedFile
它是CommitLog里面具体文件的映射。
public class MappedFile extends ReferenceResource { public static final int OS_PAGE_SIZE = 1024 * 4; protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); // 当前JVM中映射的虚拟内存总大小 private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0); // 当前JVM中mmap句柄数量 private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0); // 当前写到什么位置 protected final AtomicInteger wrotePosition = new AtomicInteger(0); //ADD BY ChenYang // Flush到什么位置 protected final AtomicInteger committedPosition = new AtomicInteger(0); private final AtomicInteger flushedPosition = new AtomicInteger(0); // 映射的文件大小,定长 protected int fileSize; // 映射的FileChannel对象 protected FileChannel fileChannel; /** * Message will put to here first, and then reput to FileChannel if writeBuffer is not null. */ protected ByteBuffer writeBuffer = null; protected TransientStorePool transientStorePool = null; // 映射的文件名 private String fileName; // 映射的起始偏移量 private long fileFromOffset; // 映射的文件 private File file; // 映射的内存对象,position永远不变 private MappedByteBuffer mappedByteBuffer; // 最后一条消息存储时间 private volatile long storeTimestamp = 0; private boolean firstCreateInQueue = false;

MappedFileQueue
MappedFileQueue是MappedFile的管理容器,MappedFileQueue是对存储目录的封装。
public class MappedFileQueue { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME); // 每次触发删除文件,最多删除多少个文件 private static final int DELETE_FILES_BATCH_MAX = 10; // 文件存储位置 private final String storePath; // 每个文件的大小 private final int mappedFileSize; // 各个文件 private final CopyOnWriteArrayList mappedFiles = new CopyOnWriteArrayList(); // 预分配MapedFile对象服务 private final AllocateMappedFileService allocateMappedFileService; // 当前刷盘指针,表示该指针之前的所有数据全部持久化到磁盘 private long flushedWhere = 0; // 当前数据提交指针,内存中ByteBuffer当前的写指针,该值大于等于flushedWhere. private long committedWhere = 0; // 最后一条消息存储时间 private volatile long storeTimestamp = 0;

根据消息偏移量offset查找MappedFile时,不会直接使用offset/mappedFileSize,因为消息文件不是永久存在的,RocketMQ会定时删除存储文件,第一个文件不一定是000000000000000000,可能最初的起始文件已经被删除,那么利用上述方法是会产生错误的。
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) { try { # 获取第一个MappedFile MappedFile mappedFile = this.getFirstMappedFile(); if (mappedFile != null) { int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize)); if (index < 0 || index >= this.mappedFiles.size()) { LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " + "mappedFileSize: {}, mappedFiles count: {}", mappedFile, offset, index, this.mappedFileSize, this.mappedFiles.size()); }try { return this.mappedFiles.get(index); } catch (Exception e) { if (returnFirstOnNotFound) { return mappedFile; } LOG_ERROR.warn("findMappedFileByOffset failure. ", e); } } } catch (Exception e) { log.error("findMappedFileByOffset Exception", e); }return null; }

其中,重要的一个公式是(int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize)); offset / this.mappedFileSize得到的是假设所有文件没有被删除的情况下该偏移量所处的文件,然后mappedFile.getFileFromOffset() / this.mappedFileSize,获取第一个文件的起始偏移量,如果之前有文件删除的话,计算已删除的文件数。这样就可以得到定位到实际的文件。
ConsumeQueue
ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的指向物理存储的地址,每个topic下的每一个Message Queue都有一个对应的ConsumeQueue文件。
public class ConsumeQueue { // 存储单元大小 public static final int CQ_STORE_UNIT_SIZE = 20; private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME); // 存储顶层对象 private final DefaultMessageStore defaultMessageStore; // 存储消息索引的队列 private final MappedFileQueue mappedFileQueue; // Topic private final String topic; // queueId private final int queueId; // 写索引时用到的ByteBuffer private final ByteBuffer byteBufferIndex; // 配置 private final String storePath; private final int mappedFileSize; // 最后一个消息对应的物理Offset private long maxPhysicOffset = -1; // 逻辑队列的最小Offset,删除物理文件时,计算出来的最小Offset // 实际使用需要除以 StoreUnitSize private volatile long minLogicOffset = 0; private ConsumeQueueExt consumeQueueExt = null;

IndexFile
如果该消息存在key,可以根据key进行查询,这时需要用到IndexFile索引文件。
public class IndexFile { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static int hashSlotSize = 4; private static int indexSize = 20; private static int invalidIndex = 0; private final int hashSlotNum; private final int indexNum; private final MappedFile mappedFile; private final FileChannel fileChannel; private final MappedByteBuffer mappedByteBuffer; private final IndexHeader indexHeader;

[图片上传失败...(image-eb1267-1573733989072)]
从大的方面来说,分为3个部分,IndexHead,500w个hsah槽和2000w个Index条目。
IndexHead头部,包含40个字节,记录该IndexFile的统计信息,其结构如下:
  • beginTimestamp: 该索引文件中包含信息的最小存储时间。
  • endTimestamp: 该索引文件中包含信息的最大存储时间。
  • beginPhyoffset: 该索引文件中包含的消息在commitlog文件中的最小物理偏移量。
  • endPhyoffset: 该索引文件中包含的消息在commitlog文件中的最大物理偏移量。
  • hashlogCount: hashlot个数。
  • indexCount: Index条目列表当前已使用的个数。
500w个hsah槽
每个Hash槽存储的是落在该Hash槽的hashcode最新的Index的索引,每个Hash槽占4个字节。
2000w个Index
每一个Index条目结构如下:
  • hashcode: key的hashcode
  • phyoffset: 消息对应的物理偏移量
  • timedif: 该消息存储时间与第一条消息的时间戳的差值
  • preIndexNo: 该条目的前一条记录的Index索引
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) { if (this.indexHeader.getIndexCount() < this.indexNum) { # 消息key的hashcode int keyHash = indexKeyHashMethod(key); # 将key的hashcode与hash槽数量取余,得到该hashcode下的hash槽下标int slotPos = keyHash % this.hashSlotNum; # 该槽的物理地址 int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null; try {// fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize, // false); # 获取该hashcode的存储的Index索引, int slotValue = https://www.it610.com/article/this.mappedByteBuffer.getInt(absSlotPos); if (slotValue <= invalidIndex || slotValue> this.indexHeader.getIndexCount()) { slotValue = https://www.it610.com/article/invalidIndex; } # 计算待存储消息的时间戳与第一条消息时间戳的差值,并转换为秒 long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp(); // 时间差存储单位由毫秒改为秒 timeDiff = timeDiff / 1000; // 25000天后溢出 if (this.indexHeader.getBeginTimestamp() <= 0) { timeDiff = 0; } else if (timeDiff> Integer.MAX_VALUE) { timeDiff = Integer.MAX_VALUE; } else if (timeDiff < 0) { timeDiff = 0; }int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize; // 写入真正索引 this.mappedByteBuffer.putInt(absIndexPos, keyHash); this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); // 更新哈希槽 this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); // 第一次写入 if (this.indexHeader.getIndexCount() <= 1) { this.indexHeader.setBeginPhyOffset(phyOffset); this.indexHeader.setBeginTimestamp(storeTimestamp); }this.indexHeader.incHashSlotCount(); this.indexHeader.incIndexCount(); this.indexHeader.setEndPhyOffset(phyOffset); this.indexHeader.setEndTimestamp(storeTimestamp); return true; } catch (Exception e) { log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { e.printStackTrace(); } } } } else { log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount() + "; index max num = " + this.indexNum); }return false; }

将该条目信息存储在IndexFile中,在之前的所存储的最后一个Index的后面添加,依次存放hashcode、消息物理偏移值、消息存储时间戳与第一条消息时间戳的差值、上一条相同hashcode的索引位置。并将当前新放入的Index条目个数即下标放入到该key值所处的卡槽位置中,即会覆盖掉原来记录的Index条目的下标,它只会记录最新的Index条目的下标。
但会存在一种情况,即不同的key可能会存在相同的hashcode,在查找hash槽是根据hashcode查找的,只能记录一个,所以会产生冲突,面对这种情况,在Index索引记录时有一个属性记录了该条目的前一条记录的索引值,即他们的hashcode是相同的。
下面是根据索引key查找消息,其中有一步是解决了hashcode冲突问题。
public void selectPhyOffset(final List phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock) { if (this.mappedFile.hold()) { int keyHash = indexKeyHashMethod(key); int slotPos = keyHash % this.hashSlotNum; int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null; try { if (lock) { // fileLock = this.fileChannel.lock(absSlotPos, // hashSlotSize, true); }int slotValue = https://www.it610.com/article/this.mappedByteBuffer.getInt(absSlotPos); // if (fileLock != null) { // fileLock.release(); // fileLock = null; // }if (slotValue <= invalidIndex || slotValue> this.indexHeader.getIndexCount() || this.indexHeader.getIndexCount() <= 1) { // TODO NOTFOUND } else { for (int nextIndexToRead = slotValue; ; ) { if (phyOffsets.size() >= maxNum) { break; }int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + nextIndexToRead * indexSize; int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4); // int转为long,避免下面计算时间差值时溢出 long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4); // 读到了未知数据 if (timeDiff < 0) { break; }// 时间差存储的是秒,再还原为毫秒, long避免溢出 timeDiff *= 1000L; long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; boolean timeMatched = (timeRead >= begin) && (timeRead <= end); if (keyHash == keyHashRead && timeMatched) { phyOffsets.add(phyOffsetRead); }if (prevIndexRead <= invalidIndex || prevIndexRead > this.indexHeader.getIndexCount() || prevIndexRead == nextIndexToRead || timeRead < begin) { break; }nextIndexToRead = prevIndexRead; } } } catch (Exception e) { log.error("selectPhyOffset exception ", e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { e.printStackTrace(); } }this.mappedFile.release(); } } } }

由于会存在hash冲突,根据slotValue定位该hash槽最新的一个Item条目,经过一些处理后,会通过 prevIndexRead 获取到该Hashcode下的上一个Index下标,如果大于等于1并且小于最大条目数,则继续查找,否则结束查找。
文件刷盘机制 RocketMQ的存储与读写是基于JDK NIO的内存映射机制(MappedByteBuffer)的,消息存储时首先将消息追加到内存,再根据配置的刷盘策略在不同时间进行刷写磁盘,如果是同步刷盘,消息追加到内存后,将同步调用MappedByteBuffer的force方法;如果是异步刷盘,在消息追加到内存后立刻返回给消息发送端,RocketMQ使用一个单独的线程按照某一个设定的频率执行刷盘操作。
Consume与IndexFile文件的实时更新 当消息提交存储在Commitlog文件中,ConsumeQueue、IndexFile文件需要及时更新,否则消息无法及时被消费,如何确保消息内容能及时更新到ConsumeQueue、IndexFile等文件,RocketMQ通过开启一个线程ReputMessageService来实时更新消息信息。
在ReputMessageService线程中有一个重要的参数reputFromOffset,该参数表示从哪个物理偏移量开始转发消息给ConsumeQueue和IndexFile,如果允许重复转发,reputFromOffset设置为CommitLog的提交指针,如果不允许重复转发,reputFromOffset设置为CommitLog的内存中最大偏移量。读取的时候会读取该偏移量后的所有消息。
ReputMessageService线程每执行一次任务推送休息1毫秒就继续推送消息到消息消费队列和索引文件。
获取到新增的消息后,会采用不同的方法来对ConsumeQueue和IndexFile文件进行更新。
根据消息更新对应文件 获取到新增的消息后,要将这些消息更新到ConsumeQueue或IndexFile中,现以ConsumeQueue文件为例。
  • 根据消息主题与队列ID,获取到对应的ConsumeQueue文件,ConsumeQueue文件实际是对应的该队列ID下的文件夹,文件夹里有很多文件,在ConsumeQueue属性里面有一个MappedFileQueue,对该文件夹下的所有文件做映射。
  • 依次将消息偏移量、消息长度、tag hashcode写入到ByteBuffer中,将内容追加到ConsumeQueue的内存映射文件中,ConsumeQueue的刷盘方式固定为异步刷盘方式。
过期文件删除机制 CommitLog、ConsumeQueue文件是基于内存映射机制并在启动的时候会加载commitlog、ConsumeQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久的存储在消息服务器上。
RocketMQ顺序写CommitLog文件、ConsumeQueue文件,所有写操作全部落在了最后一个CommitLog或ConsumeQueue文件上,之前的文件在下一个文件创建后将不会再更新,如果非当前写文件在下一个文件创建后将不会再被更新,则认为是过期文件,默认的过期时间是72小时。
【RocketMQ阅读笔记之消息存储】RocketMQ会在下面几种情况下执行删除文件操作。
  • 指定删除文件的时间点,通过设置deleteWhen设置一天的固定时间执行一次删除过期文件操作,默认是凌晨4点
  • 磁盘空间是否充足,如果磁盘空间不充足,则会触发过期文件删除操作。
  • 可以通过调用excuteDeleteFilesManualy方法手动触发过期文件删除。

    推荐阅读