阅读 243

源码分析RocketMQ之CommitLog消息存储机制

RocketMQ 的存储核心类为 DefaultMessageStore,存储消息的入口方法为:putMessage。

消息存储分析

核心属性

  • messageStoreConfig

  • 存储相关的配置,例如存储路径、commitLog文件大小,刷盘频次等等。

  • CommitLog commitLog

  • comitLog 的核心处理类,消息存储在 commitlog 文件中。

  • ConcurrentMap<String/* topic /, ConcurrentMap<Integer/ queueId */, ConsumeQueue>> consumeQueueTable

  • topic 的队列信息。

  • FlushConsumeQueueService flushConsumeQueueService

  • ConsumeQueue 刷盘服务线程。

  • CleanCommitLogService cleanCommitLogService

  • commitLog 过期文件删除线程。

  • CleanConsumeQueueService cleanConsumeQueueService

  • consumeQueue 过期文件删除线程。、

  • IndexService indexService

  • 索引服务。

  • AllocateMappedFileService allocateMappedFileService

  • MappedFile 分配线程,RocketMQ 使用内存映射处理 commitlog、consumeQueue文件。

  • ReputMessageService reputMessageService

  • reput 转发线程(负责 Commitlog 转发到 Consumequeue、Index文件)。

  • HAService haService

  • 主从同步实现服务。

  • ScheduleMessageService scheduleMessageService

  • 定时任务调度器,执行定时任务。

  • StoreStatsService storeStatsService

  • 存储统计服务。

  • TransientStorePool transientStorePool

  • ByteBuffer 池,后文会详细使用。

  • RunningFlags runningFlags

  • 存储服务状态。

  • BrokerStatsManager brokerStatsManager

  • Broker 统计服务。

  • MessageArrivingListener messageArrivingListener

  • 消息达到监听器。

  • StoreCheckpoint storeCheckpoint

  • 刷盘检测点。

  • LinkedList dispatcherList

  • 转发 comitlog 日志,主要是从 commitlog 转发到 consumeQueue、index 文件。

消息存储过程putMessage

    @Override     public PutMessageResult putMessage(MessageExtBrokerInner msg) {         PutMessageStatus checkStoreStatus = this.checkStoreStatus(); //判断发送消息的状态是否已经准备好了         if (checkStoreStatus != PutMessageStatus.PUT_OK) {             return new PutMessageResult(checkStoreStatus, null);         } //判断消息是否是非法的,也就是不是正常返回消息的值         PutMessageStatus msgCheckStatus = this.checkMessage(msg);         if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {             return new PutMessageResult(msgCheckStatus, null);         } //这个其实就是获取了当前时间System.currentTimeMillis();         long beginTime = this.getSystemClock().now();         PutMessageResult result = this.commitLog.putMessage(msg); //elapsedTime 其实就是发送消息到结束之间耗费的时间         long elapsedTime = this.getSystemClock().now() - beginTime;         if (elapsedTime > 500) {             log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);         }         this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); //如果当前消息发送失败或者返回为null,就让发送消息的过期时间+1         if (null == result || !result.isOk()) {             this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();         }         return result;     } 复制代码

重载putMessages

    @Override     public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {         PutMessageStatus checkStoreStatus = this.checkStoreStatus();         if (checkStoreStatus != PutMessageStatus.PUT_OK) {             return new PutMessageResult(checkStoreStatus, null);         }         PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);         if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {             return new PutMessageResult(msgCheckStatus, null);         }         long beginTime = this.getSystemClock().now();         PutMessageResult result = this.commitLog.putMessages(messageExtBatch);         long elapsedTime = this.getSystemClock().now() - beginTime;         if (elapsedTime > 500) {             log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);         }         this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);         if (null == result || !result.isOk()) {             this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();         }         return result;     } 复制代码

但是我发现了一个问题,只有传进来的参数变了,其他的逻辑都没有任何改变

真正发送消息

commitlog类下面的putMessages方法

    public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {         messageExtBatch.setStoreTimestamp(System.currentTimeMillis());         AppendMessageResult result;         StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); //获取事务标识         final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());         if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {             return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);         }         if (messageExtBatch.getDelayTimeLevel() > 0) {             return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);         }         InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();         if (bornSocketAddress.getAddress() instanceof Inet6Address) {             messageExtBatch.setBornHostV6Flag();         }         InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();         if (storeSocketAddress.getAddress() instanceof Inet6Address) {             messageExtBatch.setStoreHostAddressV6Flag();         }         long elapsedTimeInLock = 0;         MappedFile unlockMappedFile = null;         MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();         //fine-grained lock instead of the coarse-grained         MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get();         messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); //发送消息上锁         putMessageLock.lock();         try {             long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();             this.beginTimeInLock = beginLockTimestamp;             // Here settings are stored timestamp, in order to ensure an orderly             // global             messageExtBatch.setStoreTimestamp(beginLockTimestamp);             if (null == mappedFile || mappedFile.isFull()) {                 mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise             }             if (null == mappedFile) {                 log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());                 beginTimeInLock = 0;                 return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);             }             result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback); //根据消息返回结果来进行不同的处理             switch (result.getStatus()) {                 case PUT_OK:                     break; //消息失败的情况,会进行重试一下                 case END_OF_FILE:                     unlockMappedFile = mappedFile;                     // Create a new file, re-write the message                     mappedFile = this.mappedFileQueue.getLastMappedFile(0);                     if (null == mappedFile) {                         // XXX: warn and notify me                         log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());                         beginTimeInLock = 0;                         return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);                     }                     result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);                     break;                 case MESSAGE_SIZE_EXCEEDED:                 case PROPERTIES_SIZE_EXCEEDED:                     beginTimeInLock = 0;                     return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);                 case UNKNOWN_ERROR:                     beginTimeInLock = 0;                     return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);                 default:                     beginTimeInLock = 0;                     return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);             }             elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;             beginTimeInLock = 0;         } finally {             putMessageLock.unlock();         }         if (elapsedTimeInLock > 500) {             log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result);         }         if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {             this.defaultMessageStore.unlockMappedFile(unlockMappedFile);         }         PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);         // Statistics         storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());         storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());         handleDiskFlush(result, putMessageResult, messageExtBatch);         handleHA(result, putMessageResult, messageExtBatch);         return putMessageResult;     } 复制代码

  1. 获取消息类型

  2. 获取一个 MappedFile 对象,内存映射的具体实现。

  3. 追加消息需要加锁,串行化处理。

  4. 验证MappedFile 对象,获取一个可用的 MappedFile (如果没有,则创建一个)

  5. 通过MappedFile对象写入文件。

  6. 根据刷盘策略刷盘。

  7. 主从同步

存储核心类MappedFile

基本属性

//系统缓存大小    public static final int OS_PAGE_SIZE = 1024 * 4;     protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); //类变量,所有 MappedFile 实例已使用字节总数。     private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0); //MappedFile 个数。     private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0); //当前MappedFile对象当前写指针。     protected final AtomicInteger wrotePosition = new AtomicInteger(0); //当前提交的指针。     protected final AtomicInteger committedPosition = new AtomicInteger(0); //当前刷写到磁盘的指针。     private final AtomicInteger flushedPosition = new AtomicInteger(0); //文件总大小     protected int fileSize; //文件通道。     protected FileChannel fileChannel; //如果开启了transientStorePoolEnable,消息会写入堆外内存,然后提交到 PageCache 并最终刷写到磁盘。     protected ByteBuffer writeBuffer = null; //ByteBuffer的缓冲池,堆外内存,transientStorePoolEnable 为 true 时生效。     protected TransientStorePool transientStorePool = null; //文件名字     private String fileName; //文件序号,代表该文件代表的文件偏移量。     private long fileFromOffset; //文件对象     private File file; //对应操作系统的 PageCache。     private MappedByteBuffer mappedByteBuffer; //最后一次存储时间戳。     private volatile long storeTimestamp = 0;     private boolean firstCreateInQueue = false; 复制代码

初始化方法init

    public void init(final String fileName, final int fileSize,         final TransientStorePool transientStorePool) throws IOException {         init(fileName, fileSize);         this.writeBuffer = transientStorePool.borrowBuffer();         this.transientStorePool = transientStorePool;     }     private void init(final String fileName, final int fileSize) throws IOException {         this.fileName = fileName;         this.fileSize = fileSize;         this.file = new File(fileName);         this.fileFromOffset = Long.parseLong(this.file.getName());         boolean ok = false; //确保有这个目录,如果没有的话,就会创建一个目录         ensureDirOK(this.file.getParent());         try {             this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();             this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);             TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);             TOTAL_MAPPED_FILES.incrementAndGet();             ok = true;         } catch (FileNotFoundException e) {             log.error("Failed to create file " + this.fileName, e);             throw e;         } catch (IOException e) {             log.error("Failed to map file " + this.fileName, e);             throw e;         } finally {             if (!ok && this.fileChannel != null) {                 this.fileChannel.close();             }         }     } 复制代码

初始化方法重载了一下,主要是为了区别是否开启堆外内存,但是其实还是调用了上面的init的第二个方法。 init的方法总的来说就是 获取一个目录,然后随机读取一个文件,接着将存储数据的信息增加一下。

AppendMessageResult

    public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {         return appendMessagesInner(msg, cb);     }     public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb) {         return appendMessagesInner(messageExtBatch, cb);     }     public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {         assert messageExt != null;         assert cb != null; //获取当前的写入位置         int currentPos = this.wrotePosition.get();         if (currentPos < this.fileSize) {             ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();             byteBuffer.position(currentPos);             AppendMessageResult result; //根据消息的类型是单个消息还是批量消息做不同处理             if (messageExt instanceof MessageExtBrokerInner) {                 result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);             } else if (messageExt instanceof MessageExtBatch) {                 result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);             } else {                 return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);             } //改变写入位置             this.wrotePosition.addAndGet(result.getWroteBytes()); //改变最终的时间戳             this.storeTimestamp = result.getStoreTimestamp();             return result;         }         log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);         return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);     } 复制代码

doAppend

//fileFromOffset 该文件在整个文件序列中的偏移量。 //ByteBuffer byteBuffer byteBuffer,NIO 字节容器。 //int maxBlank 最大可写字节数。 //MessageExtBrokerInner msgInner 消息内部封装实体。         public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,             final MessageExtBrokerInner msgInner) {             // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>             // PHY OFFSET             long wroteOffset = fileFromOffset + byteBuffer.position();             int sysflag = msgInner.getSysFlag();             int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;             int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;             ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);             ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);             this.resetByteBuffer(storeHostHolder, storeHostLength);             String msgId;             if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {                 msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);             } else {                 msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);             }             // Record ConsumeQueue information             keyBuilder.setLength(0);             keyBuilder.append(msgInner.getTopic());             keyBuilder.append('-');             keyBuilder.append(msgInner.getQueueId());             String key = keyBuilder.toString();             Long queueOffset = CommitLog.this.topicQueueTable.get(key);             if (null == queueOffset) {                 queueOffset = 0L;                 CommitLog.this.topicQueueTable.put(key, queueOffset);             }             // Transaction messages that require special handling             final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());             switch (tranType) {                 // Prepared and Rollback message is not consumed, will not enter the                 // consumer queuec                 case MessageSysFlag.TRANSACTION_PREPARED_TYPE:                 case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:                     queueOffset = 0L;                     break;                 case MessageSysFlag.TRANSACTION_NOT_TYPE:                 case MessageSysFlag.TRANSACTION_COMMIT_TYPE:                 default:                     break;             }             /**              * Serialize message              */             final byte[] propertiesData =                 msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);             final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;             if (propertiesLength > Short.MAX_VALUE) {                 log.warn("putMessage message properties length too long. length={}", propertiesData.length);                 return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);             }             final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);             final int topicLength = topicData.length;             final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;             final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);             // Exceeds the maximum message             if (msgLen > this.maxMessageSize) {                 CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength                     + ", maxMessageSize: " + this.maxMessageSize);                 return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);             }             // Determines whether there is sufficient free space             if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {                 this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);                 // 1 TOTALSIZE                 this.msgStoreItemMemory.putInt(maxBlank);                 // 2 MAGICCODE                 this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);                 // 3 The remaining space may be any value                 // Here the length of the specially set maxBlank                 final long beginTimeMills = CommitLog.this.defaultMessageStore.now();                 byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);                 return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),                     queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);             }             // Initialization of storage space             this.resetByteBuffer(msgStoreItemMemory, msgLen);             // 1 TOTALSIZE             this.msgStoreItemMemory.putInt(msgLen);             // 2 MAGICCODE             this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);             // 3 BODYCRC             this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());             // 4 QUEUEID             this.msgStoreItemMemory.putInt(msgInner.getQueueId());             // 5 FLAG             this.msgStoreItemMemory.putInt(msgInner.getFlag());             // 6 QUEUEOFFSET             this.msgStoreItemMemory.putLong(queueOffset);             // 7 PHYSICALOFFSET             this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());             // 8 SYSFLAG             this.msgStoreItemMemory.putInt(msgInner.getSysFlag());             // 9 BORNTIMESTAMP             this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());             // 10 BORNHOST             this.resetByteBuffer(bornHostHolder, bornHostLength);             this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));             // 11 STORETIMESTAMP             this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());             // 12 STOREHOSTADDRESS             this.resetByteBuffer(storeHostHolder, storeHostLength);             this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));             // 13 RECONSUMETIMES             this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());             // 14 Prepared Transaction Offset             this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());             // 15 BODY             this.msgStoreItemMemory.putInt(bodyLength);             if (bodyLength > 0)                 this.msgStoreItemMemory.put(msgInner.getBody());             // 16 TOPIC             this.msgStoreItemMemory.put((byte) topicLength);             this.msgStoreItemMemory.put(topicData);             // 17 PROPERTIES             this.msgStoreItemMemory.putShort((short) propertiesLength);             if (propertiesLength > 0)                 this.msgStoreItemMemory.put(propertiesData);             final long beginTimeMills = CommitLog.this.defaultMessageStore.now();             // Write messages to the queue buffer             byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);             AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,                 msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);             switch (tranType) {                 case MessageSysFlag.TRANSACTION_PREPARED_TYPE:                 case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:                     break;                 case MessageSysFlag.TRANSACTION_NOT_TYPE:                 case MessageSysFlag.TRANSACTION_COMMIT_TYPE:                     // The next update ConsumeQueue information                     CommitLog.this.topicQueueTable.put(key, ++queueOffset);                     break;                 default:                     break;             }             return result;         } 复制代码

逻辑:

  1. 根据 topic-queryId 获取该队列的偏移地址(待写入的地址),如果没有,新增一个键值对,当前偏移量为 0。

  2. 对事务消息需要单独特殊的处理(PREPARE,ROLLBACK类型的消息,不进入Consume队列)。

  3. 消息的附加属性长度不能超过65536个字节。

  4. 计算消息存储长度

  5. 如果消息长度超过配置的消息总长度,则返回 MESSAGE_SIZE_EXCEEDED。

  6. 如果该 MapperFile 中可剩余空间小于当前消息存储空间,返回END_OF_FILE。

  7. 将消息写入MapperFile中(内存中)。

AppendMessageResult

    public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, String msgId,         long storeTimestamp, long logicsOffset, long pagecacheRT) { //追加结果(成功,到达文件尾(文件剩余空间不足)、消息长度超过、消息属性长度超出、未知错误)。         this.status = status; //消息的偏移量(相对于整个commitlog)。         this.wroteOffset = wroteOffset; //消息待写入字节。         this.wroteBytes = wroteBytes; //消息ID。         this.msgId = msgId; //消息写入时间戳。         this.storeTimestamp = storeTimestamp; //消息队列偏移量。         this.logicsOffset = logicsOffset; //消息写入时机戳(消息存储时间戳--- 消息存储开始时间戳)。         this.pagecacheRT = pagecacheRT;     } 复制代码

这个方法走完,就到了要putMessages中消息刷盘的时候了

消息刷盘

    public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {         // Synchronization flush         if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {             final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;             if (messageExt.isWaitStoreMsgOK()) {                 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());                 service.putRequest(request);                 CompletableFuture<PutMessageStatus> flushOkFuture = request.future();                 PutMessageStatus flushStatus = null;                 try {                     flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),                             TimeUnit.MILLISECONDS);                 } catch (InterruptedException | ExecutionException | TimeoutException e) {                     //flushOK=false;                 }                 if (flushStatus != PutMessageStatus.PUT_OK) {                     log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()                         + " client address: " + messageExt.getBornHostString());                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);                 }             } else {                 service.wakeup();             }         }         // Asynchronous flush         else {             if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {                 flushCommitLogService.wakeup();             } else {                 commitLogService.wakeup();             }         }     } 复制代码

  1. 同步刷写,这里有两种配置,是否一定要收到存储MSG信息,才返回,默认为true。

  2. 如果要等待存储结果。

  3. 唤醒同步刷盘线程。

  4. 异步刷盘机制。

同步刷盘

    public static class GroupCommitRequest {         private final long nextOffset;         private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();         private final long startTimestamp = System.currentTimeMillis();         private long timeoutMillis = Long.MAX_VALUE;         public GroupCommitRequest(long nextOffset, long timeoutMillis) {             this.nextOffset = nextOffset;             this.timeoutMillis = timeoutMillis;         }         public GroupCommitRequest(long nextOffset) {             this.nextOffset = nextOffset;         }         public long getNextOffset() {             return nextOffset;         }         public void wakeupCustomer(final PutMessageStatus putMessageStatus) {             this.flushOKFuture.complete(putMessageStatus);         }         public CompletableFuture<PutMessageStatus> future() {             return flushOKFuture;         }     } 复制代码

这里留个坑,之前的是使用countDownLatch,现在是用compatibleFuture

run

//Commitlog下的GroupCommitService中的核心方法         public void run() {             CommitLog.log.info(this.getServiceName() + " service started");             while (!this.isStopped()) {                 try {                     this.waitForRunning(10);                     this.doCommit();                 } catch (Exception e) {                     CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);                 }             }             // Under normal circumstances shutdown, wait for the arrival of the             // request, and then flush             try {                 Thread.sleep(10);             } catch (InterruptedException e) {                 CommitLog.log.warn("GroupCommitService Exception, ", e);             }             synchronized (this) {                 this.swapRequests();             }             this.doCommit();             CommitLog.log.info(this.getServiceName() + " service end");         } 复制代码

waitForRunning

    protected void waitForRunning(long interval) {         if (hasNotified.compareAndSet(true, false)) {             this.onWaitEnd();             return;         }         //entry to wait         waitPoint.reset();         try {             waitPoint.await(interval, TimeUnit.MILLISECONDS);         } catch (InterruptedException e) {             log.error("Interrupted", e);         } finally {             hasNotified.set(false);             this.onWaitEnd();         }     } 复制代码

doCommit

        private void doCommit() {             synchronized (this.requestsRead) {                 if (!this.requestsRead.isEmpty()) {                     for (GroupCommitRequest req : this.requestsRead) {                         // There may be a message in the next file, so a maximum of                         // two times the flush                         boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();                         for (int i = 0; i < 2 && !flushOK; i++) {                             CommitLog.this.mappedFileQueue.flush(0);                             flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();                         }                         req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);                     }                     long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();                     if (storeTimestamp > 0) {                         CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);                     }                     this.requestsRead.clear();                 } else {                     // Because of individual messages is set to not sync flush, it                     // will come to this process                     CommitLog.this.mappedFileQueue.flush(0);                 }             }         } 复制代码

刷盘方法

    public boolean flush(final int flushLeastPages) {         boolean result = true;         MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);         if (mappedFile != null) {             long tmpTimeStamp = mappedFile.getStoreTimestamp();             int offset = mappedFile.flush(flushLeastPages);             long where = mappedFile.getFileFromOffset() + offset;             result = where == this.flushedWhere;             this.flushedWhere = where;             if (0 == flushLeastPages) {                 this.storeTimestamp = tmpTimeStamp;             }         }         return result;     } 复制代码

  1. 根据上次刷新的位置,得到当前的 MappedFile 对象。

  2. 执行 MappedFile 的 flush 方法。

  3. 更新上次刷新的位置。

真正的刷盘方法

    public int flush(final int flushLeastPages) {         if (this.isAbleToFlush(flushLeastPages)) {             if (this.hold()) {                 int value = getReadPosition();                 try {                     //We only append data to fileChannel or mappedByteBuffer, never both.                     if (writeBuffer != null || this.fileChannel.position() != 0) {                         this.fileChannel.force(false);                     } else {                         this.mappedByteBuffer.force();                     }                 } catch (Throwable e) {                     log.error("Error occurred when force data to disk.", e);                 }                 this.flushedPosition.set(value);                 this.release();             } else {                 log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());                 this.flushedPosition.set(getReadPosition());             }         }         return this.getFlushedPosition();     } 复制代码

刷写的实现逻辑就是调用 FileChannel 或 MappedByteBuffer 的force 方法。

异步刷盘

相关服务类(线程)CommitLogFlushRealTimeService、CommitLogFlushRealTimeService 、CommitLogFlushRealTimeServiceCommitLogCommitRealTimeService。

  • commitIntervalCommitLog  CommitRealTimeService 线程的循环间隔,默认200ms。

  • commitCommitLogLeastPages   每次提交到文件中,至少需要多少个页(默认4页)。

  • flushCommitLogLeastPages  每次刷写到磁盘(commitlog),至少需要多个页(默认4页)。

  • flushIntervalCommitLog    异步刷新线程,每次处理完一批任务后的等待时间,默认为500ms。

MappedFileQueue#commit

    public boolean commit(final int commitLeastPages) {         boolean result = true; //findMappedFileByOffset 按照偏移量查找映射文件 //findMappedFileByOffset 两个参数,第一个是偏移量,第二个是如果没有找到映射文件,就返回第一个         MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);         if (mappedFile != null) { //这个commit调用的就是下面的commit方法,即MappedFile#commit方法             int offset = mappedFile.commit(commitLeastPages);             long where = mappedFile.getFileFromOffset() + offset;             result = where == this.committedWhere;             this.committedWhere = where;         }         return result;     } 复制代码

MappedFile#commit

    public int commit(final int commitLeastPages) {         if (writeBuffer == null) {             //no need to commit data to file channel, so just regard wrotePosition as committedPosition.             return this.wrotePosition.get();         }         if (this.isAbleToCommit(commitLeastPages)) {             if (this.hold()) {                 commit0(commitLeastPages);                 this.release();             } else {                 log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());             }         }         // All dirty data has been committed to FileChannel.         if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {             this.transientStorePool.returnBuffer(writeBuffer);             this.writeBuffer = null;         }         return this.committedPosition.get();     } 复制代码

  1. 看是否可以提交(符合最小需要提交的页)

isAbleToCommit

    protected boolean isAbleToCommit(final int commitLeastPages) {         int flush = this.committedPosition.get();         int write = this.wrotePosition.get();         if (this.isFull()) {             return true;         }         if (commitLeastPages > 0) {             return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;         }         return write > flush;     } 复制代码

  1. 获取上次刷新偏移量。

  2. 获取当前写入偏移量。

  3. 如果文件已满,返回true。

  4. 如果commitLeastPages大于0,则需要判断当前写入的偏移与上次刷新偏移量之间的间隔,如果超过commitLeastPages页数,则提交,否则本次不提交。

  5. 如果没有新的数据写入,本次提交任务结束。

commit0

MappedFile#commit0

    protected void commit0(final int commitLeastPages) { //获取当前写入偏移量         int writePos = this.wrotePosition.get(); //获取这次刷新偏移量。         int lastCommittedPosition = this.committedPosition.get(); //当前写入的偏移量-上次刷新的偏移量 大于 最少刷新的页面大小         if (writePos - lastCommittedPosition > commitLeastPages) {             try { //slice方法 其实就是开辟了一个新的缓冲区,从writeBuffer中没有数据的地方开始 //比如说writeBuffer的容量是5,写入了两个数,slice之后,新的缓冲区起始位置是0,容量就是5-2=3                 ByteBuffer byteBuffer = writeBuffer.slice(); //设置此缓冲区的位置。如果标记已定义且大于新位置,则将其丢弃。                 byteBuffer.position(lastCommittedPosition); //设置此缓冲区的限制。如果头寸大于新限制,则将其设置为新限制。如果标记已定义且大于新限制,则将其丢弃。                 byteBuffer.limit(writePos); //设置位置                 this.fileChannel.position(lastCommittedPosition); //写入数据                 this.fileChannel.write(byteBuffer); //更新数据                 this.committedPosition.set(writePos);             } catch (Throwable e) {                 log.error("Error occurred when commit data to FileChannel.", e);             }         }     } 复制代码

消息存储过程

image.png

主从同步机制

    public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { //确定当前节点是master节点         if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {             HAService service = this.defaultMessageStore.getHaService(); //消息已经存储完毕             if (messageExt.isWaitStoreMsgOK()) {                 // 决定是否要等待,根据从节点是否接受消息完成                 if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {                     GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());                     service.putRequest(request);                     service.getWaitNotifyObject().wakeupAll();                     PutMessageStatus replicaStatus = null;                     try {                         replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),                                 TimeUnit.MILLISECONDS);                     } catch (InterruptedException | ExecutionException | TimeoutException e) {                     }                     if (replicaStatus != PutMessageStatus.PUT_OK) {                         log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "                             + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());                         putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);                     }                 }                 // Slave problem                 else {                     // Tell the producer, slave not available                     putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);                 }             }         }     } 复制代码

消息传输过程

  1. Producer 将消息发送到 Broker 后,Broker 会采用同步或者异步的方式把消息写入到 CommitLog。RocketMQ 所有的消息都会存放在 CommitLog 中,为了保证消息存储不发生混乱,对 CommitLog 写之前会加锁,同时也可以使得消息能够被顺序写入到 CommitLog,只要消息被持久化到磁盘文件 CommitLog,那么就可以保证 Producer 发送的消息不会丢失。

  2. CommitLog 持久化后,会把里面的消息 Dispatch 到对应的 Consume Queue 上,Consume Queue 相当于 Kafka 中的 Partition,是一个逻辑队列,存储了这个 Queue 在 CommitLog 中的起始 Offset,log 大小和 MessageTag 的 hashCode。

  3. 当消费者进行消息消费时,会先读取 ConsumerQueue,逻辑消费队列 ConsumeQueue 保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 Offset,消息大小、和消息 Tag 的 HashCode 值。

  4. 直接从 ConsumerQueue 中读取消息是没有数据的,真正的消息主体在 CommitLog 中,所以还需要从 CommitLog 中读取消息。

总结

RocketMQ 采用文件系统的方式来存储消息,消息的主要存储文件包括 CommitLog 文件、ConsumeQueue 文件、IndexFile 文件。

  • CommitLog 是消息存储的物理文件,所有消息主题的消息都存储在 CommitLog 文件中,每个 Broker 上的 CommitLog 被当前机器上的所有 ConsumeQueue 共享。CommitLog 中的文件默认大小为 1G,可以动态配置;当一个文件写满以后,会生成一个新的 CommitLog 文件。所有的 Topic 数据是顺序写入在 CommitLog 文件中的。

  • ConsumeQueue 是消息消费的逻辑队列,消息达到 CommitLog 文件后将被异步转发到消息消费队列,供消息消费者消费,这里面包含 MessageQueue 在 CommitLog 中的物理位置偏移量 Offset,消息实体内容的大小和 Message Tag 的 hash 值。每个文件默认大小约为 600W 个字节,如果文件满了后会也会生成一个新的文件。

  • IndexFile 是消息索引文件,Index 索引文件提供了对 CommitLog 进行数据检索,提供了一种通过 key 或者时间区间来查找 CommitLog 中的消息的方法。在物理存储中,文件名是以创建的时间戳命名,固定的单个 IndexFile 大小大概为 400M,一个 IndexFile 可以保存 2000W 个索引。

单个 commitlog 文件,默认大小为 1G,由多个 commitlog 文件来存储所有的消息,commitlog 文件的命名以该文件在整个commitlog中的偏移量来命名,举例如下。

例如一个 commitlog 文件,1024个字节。

第一个文件: 00000000000000000000

第二个文件: 00000000000000001024

MappedFile 封装一个一个的 CommitLog 文件,而 MappedFileQueue 就是封装的就是一个逻辑的 commitlog 文件。mappedFile队列,从小到大排列。

使用内存映射机制,MappedByteBuffer, 具体封装类为MappedFile。

1、同步刷盘每次发送消息,消息都直接存储在 MapFile 的 mappdByteBuffer,然后直接调用 force() 方法刷写到磁盘,等到 force 刷盘成功后,再返回给调用方(GroupCommitRequest#waitForFlush)就是其同步调用的实现。

2、异步刷盘

分为两种情况,是否开启堆外内存缓存池,具体配置参数:MessageStoreConfig#transientStorePoolEnable。

1)transientStorePoolEnable = true

消息在追加时,先放入到 writeBuffer 中,然后定时 commit 到 FileChannel,然后定时flush。

2)transientStorePoolEnable=false(默认)

消息追加时,直接存入 MappedByteBuffer(pageCache) 中,然后定时 flush。


作者:shyblog
链接:https://juejin.cn/post/7025199135846318088


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐