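How RocketMQ Stores Messages
Summary:
1. The CommitLog actually has a two-layer structure. MappedFileQueue is the logical storage queue; it holds MappedFile files whose offsets increase sequentially.
2. MappedFile is the file that actually stores the data.
3. Across the Broker's whole storage system, MappedFile backs the CommitLog, the ConsumeQueue, the Index, and so on, which makes it the core data structure.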
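The two-layer structure in the summary can be illustrated with a minimal sketch. It is not RocketMQ's code: `SegmentSketch` and `SegmentQueueSketch` are hypothetical stand-ins for MappedFile and MappedFileQueue, no real file is created, and the unused tail of a full segment is simply skipped. The sketch only shows how a logical queue of fixed-size segments turns a global offset into "which file, which position", and how a segment can be named by its zero-padded starting offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for one MappedFile: a fixed-size segment starting at a global offset. */
class SegmentSketch {
    final long fromOffset;   // global offset of the first byte in this segment
    final int capacity;      // fixed segment size in bytes
    int writePosition = 0;   // next free position inside this segment

    SegmentSketch(long fromOffset, int capacity) {
        this.fromOffset = fromOffset;
        this.capacity = capacity;
    }

    /** Name the segment by its starting offset, zero-padded to 20 digits. */
    String fileName() {
        return String.format("%020d", fromOffset);
    }
}

/** Hypothetical stand-in for MappedFileQueue: an ordered list of segments. */
class SegmentQueueSketch {
    private final int segmentSize;
    private final List<SegmentSketch> segments = new ArrayList<>();

    SegmentQueueSketch(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    /** Reserve length bytes and return the global offset at which they start. */
    long append(int length) {
        if (length > segmentSize) {
            throw new IllegalArgumentException("record larger than a segment");
        }
        SegmentSketch last = segments.isEmpty() ? null : segments.get(segments.size() - 1);
        if (last == null || last.writePosition + length > last.capacity) {
            // Roll over: the new segment starts where the previous one ends.
            long from = (last == null) ? 0L : last.fromOffset + last.capacity;
            last = new SegmentSketch(from, segmentSize);
            segments.add(last);
        }
        long offset = last.fromOffset + last.writePosition;
        last.writePosition += length;
        return offset;
    }

    /** Locate the segment containing a global offset; null if no such segment exists. */
    SegmentSketch findByOffset(long offset) {
        int index = (int) (offset / segmentSize);
        return (index >= 0 && index < segments.size()) ? segments.get(index) : null;
    }

    int segmentCount() {
        return segments.size();
    }

    public static void main(String[] args) {
        SegmentQueueSketch queue = new SegmentQueueSketch(100); // tiny segments make rollover visible
        long first = queue.append(60);
        long second = queue.append(60); // does not fit in the first segment, so a new one is created
        System.out.println(first + " -> " + queue.findByOffset(first).fileName());
        System.out.println(second + " -> " + queue.findByOffset(second).fileName());
    }
}
```

Because every segment has the same size, finding the segment for an offset is a single division, which is the main reason for keeping the logical queue and the physical files as separate layers.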
RocketMQ storage sequence diagram
SendMessageProcessor class:
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                    final RemotingCommand request,
                                    final SendMessageContext sendMessageContext,
                                    final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
    // Netty response command
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    // Response header
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();

    response.setOpaque(request.getOpaque());
    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

    log.debug("receive SendMessage request command, {}", request);

    // Note: body and queueIdInt are derived from the request earlier in the method (omitted in this excerpt).

    // Build the Broker's internal message structure, MessageExtBrokerInner
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);
    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));

    PutMessageResult putMessageResult = null;
    Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    // Check whether this is a transactional message
    if (traFlag != null && Boolean.parseBoolean(traFlag)
        && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { // For client under version 4.6.1
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        // Transactional message: hand the MessageExtBrokerInner to the transactional message service
        putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
    } else {
        // Normal message: store it through the message store
        putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    }

    return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
}
DefaultMessageStore class
The constructor initializes ReputMessageService, which is used to update the message offsets in the ConsumeQueue.
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig,
                           final BrokerStatsManager brokerStatsManager,
                           final MessageArrivingListener messageArrivingListener,
                           final BrokerConfig brokerConfig) throws IOException {
    this.messageArrivingListener = messageArrivingListener;
    this.brokerConfig = brokerConfig;
    this.messageStoreConfig = messageStoreConfig;
    this.brokerStatsManager = brokerStatsManager;
    this.allocateMappedFileService = new AllocateMappedFileService(this);
    if (messageStoreConfig.isEnableDLegerCommitLog()) {
        this.commitLog = new DLedgerCommitLog(this);
    } else {
        this.commitLog = new CommitLog(this);
    }
    this.consumeQueueTable = new ConcurrentHashMap<>(32);

    this.flushConsumeQueueService = new FlushConsumeQueueService();
    this.cleanCommitLogService = new CleanCommitLogService();
    this.cleanConsumeQueueService = new CleanConsumeQueueService();
    this.storeStatsService = new StoreStatsService();
    this.indexService = new IndexService(this);
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        this.haService = new HAService(this);
    } else {
        this.haService = null;
    }
    // Initialize ReputMessageService, which updates the message offsets in the ConsumeQueue
    this.reputMessageService = new ReputMessageService();

    this.scheduleMessageService = new ScheduleMessageService(this);

    this.transientStorePool = new TransientStorePool(messageStoreConfig);

    if (messageStoreConfig.isTransientStorePoolEnable()) {
        this.transientStorePool.init();
    }

    this.allocateMappedFileService.start();

    this.indexService.start();

    // Dispatchers that build the ConsumeQueue and the index from CommitLog entries
    this.dispatcherList = new LinkedList<>();
    this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
    this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

    File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
    MappedFile.ensureDirOK(file.getParent());
    lockFile = new RandomAccessFile(file, "rw");
}
putMessage method
@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    // Check the state of the store
    PutMessageStatus checkStoreStatus = this.checkStoreStatus();
    if (checkStoreStatus != PutMessageStatus.PUT_OK) {
        return new PutMessageResult(checkStoreStatus, null);
    }

    // Validate the message itself
    PutMessageStatus msgCheckStatus = this.checkMessage(msg);
    if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
        return new PutMessageResult(msgCheckStatus, null);
    }

    // Start time
    long beginTime = this.getSystemClock().now();
    // Delegate to commitLog's putMessage
    PutMessageResult result = this.commitLog.putMessage(msg);
    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) {
        log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
    }

    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

    if (null == result || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }

    return result;
}
After the Broker receives a message, how does it store it?
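The Broker receives a message through its Netty network server and writes it straight to the CommitLog. Each Broker has a single CommitLog, which on disk is a sequence of MappedFile files. The write initially only reaches the memory region mapped by the current MappedFile; the flush policy then decides whether the data is forced from memory to disk immediately or later. At the same time, the message is written asynchronously to the ConsumeQueue: a Topic has multiple MessageQueues, every message is written to exactly one MessageQueue, and each MessageQueue corresponds to one ConsumeQueue.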
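The "write to mapped memory first, flush according to policy" step can be sketched with the standard `java.nio` API. This is a simplified illustration under stated assumptions, not RocketMQ's implementation: `MappedWriteSketch` and its `FlushMode` enum are hypothetical names, a single small temporary file stands in for a MappedFile, and there is no background flush thread.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class MappedWriteSketch {
    /** Hypothetical flush policies: force on every write, or leave it for later. */
    enum FlushMode { SYNC, ASYNC }

    private MappedByteBuffer buffer;
    private final FlushMode mode;
    private int flushedPosition = 0;

    MappedWriteSketch(Path file, int size, FlushMode mode) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
             FileChannel channel = raf.getChannel()) {
            // Map the file into memory; the mapping stays valid after the channel is closed.
            this.buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
        }
        this.mode = mode;
    }

    /** Write into mapped memory and return the start position; force to disk only in SYNC mode. */
    int append(byte[] data) {
        int start = buffer.position();
        buffer.put(data);
        if (mode == FlushMode.SYNC) {
            flush();
        }
        return start;
    }

    /** Ask the OS to write the mapped region's changes to the storage device. */
    void flush() {
        buffer.force();
        flushedPosition = buffer.position();
    }

    int writtenPosition() {
        return buffer.position();
    }

    int flushedPosition() {
        return flushedPosition;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("commitlog-sketch", ".bin");
        MappedWriteSketch log = new MappedWriteSketch(file, 1024, FlushMode.ASYNC);
        log.append("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println("written=" + log.writtenPosition() + ", flushed=" + log.flushedPosition());
        log.flush(); // in ASYNC mode a background task would normally do this later
        System.out.println("written=" + log.writtenPosition() + ", flushed=" + log.flushedPosition());
    }
}
```

The gap between the written position and the flushed position is the data that would be lost on a machine crash, which is the trade-off a flush policy chooses between: lower latency with asynchronous flushing, or stronger durability with synchronous flushing.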
How is the ConsumeQueue built asynchronously?
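When the Broker starts, it launches an asynchronous thread, ReputMessageService. This thread forwards CommitLog update events to the registered dispatchers (CommitLogDispatcherBuildConsumeQueue and CommitLogDispatcherBuildIndex in the constructor above), which in turn update the ConsumeQueue and the IndexFile.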
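The asynchronous build can be sketched as a background loop that walks the log from the last forwarded position and passes each new entry to a list of dispatchers. Everything here is a hypothetical, in-memory simplification: `ReputSketch`, its `DispatchRequest` and `Dispatcher` types, and the map-based "ConsumeQueue" and "index" are illustrative names, and a plain list stands in for the CommitLog.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class ReputSketch {
    /** Hypothetical summary of one message that has already been appended to the log. */
    static final class DispatchRequest {
        final String topic;
        final int queueId;
        final long commitLogOffset;
        final String key;

        DispatchRequest(String topic, int queueId, long commitLogOffset, String key) {
            this.topic = topic;
            this.queueId = queueId;
            this.commitLogOffset = commitLogOffset;
            this.key = key;
        }
    }

    /** One handler per derived structure. */
    interface Dispatcher {
        void dispatch(DispatchRequest request);
    }

    final List<DispatchRequest> commitLog = new CopyOnWriteArrayList<>();         // stand-in for the CommitLog
    final Map<String, List<Long>> consumeQueues = new ConcurrentHashMap<>();      // "topic-queueId" -> offsets
    final Map<String, Long> index = new ConcurrentHashMap<>();                    // message key -> offset
    private final List<Dispatcher> dispatchers = new ArrayList<>();
    private int reputIndex = 0;             // next log entry that has not been dispatched yet
    private volatile boolean running = true;

    ReputSketch() {
        // Dispatcher 1: append the log offset to the queue the message belongs to.
        dispatchers.add(r -> consumeQueues
            .computeIfAbsent(r.topic + "-" + r.queueId, k -> new CopyOnWriteArrayList<>())
            .add(r.commitLogOffset));
        // Dispatcher 2: record the offset under the message key.
        dispatchers.add(r -> index.put(r.key, r.commitLogOffset));
    }

    /** Dispatch every entry that has not been forwarded yet; returns how many were handled. */
    synchronized int doReput() {
        int handled = 0;
        while (reputIndex < commitLog.size()) {
            DispatchRequest request = commitLog.get(reputIndex++);
            for (Dispatcher dispatcher : dispatchers) {
                dispatcher.dispatch(request);
            }
            handled++;
        }
        return handled;
    }

    /** Start the background thread that keeps the derived structures up to date. */
    Thread start() {
        Thread thread = new Thread(() -> {
            while (running) {
                doReput();
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "ReputSketch");
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    void shutdown() {
        running = false;
    }

    public static void main(String[] args) throws InterruptedException {
        ReputSketch store = new ReputSketch();
        Thread worker = store.start();
        store.commitLog.add(new DispatchRequest("TopicA", 0, 0L, "order-1"));
        store.commitLog.add(new DispatchRequest("TopicA", 1, 128L, "order-2"));
        Thread.sleep(50);       // give the background thread time to catch up
        store.shutdown();
        worker.join();
        store.doReput();        // drain anything the thread did not reach before stopping
        System.out.println(store.consumeQueues);
        System.out.println(store.index);
    }
}
```

The producer path only appends to the log; the queue and index views lag slightly behind and are rebuilt from the log, so a slow dispatcher never blocks message writes.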
Author: 乐享非凡
Link: https://juejin.cn/post/7032583229106290701