RocketMQ设计之同步刷盘
同步刷盘方式:在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。
文章图片
在同步刷盘模式下,当消息写到内存后,会等待数据写到磁盘的CommitLog
文件。
CommitLog的handleDiskFlush方法:
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {// Synchronization flushif (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (messageExt.isWaitStoreMsgOK()) {GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) {log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()+ " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); }} else {service.wakeup(); }}// Asynchronous flushelse {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {flushCommitLogService.wakeup(); } else {commitLogService.wakeup(); }}}class GroupCommitService extends FlushCommitLogService {private volatile ListrequestsWrite = new ArrayList (); private volatile List requestsRead = new ArrayList (); //提交刷盘任务到任务列表public synchronized void putRequest(final GroupCommitRequest request) {synchronized (this.requestsWrite) {this.requestsWrite.add(request); }if (hasNotified.compareAndSet(false, true)) {waitPoint.countDown(); // notify}}private void swapRequests() {List tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; }private void doCommit() {synchronized (this.requestsRead) {if (!this.requestsRead.isEmpty()) {for (GroupCommitRequest req : this.requestsRead) {// There may be a message in the next file, so a maximum of// two times the flushboolean flushOK = false; for (int i = 0; i < 2 && !flushOK; i++) {flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); if (!flushOK) {CommitLog.this.mappedFileQueue.flush(0); }}req.wakeupCustomer(flushOK); }long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) {CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); }this.requestsRead.clear(); } else {// Because of individual messages is set to not sync flush, it// will come to this processCommitLog.this.mappedFileQueue.flush(0); }}}public void run() {CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) {try {this.waitForRunning(10); this.doCommit(); } catch (Exception e) {CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); }}// Under normal circumstances shutdown, wait for the arrival of the// request, and then flushtry {Thread.sleep(10); } catch (InterruptedException e) {CommitLog.log.warn("GroupCommitService Exception, ", e); }synchronized (this) {this.swapRequests(); }this.doCommit(); CommitLog.log.info(this.getServiceName() + " service end"); }@Overrideprotected void onWaitEnd() {this.swapRequests(); }@Overridepublic String getServiceName() {return GroupCommitService.class.getSimpleName(); }@Overridepublic long getJointime() {return 1000 * 60 * 5; }}
GroupCommitRequest
是刷盘任务,提交刷盘任务后,会在刷盘队列中等待刷盘,而刷盘线程GroupCommitService
每隔10毫秒写一批数据到磁盘。之所以不直接写是磁盘io压力大,写入性能低,每隔10毫秒写一次可以提升磁盘io效率和写入性能。- putRequest(request) 提交刷盘任务到任务列表
- request.waitForFlush同步等待
GroupCommitService
将任务列表中的任务刷盘完成。
requestsWrite
是写队列,用户保存添加进来的刷盘任务,requestsRead
是读队列,在刷盘之前会把写队列的数据放入读队列。CommitLog的doCommit方法:
private void doCommit() {synchronized (this.requestsRead) {if (!this.requestsRead.isEmpty()) {for (GroupCommitRequest req : this.requestsRead) {// There may be a message in the next file, so a maximum of// two times the flushboolean flushOK = false; for (int i = 0; i < 2 && !flushOK; i++) {//根据offset确定是否已经刷盘flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); if (!flushOK) {CommitLog.this.mappedFileQueue.flush(0); }}req.wakeupCustomer(flushOK); }long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) {CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); }//清空已刷盘的列表this.requestsRead.clear(); } else {// Because of individual messages is set to not sync flush, it// will come to this processCommitLog.this.mappedFileQueue.flush(0); }}}
- 刷盘的时候依次读取
requestsRead
中的数据写入磁盘, - 写入完成后清空
requestsRead
。
CommitLog.this.mappedFileQueue.flush(0); 是刷盘操作:
public boolean flush(final int flushLeastPages) {boolean result = true; MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); if (mappedFile != null) {long tmpTimeStamp = mappedFile.getStoreTimestamp(); int offset = mappedFile.flush(flushLeastPages); long where = mappedFile.getFileFromOffset() + offset; result = where == this.flushedWhere; this.flushedWhere = where; if (0 == flushLeastPages) {this.storeTimestamp = tmpTimeStamp; }}return result; }
通过MappedFile映射的CommitLog文件写入磁盘
这就是RocketMQ高可用设计之同步刷盘的基本情况了,大体思路就是一个读写分离的队列来刷盘,同步刷盘任务提交后会在刷盘队列中等待刷盘完成后再返回,而GroupCommitService每隔10毫秒写一批数据到磁盘。
【RocketMQ设计之同步刷盘】到此这篇关于RocketMQ设计之同步刷盘的文章就介绍到这了,更多相关RocketMQ同步刷盘内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
推荐阅读
- 剑指Offer之Java算法习题精讲字符串与二叉搜索树
- 剑指Offer之Java算法习题精讲数组与字符串题
- RabbitMQ|RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略问题
- MySQL索引之我见
- CMPUTC语言设计
- 戏说领域驱动设计(十六)——实体概念
- 低代码开发|驰骋BPM低代码快速开发平台之—.NET版准备工作篇
- 金针探底技术分析(上)
- 数据结构|数据结构课程设计——学生成绩查询与分析系统(简单详细版,含讲解)
- python学了基础之后方向_毫无基础的人如何入门Python(Python400集大型视频,从正确的方向出发学习...)