阅读 160

RocketMQ 存储消息的流程

概要总结:

1、commitLog其实有两层够层,其中MappendFileQueue是逻辑的存储队列概念,里面保存着顺序增长的MappedFile文件。

2、MappedFile文件是真正存储实际数据的文件

3、在整个broker的存储体系中,MappedFile文件保存了commitLog、consumeQueue、Index等,是核心的数据结构。

RocketMQ存储时序图

RocketMQ存储时序图.png

SendMessageProcessor类:

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                    final RemotingCommand request,
                                    final SendMessageContext sendMessageContext,
                                    final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
    //Netty命令
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    //响应头
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
        
    response.setOpaque(request.getOpaque());

    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

    log.debug("receive SendMessage request command, {}", request);
    //构建一个内部消息数据结构MessageExtBrokerInner
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);
    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    PutMessageResult putMessageResult = null;
    Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    //判断是不是事务消息
    if (traFlag != null && Boolean.parseBoolean(traFlag)
        && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        //调用MessageExtBrokerInner
        putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
    } else {
    
        //普通消息发送
        putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    }

    return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

}复制代码

DefaultMessageStore类

构造方法初始化ReputMessageService用来更新ConsumeQueue中消息偏移的

 public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
        final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
        this.messageArrivingListener = messageArrivingListener;
        this.brokerConfig = brokerConfig;
        this.messageStoreConfig = messageStoreConfig;
        this.brokerStatsManager = brokerStatsManager;
        this.allocateMappedFileService = new AllocateMappedFileService(this);
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            this.commitLog = new DLedgerCommitLog(this);
        } else {
            this.commitLog = new CommitLog(this);
        }
        this.consumeQueueTable = new ConcurrentHashMap<>(32);

        this.flushConsumeQueueService = new FlushConsumeQueueService();
        this.cleanCommitLogService = new CleanCommitLogService();
        this.cleanConsumeQueueService = new CleanConsumeQueueService();
        this.storeStatsService = new StoreStatsService();
        this.indexService = new IndexService(this);
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            this.haService = new HAService(this);
        } else {
            this.haService = null;
        }
        //初始化ReputMessageService用来更新ConsumeQueue中消息偏移的
        this.reputMessageService = new ReputMessageService();

        this.scheduleMessageService = new ScheduleMessageService(this);

        this.transientStorePool = new TransientStorePool(messageStoreConfig);

        if (messageStoreConfig.isTransientStorePoolEnable()) {
            this.transientStorePool.init();
        }

        this.allocateMappedFileService.start();

        this.indexService.start();

        this.dispatcherList = new LinkedList<>();

        this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());

        this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

        File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));

        MappedFile.ensureDirOK(file.getParent());

        lockFile = new RandomAccessFile(file, "rw");
    }复制代码

putMessage方法

@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    //检查状态
    PutMessageStatus checkStoreStatus = this.checkStoreStatus();
    if (checkStoreStatus != PutMessageStatus.PUT_OK) {
        return new PutMessageResult(checkStoreStatus, null);
    }

    PutMessageStatus msgCheckStatus = this.checkMessage(msg);
    if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
        return new PutMessageResult(msgCheckStatus, null);
    }
    //开始时间
    long beginTime = this.getSystemClock().now();
    //调用commitLog得 putMessage
    PutMessageResult result = this.commitLog.putMessage(msg);
    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) {
        log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
    }

    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

    if (null == result || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }

    return result;
}复制代码

当Broker获取到一条消息之后,他是如何存储这条消息的?

Broker通过Netty网络服务器获取到一条消息。Broker收到一条消息之后,其实就会直接把消息写入到CommitLog里去,但是他写入刚开始仅仅是写入到MappedFile映射的一块内存里去,后续是根据刷盘策略去决定是否立即把数据从内存刷入磁盘的,接着就会把这条消息写入到一个CommitLog文件里去,一个Broker机器上就只有一个CommitLog文件,同时还会以异步的方式把消息写入到ConsumeQueue文件里去,因为一个Topic有多个MessageQueue,任何一条消息都是写入一个MessageQueue的,那个MessageQueue其实就是对应了一个ConsumeQueue文件

C:\Users\Sapiens\AppData\Roaming\Typora\typora-user-images

如何异步构建cosumequeue的?

image-20211120093721479.pngBroker启动的时候会开启一个【异步线程】,ReputMessageService,他会把CommitLog更新事件转发出去,然后让任务处理器去更新ConsumeQueue和IndexFile,


作者:乐享非凡
链接:https://juejin.cn/post/7032583229106290701


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐