阅读 108

RocketMQ源码学习(三)-消息存储

前面介绍了Producer把消息发到了Broker,接下来研究一下Broker是如何进行消息存储的,最终存储文件的有哪些?

  • commitLog: 消息存储目录;

  • config:运行期间的配置信息;

  • consumerqueue:消息消费队列存储目录;

  • index:消息索引文件存储目录;

  • abort:如果该文件存在,则表明Broker非正产关闭;

  • checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerqueue最后一次刷盘时间、index索引文件最后一次刷盘时间戳

消息存储入口

org.apache.rocketmq.store.DefaultMessageStore#putMessage

这个是根据视频找的,Netty一层层调用太多了,最终会调用到这个方法执行消息的存储。

执行commitLog写入的方法是

// 消息写入由这个方法执行 PutMessageResult result = this.commitLog.putMessage(msg); 复制代码

延迟消息Topic替换

org.apache.rocketmq.store.CommitLog#putMessage

// 主体逻辑: if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE     || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {     // Delay Delivery     // 延迟消息:延迟消息写入时会转为写到SCHEDULE_TOPIC_xxxx这个Topic中     if (msg.getDelayTimeLevel() > 0) {         if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {             msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());         }         topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;         queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());         // Backup real topic, queueId         MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());         MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));         msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));         // 这里会替换我们设置的topic, 改为MQ自己定义的Topic名称,头天换日了。         msg.setTopic(topic);         msg.setQueueId(queueId);     } } 复制代码

有意思的是直接修改了消息的topic和id,在延时消息中有种偷梁换柱的感觉。

文件写入

// 零拷贝实现->mmp MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // 线程锁,注意使用锁的这种方式 putMessageLock.lock(); //spin or ReentrantLock ,depending on store config 复制代码

使用mmp技术拷贝文件,如果文件满了,会新创建一个文件,然后重写消息

// 以追加的方式写入文件 result = mappedFile.appendMessage(msg, this.appendMessageCallback); // 文件写入结果 switch (result.getStatus()) {     case PUT_OK:         break;     case END_OF_FILE:   // 如果文件写满了,重新创建一个文件,重写消息         unlockMappedFile = mappedFile;         // Create a new file, re-write the message         mappedFile = this.mappedFileQueue.getLastMappedFile(0);         if (null == mappedFile) {             // XXX: warn and notify me             log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());             beginTimeInLock = 0;             return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);         }         result = mappedFile.appendMessage(msg, this.appendMessageCallback);         break;     case MESSAGE_SIZE_EXCEEDED:     case PROPERTIES_SIZE_EXCEEDED:         beginTimeInLock = 0;         return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);     case UNKNOWN_ERROR:         beginTimeInLock = 0;         return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);     default:         beginTimeInLock = 0;         return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); } 复制代码

接着刷盘、同步文件

// 文件刷盘 handleDiskFlush(result, putMessageResult, msg); // 主从同步 handleHA(result, putMessageResult, msg); 复制代码

刷盘分为同步刷盘和异步,对应不同的处理逻辑。

org.apache.rocketmq.store.CommitLog#handleDiskFlush

// 文件刷盘方法 public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {     // Synchronization flush 同步     if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { ...             // 同步等待文件刷新             try {                 flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),                         TimeUnit.MILLISECONDS);             } catch (InterruptedException | ExecutionException | TimeoutException e) {                 //flushOK=false;             }             if (flushStatus != PutMessageStatus.PUT_OK) {                 log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()                     + " client address: " + messageExt.getBornHostString());                 putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);             }         } else {             service.wakeup();         }     }     // Asynchronous flush 异步     else {         if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {             flushCommitLogService.wakeup();         } else {             commitLogService.wakeup();         }     } } 复制代码

在异步休眠启动线程中,会有500ms的休眠,这也是为什么数据会丢失的原因:

org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run

if (flushCommitLogTimed) {     Thread.sleep(interval); } else {     this.waitForRunning(interval); } 复制代码

主从同步

ToDo:涉及到Raft协议,Raft包含很多模块,后面与其他协议一起做对比分析。

org.apache.rocketmq.store.CommitLog#handleHA

写入之后,要进行消息分发,来看一下消息分发的逻辑。

消息分发

org.apache.rocketmq.broker.BrokerController#start

在启动MessageStore的时候

org.apache.rocketmq.store.DefaultMessageStore#start

// ToDo: k2 Broker启动时,会启动一个线程来更新Consumerqueue索引文件 this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue); this.reputMessageService.start(); 复制代码

调用这里的start方法启动线程将CommitLog中的消息分发到consumequeue中

org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#run

@Override public void run() {     DefaultMessageStore.log.info(this.getServiceName() + " service started");     while (!this.isStopped()) {         try {             Thread.sleep(1);             this.doReput();         } catch (Exception e) {             DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);         }     }     DefaultMessageStore.log.info(this.getServiceName() + " service end"); } 复制代码

每隔1ms分发一次消息。

org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput

... for (int readSize = 0; readSize < result.getSize() && doNext; ) {     // 从CommitLog中获取一个dispatchRequest,拿到一个需要进行转发的消息,即从commitlog中读取的     DispatchRequest dispatchRequest =         DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);     int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();     if (dispatchRequest.isSuccess()) {         if (size > 0) {             // 分发CommitLog写入消息             DefaultMessageStore.this.doDispatch(dispatchRequest);             // ToDo: k2->长轮训:如果消息到了主节点,并且开启了长轮训             if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()                 && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {                 // 唤醒NotifyMessageArrivingListener方法进行一次请求线程的检查                 DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),                     dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,                     dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),                     dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());             }                          ... 复制代码

消息分发具体有两个任务:分发ConsumeQueue、分发IndexFile

org.apache.rocketmq.store.DefaultMessageStore#doDispatch

// k2 将commitlog写入的事件转发到ConsumeQueue和IndexFile public void doDispatch(DispatchRequest req) {     for (CommitLogDispatcher dispatcher : this.dispatcherList) {         dispatcher.dispatch(req);     } } 复制代码

ConsumerQueue分发构建

org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue#dispatch

// ToDo: k1->ConsumereQueue消息分发构建器 class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {     @Override     public void dispatch(DispatchRequest request) {         final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());         switch (tranType) {             case MessageSysFlag.TRANSACTION_NOT_TYPE:             case MessageSysFlag.TRANSACTION_COMMIT_TYPE:                 DefaultMessageStore.this.putMessagePositionInfo(request);                 break;             case MessageSysFlag.TRANSACTION_PREPARED_TYPE:             case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:                 break;         }     } } 复制代码

indexFile文件分发构建

org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildIndex#dispatch

// ToDo: k1->IndexFile文件分发构建器 class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {     @Override     public void dispatch(DispatchRequest request) {         if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {             DefaultMessageStore.this.indexService.buildIndex(request);         }     } } 复制代码

一般做定制化也是从这两个方法下手。

过期文件删除

org.apache.rocketmq.store.DefaultMessageStore#start

// ToDo: k2->Broker启动删除过期文件的定时任务 this.addScheduleTask(); 复制代码

两种文件定期删除

org.apache.rocketmq.store.DefaultMessageStore#addScheduleTask

// ToDo: k1->定时删除过期消息的任务 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {     @Override     public void run() {         //         DefaultMessageStore.this.cleanFilesPeriodically();     } }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS); 复制代码

org.apache.rocketmq.store.DefaultMessageStore#cleanFilesPeriodically

private void cleanFilesPeriodically() {     // 定时删除过期的commitlog     this.cleanCommitLogService.run();     // 定时删除过期的Consumerqueue     this.cleanConsumeQueueService.run(); } 复制代码

至此,消息存储的大致流程以及入口代码都陈列了出来。


作者:YuJian
链接:https://juejin.cn/post/7021042530137407519

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐