RocketMQ源码学习(三)-消息存储
前面介绍了Producer把消息发到了Broker,接下来研究一下Broker是如何进行消息存储的,最终存储文件的有哪些?
commitLog: 消息存储目录;
config:运行期间的配置信息;
consumerqueue:消息消费队列存储目录;
index:消息索引文件存储目录;
abort:如果该文件存在,则表明Broker非正产关闭;
checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerqueue最后一次刷盘时间、index索引文件最后一次刷盘时间戳
消息存储入口
org.apache.rocketmq.store.DefaultMessageStore#putMessage
这个是根据视频找的,Netty一层层调用太多了,最终会调用到这个方法执行消息的存储。
执行commitLog写入的方法是
// 消息写入由这个方法执行 PutMessageResult result = this.commitLog.putMessage(msg); 复制代码
延迟消息Topic替换
org.apache.rocketmq.store.CommitLog#putMessage
// 主体逻辑: if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery // 延迟消息:延迟消息写入时会转为写到SCHEDULE_TOPIC_xxxx这个Topic中 if (msg.getDelayTimeLevel() > 0) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); // 这里会替换我们设置的topic, 改为MQ自己定义的Topic名称,头天换日了。 msg.setTopic(topic); msg.setQueueId(queueId); } } 复制代码
有意思的是直接修改了消息的topic和id,在延时消息中有种偷梁换柱的感觉。
文件写入
// 零拷贝实现->mmp MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // 线程锁,注意使用锁的这种方式 putMessageLock.lock(); //spin or ReentrantLock ,depending on store config 复制代码
使用mmp技术拷贝文件,如果文件满了,会新创建一个文件,然后重写消息
// 以追加的方式写入文件 result = mappedFile.appendMessage(msg, this.appendMessageCallback); // 文件写入结果 switch (result.getStatus()) { case PUT_OK: break; case END_OF_FILE: // 如果文件写满了,重新创建一个文件,重写消息 unlockMappedFile = mappedFile; // Create a new file, re-write the message mappedFile = this.mappedFileQueue.getLastMappedFile(0); if (null == mappedFile) { // XXX: warn and notify me log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); } result = mappedFile.appendMessage(msg, this.appendMessageCallback); break; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); case UNKNOWN_ERROR: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); default: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); } 复制代码
接着刷盘、同步文件
// 文件刷盘 handleDiskFlush(result, putMessageResult, msg); // 主从同步 handleHA(result, putMessageResult, msg); 复制代码
刷盘分为同步刷盘和异步,对应不同的处理逻辑。
org.apache.rocketmq.store.CommitLog#handleDiskFlush
// 文件刷盘方法 public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // Synchronization flush 同步 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { ... // 同步等待文件刷新 try { flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { //flushOK=false; } if (flushStatus != PutMessageStatus.PUT_OK) { log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { service.wakeup(); } } // Asynchronous flush 异步 else { if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { flushCommitLogService.wakeup(); } else { commitLogService.wakeup(); } } } 复制代码
在异步休眠启动线程中,会有500ms的休眠,这也是为什么数据会丢失的原因:
org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run
if (flushCommitLogTimed) { Thread.sleep(interval); } else { this.waitForRunning(interval); } 复制代码
主从同步
ToDo:涉及到Raft协议,Raft包含很多模块,后面与其他协议一起做对比分析。
org.apache.rocketmq.store.CommitLog#handleHA
写入之后,要进行消息分发,来看一下消息分发的逻辑。
消息分发
org.apache.rocketmq.broker.BrokerController#start
在启动MessageStore的时候
org.apache.rocketmq.store.DefaultMessageStore#start
// ToDo: k2 Broker启动时,会启动一个线程来更新Consumerqueue索引文件 this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue); this.reputMessageService.start(); 复制代码
调用这里的start方法启动线程将CommitLog中的消息分发到consumequeue中
org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#run
@Override public void run() { DefaultMessageStore.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { Thread.sleep(1); this.doReput(); } catch (Exception e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } } DefaultMessageStore.log.info(this.getServiceName() + " service end"); } 复制代码
每隔1ms分发一次消息。
org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput
... for (int readSize = 0; readSize < result.getSize() && doNext; ) { // 从CommitLog中获取一个dispatchRequest,拿到一个需要进行转发的消息,即从commitlog中读取的 DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize(); if (dispatchRequest.isSuccess()) { if (size > 0) { // 分发CommitLog写入消息 DefaultMessageStore.this.doDispatch(dispatchRequest); // ToDo: k2->长轮训:如果消息到了主节点,并且开启了长轮训 if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { // 唤醒NotifyMessageArrivingListener方法进行一次请求线程的检查 DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } ... 复制代码
消息分发具体有两个任务:分发ConsumeQueue、分发IndexFile
org.apache.rocketmq.store.DefaultMessageStore#doDispatch
// k2 将commitlog写入的事件转发到ConsumeQueue和IndexFile public void doDispatch(DispatchRequest req) { for (CommitLogDispatcher dispatcher : this.dispatcherList) { dispatcher.dispatch(req); } } 复制代码
ConsumerQueue分发构建
org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue#dispatch
// ToDo: k1->ConsumereQueue消息分发构建器 class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { @Override public void dispatch(DispatchRequest request) { final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag()); switch (tranType) { case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: DefaultMessageStore.this.putMessagePositionInfo(request); break; case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break; } } } 复制代码
indexFile文件分发构建
org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildIndex#dispatch
// ToDo: k1->IndexFile文件分发构建器 class CommitLogDispatcherBuildIndex implements CommitLogDispatcher { @Override public void dispatch(DispatchRequest request) { if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) { DefaultMessageStore.this.indexService.buildIndex(request); } } } 复制代码
一般做定制化也是从这两个方法下手。
过期文件删除
org.apache.rocketmq.store.DefaultMessageStore#start
// ToDo: k2->Broker启动删除过期文件的定时任务 this.addScheduleTask(); 复制代码
两种文件定期删除
org.apache.rocketmq.store.DefaultMessageStore#addScheduleTask
// ToDo: k1->定时删除过期消息的任务 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { // DefaultMessageStore.this.cleanFilesPeriodically(); } }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS); 复制代码
org.apache.rocketmq.store.DefaultMessageStore#cleanFilesPeriodically
private void cleanFilesPeriodically() { // 定时删除过期的commitlog this.cleanCommitLogService.run(); // 定时删除过期的Consumerqueue this.cleanConsumeQueueService.run(); } 复制代码
至此,消息存储的大致流程以及入口代码都陈列了出来。
作者:YuJian
链接:https://juejin.cn/post/7021042530137407519