  • CommitLog: 消息存储文件,所有消息主题的消息都存储在CommitLog文件夹中,它以物理文件的方式存放,每台Broker上有一个CommitLog文件夹,里面有多个文件,单个文件的大小默认为1G,文件名表示的是该文件中的第1条信息的物理偏移量。每台Broker上的CommitLog被本机器所有ConsumeQueue共享。在CommitLog中,一个消息的存储长度是不固定的。每条消息的前4个字节存储该条消息的总长度,其余的信息就是在该长度下的消息内容。
  • ConsumerQueue: 消息队列文件,在ConsumerQueue文件夹中存在以topic(主题名)为文件夹名的多个文件夹。然后在每个文件夹里根据消息队列数创建了多个文件夹,文件夹名为0,1,2,..。然后在最里面的文件记录的是某主题下的某消息队列下的文件信息,但是它并没有存储消息的具体信息,它起到了类似于索引的功能,更够使消费者可以快速的在CommitLog文件中查找到所需的信息。
    单个ConsumerQueue文件中默认包含30万个条目,单个文件的长度为30W * 20字节,即每条信息占20字节。前8个字节为commitlog offset,为该条消息在commitlog的实际偏移量,中间4个字节为size,即该条消息的长度,最后8字节为tag hashcode,即存储该条消息的tag的哈希值,用于订阅时消息过滤。
  • IndexFile:索引文件。如果消息包含key值的话,会使用IndexFile来存储消息索引。具体的逻辑结构在后面讲解到。
内存映射技术 在Linux操作系统中分为“用户态”和“内核态”,在进行IO操作时,会涉及到这两种形态的切换及read()和write()操作。在执行read()操作时,是系统调用的,期间进行了数据拷贝,它首先将文件内容从硬盘拷贝到内核空间的一个缓冲区,然后再将这些数据拷贝到用户空间,在这个过程中实际上完成了两次的数据拷贝。这样会对访问速度有所影响。
高可用机制 RocketMQ分布式集群是通过Master和Slave的配合达到高可用性的,在Broker的配置文件中,参数brokerId的值为0表示这个Broker是Master,大于0表示该Broker是Slave。Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是说,Producer只能和Master角色的Broker连接写入信息;Consumer可以连接Master角色和Slave角色的Broker来读取消息。
对于Producer,在创建topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同BrokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用候,其他组的Master依然可用,Producer仍然可以发送消息。
内存映射相关类 由上面可知,RocketMQ通过使用内存映射文件来提高IO访问性能,现在介绍几个相关的类。
public class MappedFile extends ReferenceResource { public static final int OS_PAGE_SIZE = 1024 * 4; protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); // 当前JVM中映射的虚拟内存总大小 private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0); // 当前JVM中mmap句柄数量 private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0); // 当前写到什么位置 protected final AtomicInteger wrotePosition = new AtomicInteger(0); //ADD BY ChenYang // Flush到什么位置 protected final AtomicInteger committedPosition = new AtomicInteger(0); private final AtomicInteger flushedPosition = new AtomicInteger(0); // 映射的文件大小,定长 protected int fileSize; // 映射的FileChannel对象 protected FileChannel fileChannel; /** * Message will put to here first, and then reput to FileChannel if writeBuffer is not null. */ protected ByteBuffer writeBuffer = null; protected TransientStorePool transientStorePool = null; // 映射的文件名 private String fileName; // 映射的起始偏移量 private long fileFromOffset; // 映射的文件 private File file; // 映射的内存对象,position永远不变 private MappedByteBuffer mappedByteBuffer; // 最后一条消息存储时间 private volatile long storeTimestamp = 0; private boolean firstCreateInQueue = false;

public class MappedFileQueue { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME); // 每次触发删除文件,最多删除多少个文件 private static final int DELETE_FILES_BATCH_MAX = 10; // 文件存储位置 private final String storePath; // 每个文件的大小 private final int mappedFileSize; // 各个文件 private final CopyOnWriteArrayList mappedFiles = new CopyOnWriteArrayList(); // 预分配MapedFile对象服务 private final AllocateMappedFileService allocateMappedFileService; // 当前刷盘指针,表示该指针之前的所有数据全部持久化到磁盘 private long flushedWhere = 0; // 当前数据提交指针,内存中ByteBuffer当前的写指针,该值大于等于flushedWhere. private long committedWhere = 0; // 最后一条消息存储时间 private volatile long storeTimestamp = 0;

public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) { try { # 获取第一个MappedFile MappedFile mappedFile = this.getFirstMappedFile(); if (mappedFile != null) { int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize)); if (index < 0 || index >= this.mappedFiles.size()) { LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " + "mappedFileSize: {}, mappedFiles count: {}", mappedFile, offset, index, this.mappedFileSize, this.mappedFiles.size()); }try { return this.mappedFiles.get(index); } catch (Exception e) { if (returnFirstOnNotFound) { return mappedFile; } LOG_ERROR.warn("findMappedFileByOffset failure. ", e); } } } catch (Exception e) { log.error("findMappedFileByOffset Exception", e); }return null; }

其中,重要的一个公式是(int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize)); offset / this.mappedFileSize得到的是假设所有文件没有被删除的情况下该偏移量所处的文件,然后mappedFile.getFileFromOffset() / this.mappedFileSize,获取第一个文件的起始偏移量,如果之前有文件删除的话,计算已删除的文件数。这样就可以得到定位到实际的文件。
ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的指向物理存储的地址,每个topic下的每一个Message Queue都有一个对应的ConsumeQueue文件。
public class ConsumeQueue { // 存储单元大小 public static final int CQ_STORE_UNIT_SIZE = 20; private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME); // 存储顶层对象 private final DefaultMessageStore defaultMessageStore; // 存储消息索引的队列 private final MappedFileQueue mappedFileQueue; // Topic private final String topic; // queueId private final int queueId; // 写索引时用到的ByteBuffer private final ByteBuffer byteBufferIndex; // 配置 private final String storePath; private final int mappedFileSize; // 最后一个消息对应的物理Offset private long maxPhysicOffset = -1; // 逻辑队列的最小Offset,删除物理文件时,计算出来的最小Offset // 实际使用需要除以 StoreUnitSize private volatile long minLogicOffset = 0; private ConsumeQueueExt consumeQueueExt = null;

public class IndexFile { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static int hashSlotSize = 4; private static int indexSize = 20; private static int invalidIndex = 0; private final int hashSlotNum; private final int indexNum; private final MappedFile mappedFile; private final FileChannel fileChannel; private final MappedByteBuffer mappedByteBuffer; private final IndexHeader indexHeader;

  • beginTimestamp: 该索引文件中包含信息的最小存储时间。
  • endTimestamp: 该索引文件中包含信息的最大存储时间。
  • beginPhyoffset: 该索引文件中包含的消息在commitlog文件中的最小物理偏移量。
  • endPhyoffset: 该索引文件中包含的消息在commitlog文件中的最大物理偏移量。
  • hashlogCount: hashlot个数。
  • indexCount: Index条目列表当前已使用的个数。
  • hashcode: key的hashcode
  • phyoffset: 消息对应的物理偏移量
  • timedif: 该消息存储时间与第一条消息的时间戳的差值
  • preIndexNo: 该条目的前一条记录的Index索引
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) { if (this.indexHeader.getIndexCount() < this.indexNum) { # 消息key的hashcode int keyHash = indexKeyHashMethod(key); # 将key的hashcode与hash槽数量取余,得到该hashcode下的hash槽下标int slotPos = keyHash % this.hashSlotNum; # 该槽的物理地址 int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null; try {// fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize, // false); # 获取该hashcode的存储的Index索引, int slotValue =; if (slotValue <= invalidIndex || slotValue> this.indexHeader.getIndexCount()) { slotValue =; } # 计算待存储消息的时间戳与第一条消息时间戳的差值,并转换为秒 long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp(); // 时间差存储单位由毫秒改为秒 timeDiff = timeDiff / 1000; // 25000天后溢出 if (this.indexHeader.getBeginTimestamp() <= 0) { timeDiff = 0; } else if (timeDiff> Integer.MAX_VALUE) { timeDiff = Integer.MAX_VALUE; } else if (timeDiff < 0) { timeDiff = 0; }int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize; // 写入真正索引 this.mappedByteBuffer.putInt(absIndexPos, keyHash); this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); // 更新哈希槽 this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); // 第一次写入 if (this.indexHeader.getIndexCount() <= 1) { this.indexHeader.setBeginPhyOffset(phyOffset); this.indexHeader.setBeginTimestamp(storeTimestamp); }this.indexHeader.incHashSlotCount(); this.indexHeader.incIndexCount(); this.indexHeader.setEndPhyOffset(phyOffset); this.indexHeader.setEndTimestamp(storeTimestamp); return true; } catch (Exception e) { log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { e.printStackTrace(); } } } } else { log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount() + "; index max num = " + this.indexNum); }return false; }

public void selectPhyOffset(final List phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock) { if (this.mappedFile.hold()) { int keyHash = indexKeyHashMethod(key); int slotPos = keyHash % this.hashSlotNum; int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null; try { if (lock) { // fileLock = this.fileChannel.lock(absSlotPos, // hashSlotSize, true); }int slotValue =; // if (fileLock != null) { // fileLock.release(); // fileLock = null; // }if (slotValue <= invalidIndex || slotValue> this.indexHeader.getIndexCount() || this.indexHeader.getIndexCount() <= 1) { // TODO NOTFOUND } else { for (int nextIndexToRead = slotValue; ; ) { if (phyOffsets.size() >= maxNum) { break; }int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + nextIndexToRead * indexSize; int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4); // int转为long,避免下面计算时间差值时溢出 long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4); // 读到了未知数据 if (timeDiff < 0) { break; }// 时间差存储的是秒,再还原为毫秒, long避免溢出 timeDiff *= 1000L; long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; boolean timeMatched = (timeRead >= begin) && (timeRead <= end); if (keyHash == keyHashRead && timeMatched) { phyOffsets.add(phyOffsetRead); }if (prevIndexRead <= invalidIndex || prevIndexRead > this.indexHeader.getIndexCount() || prevIndexRead == nextIndexToRead || timeRead < begin) { break; }nextIndexToRead = prevIndexRead; } } } catch (Exception e) { log.error("selectPhyOffset exception ", e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { e.printStackTrace(); } }this.mappedFile.release(); } } } }

由于会存在hash冲突,根据slotValue定位该hash槽最新的一个Item条目,经过一些处理后,会通过 prevIndexRead 获取到该Hashcode下的上一个Index下标,如果大于等于1并且小于最大条目数,则继续查找,否则结束查找。
文件刷盘机制 RocketMQ的存储与读写是基于JDK NIO的内存映射机制(MappedByteBuffer)的,消息存储时首先将消息追加到内存,再根据配置的刷盘策略在不同时间进行刷写磁盘,如果是同步刷盘,消息追加到内存后,将同步调用MappedByteBuffer的force方法;如果是异步刷盘,在消息追加到内存后立刻返回给消息发送端,RocketMQ使用一个单独的线程按照某一个设定的频率执行刷盘操作。
Consume与IndexFile文件的实时更新 当消息提交存储在Commitlog文件中,ConsumeQueue、IndexFile文件需要及时更新,否则消息无法及时被消费,如何确保消息内容能及时更新到ConsumeQueue、IndexFile等文件,RocketMQ通过开启一个线程ReputMessageService来实时更新消息信息。
根据消息更新对应文件 获取到新增的消息后,要将这些消息更新到ConsumeQueue或IndexFile中,现以ConsumeQueue文件为例。
  • 根据消息主题与队列ID,获取到对应的ConsumeQueue文件,ConsumeQueue文件实际是对应的该队列ID下的文件夹,文件夹里有很多文件,在ConsumeQueue属性里面有一个MappedFileQueue,对该文件夹下的所有文件做映射。
  • 依次将消息偏移量、消息长度、tag hashcode写入到ByteBuffer中,将内容追加到ConsumeQueue的内存映射文件中,ConsumeQueue的刷盘方式固定为异步刷盘方式。
过期文件删除机制 CommitLog、ConsumeQueue文件是基于内存映射机制并在启动的时候会加载commitlog、ConsumeQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久的存储在消息服务器上。
  • 指定删除文件的时间点,通过设置deleteWhen设置一天的固定时间执行一次删除过期文件操作,默认是凌晨4点
  • 磁盘空间是否充足,如果磁盘空间不充足,则会触发过期文件删除操作。
  • 可以通过调用excuteDeleteFilesManualy方法手动触发过期文件删除。
