阅读 149

RocketMQ事务消息篇(三)之事务消息源码分析

事务消息处理基本流程

在介绍事务消息的时候,画了一个简单的流程图说明事务消息的整体处理流程:

image.png p.s. 下面的序号(1、2、3...)表示顺序,与上图中的(1、2、3...)无关。

  1. 事务生产者调用事务消息发送接口,发送消息

  2. 开始预提交阶段,客户端发送预消息并在请求头标记这是一条事务消息。消息体就是我们实际要发送的消息内容

  3. broker接收到消息,发现这是一条事务消息,于是将当前消息备份。所谓“备份”即将当前消息的所有数据写入内部的事务topic中而不是我们实际要发送的topic,该事务topic由于消费端并没有订阅,所以这条消息对消费端不可见,然后响应客户端的发送请求

  4. 客户端确认发送成功,则执行本地事务,并标记事务执行状态。如果发送失败,就不需要执行本地事务了,直接标记事务执行失败,需要回滚。

  5. 基于事务的执行状态,给本次发送事务消息的那个broker发送一条结束事务的请求(请求头里包含是提交还是回滚,亦或者是未知状态)

  6. broker收到事务结束的请求,如果是未知状态就打条日志直接返回了;如果是提交事务,就将备份的那条事务消息恢复过来,写入到原始的topic里,此时就对消费端可见了,然后要在op队列里(另一个内部topic)写入一条消息,消息体就是当前这条事务消息的队列偏移值。如果是回滚事务,就只用在op队列里写入一条消息即可,就不还原事务消息了,这样对消费端就不可见。关于op队列的具体作用,后面源码部分再详说。

  7. 说一下事务回查。事务回查就是broker扫描到那些没有提交也没回滚的消息,找到客户端,发一个请求,让客户端再次提交一下事务结束状态。

源码剖析

整体流程涉及的代码还是比较多的,接下来对每一部分的源码拆开进行分析。

客户端处理,事务执行流程

客户端处理基本流程如下:

image.png 源码的主要入口实现部分在这个方法里: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException;

代码如下,我已经加上相关注释:

    public TransactionSendResult sendMessageInTransaction(final Message msg,         final LocalTransactionExecuter localTransactionExecuter, final Object arg)         throws MQClientException {         // 获取我们创建的事务消息监听器(本地事务执行及事务回查)         TransactionListener transactionListener = getCheckListener();         if (null == localTransactionExecuter && null == transactionListener) {             throw new MQClientException("tranExecutor is null", null);         }           // 事务消息不支持延时消息         // ignore DelayTimeLevel parameter         if (msg.getDelayTimeLevel() != 0) {             MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);         }           Validators.checkMessage(msg, this.defaultMQProducer);           SendResult sendResult = null;         // 设置半消息属性(TRAN_MSG),标志这是一个事务半消息         MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");         // 设置生产组属性(PGROUP)         MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());         try {             // 消息发送,这里就是平常同步发送普通消息的默认发送流程了,在客户端这里,就已经没有太多其它额外的针对事务消息的处理了,主要还是会判断是半消息的话,打个事务消息的标记             sendResult = this.send(msg);         } catch (Exception e) {             throw new MQClientException("send message Exception", e);         }           LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;         Throwable localException = null;         switch (sendResult.getSendStatus()) {             case SEND_OK: {                 try {                     // transactionId                     if (sendResult.getTransactionId() != null) {                         msg.putUserProperty("__transactionId__", sendResult.getTransactionId());                     }                     // UNIQ_KEY,这是客户端发送的时侯生成的一个唯一ID,也就是我们平常用的sendResult里的msgId                     String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);                     if (null != transactionId && !"".equals(transactionId)) {                         // 在这种情况下,transactionId其实就是message的客户端msgId                         msg.setTransactionId(transactionId);                     }                     // 一般,我平常使用的时候,不会采用localTransactionExecuter方式调用事务消息接口,所以这里一般是空                     if (null != localTransactionExecuter) {                         localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);                     } else if (transactionListener != null) {                         log.debug("Used new transaction API");                         // 执行我们本地事务                         localTransactionState = transactionListener.executeLocalTransaction(msg, arg);                     }                     if (null == localTransactionState) {                         localTransactionState = LocalTransactionState.UNKNOW;                     }                       if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {                         log.info("executeLocalTransactionBranch return {}", localTransactionState);                         log.info(msg.toString());                     }                 } catch (Throwable e) {                     log.info("executeLocalTransactionBranch exception", e);                     log.info(msg.toString());                     localException = e;                 }             }             break;             // 未发送成功,回滚消息             case FLUSH_DISK_TIMEOUT:             case FLUSH_SLAVE_TIMEOUT:             case SLAVE_NOT_AVAILABLE:                 localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;                 break;             default:                 break;         }           try {             // 事务结束处理,是提交还是回滚还是要做其它操作             this.endTransaction(msg, sendResult, localTransactionState, localException);         } catch (Exception e) {             log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);         }           TransactionSendResult transactionSendResult = new TransactionSendResult();         transactionSendResult.setSendStatus(sendResult.getSendStatus());         transactionSendResult.setMessageQueue(sendResult.getMessageQueue());         transactionSendResult.setMsgId(sendResult.getMsgId());         transactionSendResult.setQueueOffset(sendResult.getQueueOffset());         transactionSendResult.setTransactionId(sendResult.getTransactionId());         transactionSendResult.setLocalTransactionState(localTransactionState);         // 返回事务消息发送结果,这里已经返回本地事务执行状态了         return transactionSendResult;     } 复制代码

关于上面调用事务结束请求的方法,具体代码及注释如下:

    public void endTransaction(         final Message msg,         final SendResult sendResult,         final LocalTransactionState localTransactionState,         final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {         final MessageId id;         // getOffsetMsgId,这个是服务端的msgId,包含了不少消息的元信息         if (sendResult.getOffsetMsgId() != null) {             id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());         } else {             id = MessageDecoder.decodeMessageId(sendResult.getMsgId());         }         String transactionId = sendResult.getTransactionId();         // 半消息发到了哪个broker上,最后提交也得到这个broker上         final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());         EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();         requestHeader.setTransactionId(transactionId);         // 设置事务消息的提交偏移(提交到内部的事务topic上了)         requestHeader.setCommitLogOffset(id.getOffset());         switch (localTransactionState) {             case COMMIT_MESSAGE:                 requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);                 break;             case ROLLBACK_MESSAGE:                 requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);                 break;             case UNKNOW:                 requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);                 break;             default:                 break;         }           doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);         requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());         requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());         requestHeader.setMsgId(sendResult.getMsgId());         // 竟然还带本地事务异常信息         String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;         // 2阶段执行的消息         this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,             this.defaultMQProducer.getSendMsgTimeout());     } 复制代码

通过查阅上面两个方法的代码基本对客户的事务消息发送部分,已经比较清楚了(事务回查的处理部分在后面)

broker端处理,接收事务半消息(预提交)

broker端在接收到事务消息的基本处理流程如下:

image.png

简单来说,事务消息也如普通消息一样发送到broker,broker像接收普通一样接收,接收到之后会判断是否有事务标记,有的话,就把这条消息的所有信息写入一个内部的事物topic里,来保证暂时对消费端不可见,关键源码如下(以异步写入为示例):

    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,                                                                 SendMessageContext mqtraceContext,                                                                 SendMessageRequestHeader requestHeader) {         final RemotingCommand response = preSend(ctx, request, requestHeader);         final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();           ...         // 省略上面一部分代码,主要看下面判断这是一条事务消息         // 如果这个属性存在,说明是发送的事务消息         if (transFlag != null && Boolean.parseBoolean(transFlag)) {             // broker检查是否启用事务消息了             if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {                 response.setCode(ResponseCode.NO_PERMISSION);                 response.setRemark(                         "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()                                 + "] sending transaction message is forbidden");                 return CompletableFuture.completedFuture(response);             }             // 从这里可心看到,事务消息是一个单独的流程处理,和其它消息不一样             putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);         } else {             putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);         }         return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);     } 复制代码

发现这是一条事务消息后,备份事务消息的代码如下:

    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {         // 会把消息的原始topic及队列信息存储到属性中,因为要写到事务topic的队列里,就是备份原消息         MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());         MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,             String.valueOf(msgInner.getQueueId()));         // 把事务标记也去掉         msgInner.setSysFlag(             MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));         // 设置当前存储的消息的topic为:RMQ_SYS_TRANS_HALF_TOPIC, 事务半消息的topic         msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());         // 发送到队列0里, 这个topic也只有一条队列,另外还用到的一个topic是:RMQ_SYS_TRANS_OP_HALF_TOPIC,也是只有一条队列在每个broker上         msgInner.setQueueId(0);         msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));         return msgInner;     } 复制代码

然后就是将这条事务半消息如果普通消息一样写入到内部的事务topic里了

    public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {         // 见名知义,事务半消息处理         // 事务半消息存储完成,基本半消息发送(一阶段)已经算是结束了,在写入commitlog的时候,基本没有对这个事务topic做额外处理了,就像普通消息那样了         return store.asyncPutMessage(parseHalfMessageInner(messageInner));     } 复制代码

broker端处理,事务结束

前面提到客户端在一阶段(发送事务半消息后,然后执行本地事务),会再根据事务执行状态给broker发送一条事务结束的请求,告诉broker是提交还是要回滚,基本流程如下:

image.png 事务处理上还是做了不少动作的,看一下它的关键源码实现:

    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws         RemotingCommandException {         // 事务消息结束(二阶段)处理         final RemotingCommand response = RemotingCommand.createResponseCommand(null);         final EndTransactionRequestHeader requestHeader =             (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);         LOGGER.debug("Transaction request:{}", requestHeader);         // 从节点是不允许处理事务消息的         if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {             response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);             LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");             return response;         }           // 事务回查标记,是否为事务回查         if (requestHeader.getFromTransactionCheck()) {             switch (requestHeader.getCommitOrRollback()) {                 case MessageSysFlag.TRANSACTION_NOT_TYPE: {                     LOGGER.warn("Check producer[{}] transaction state, but it's pending status."                             + "RequestHeader: {} Remark: {}",                         RemotingHelper.parseChannelRemoteAddr(ctx.channel()),                         requestHeader.toString(),                         request.getRemark());                     return null;                 }                   case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {                     LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."                             + "RequestHeader: {} Remark: {}",                         RemotingHelper.parseChannelRemoteAddr(ctx.channel()),                         requestHeader.toString(),                         request.getRemark());                       break;                 }                   case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {                     LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."                             + "RequestHeader: {} Remark: {}",                         RemotingHelper.parseChannelRemoteAddr(ctx.channel()),                         requestHeader.toString(),                         request.getRemark());                     break;                 }                 default:                     return null;             }         } else {             // 只是为了打条日志             switch (requestHeader.getCommitOrRollback()) {                 // 本地事务执行状态返回的是UNKNOW,该回查了                 case MessageSysFlag.TRANSACTION_NOT_TYPE: {                     LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."                             + "RequestHeader: {} Remark: {}",                         RemotingHelper.parseChannelRemoteAddr(ctx.channel()),                         requestHeader.toString(),                         request.getRemark());                     return null;                 }                   case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {                     break;                 }                   case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {                     LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."                             + "RequestHeader: {} Remark: {}",                         RemotingHelper.parseChannelRemoteAddr(ctx.channel()),                         requestHeader.toString(),                         request.getRemark());                     break;                 }                 default:                     return null;             }         }         OperationResult result = new OperationResult();         if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {             // 开始提交事务消息             // 这里就是根据之前提交的内部事务topic的半消息偏移,查出来提交的这条消息             result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);             if (result.getResponseCode() == ResponseCode.SUCCESS) {                 // result.getPrepareMessage()就是之前提交到内部的事务topic上的那条半消息,检查下这条信息是否正确,日志偏移呀什么的是否匹配,是不是查错消息了                 RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);                 if (res.getCode() == ResponseCode.SUCCESS) {                     // 一切都OK了,准备提交事务,这里就是把原始消息信息,原原本本的恢复过来                     MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());                     msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));                     msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());                     msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());                     msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());                     // 清除事务消息属性                     MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);                     // 原始消息写入对应的topic,此时对消费端就可见了,可以正常消费了                     RemotingCommand sendResult = sendFinalMessage(msgInner);                     if (sendResult.getCode() == ResponseCode.SUCCESS) {                         // 删除的动作是在op队列(RMQ_SYS_TRANS_OP_HALF_TOPIC)写入该消息,tag是d,消息体是在事务topic里的消息偏移                         this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());                     }                     return sendResult;                 }                 return res;             }         } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {             // 事务回滚             result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);             if (result.getResponseCode() == ResponseCode.SUCCESS) {                 RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);                 if (res.getCode() == ResponseCode.SUCCESS) {                     this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());                 }                 return res;             }         }         /**          * 总结一下:          * commit:就是先把原始消息写入到原始topic里,然后删除半消息就是在op 的事务topic里写入一条tag为d的消息,消息体就是半消息的偏移值          * rollback: 就是直接删除过程了          * unknown: 就是上面 两步都没做,原始消息未写入,op队列里也没有          */         // UNKNOW状态了,看来得回查了,其实这里返不返回都一样,客户端是one way调用         response.setCode(result.getResponseCode());         response.setRemark(result.getResponseRemark());         return response;     } 复制代码

事务回查

前面的流程呢,都是事务的正常处理,但是如果客户端在发送事务请求的时候,宕机、重启、网络原因等,最终是导致事务结束的请求没有正确发送给broker处理,那就需要事务回查机制。

broker启动的时候,会启动一个定时任务(默认是1分钟),从前面提到的事务topic的队列里拉取消息,检查拉取到的消息是否已经处理过了(比如提交或回滚),如果没有,根据是否要进行事务回查,让客户再检查一下本地事务的执行状态并告诉broker或者丢弃。

其实这里涉及到几个关键问题需要明白:

  1. 写入到事务topic里的事务半消息在事务结束后进行删除,但是rocketmq是追加写的方式,所以这里的删除并不是从消息队列里真正的删除一条消息。

  2. broker怎么知道一条事务半消息是否已经提交或者回滚了,正如前面说的,这里引入一个op队列,即另一个内部topic,如果一条消息已经提交或回滚了,就向op队列里写入一条消息消息体就是在事务topic队列里的偏移值,如果op队列里没有,那就说明这条事务消息的状态还没有提交,还是未知的,可能需要事务回查。

  3. 我们知道写入到事务topic的事务半消息也如普通消息一样,是顺序写顺序读的,如果此时已经写入1、2、3、4、5、6共6条事务消息了,1、2、5的事务状态已经提交或者回滚了,但是3、4还是未知的,那总不能再重新回头消费吧。并没有,如果broker发现这条消息是未知状态的,那在处理的时候,把这条消息再追回写入到事务topic的队列里,然后找客户端回查。继续下一条消息处理,等到再处理到刚才重新追加的这条事务消息的时候,再从op队列里检查一下,这条事务半消息是否已经处理过了,如果还没有而且也没达到事务回查的最大次数,那就再追回写回去,再继续呗。如果已经达到最大次数,就丢弃(其实是写到另一个内部topic,也就是说事务消息这里用到了3个内部topic来存储数据)

关于事务回查这里主要采用文字描述说明了,就不再画一个流程图了,关键源码如下:

broker默认每分钟检查一次,从内部事务topic队列和op队列里拉取消息,然后比对,当前的事务半消息是否已经处理过了,是否需要回查:

    // 定时检查事务消息(1分钟查一次)     @Override     public void check(long transactionTimeout, int transactionCheckMax,         AbstractTransactionalMessageCheckListener listener) {         try {             String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;             Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);             if (msgQueues == null || msgQueues.size() == 0) {                 log.warn("The queue of topic is empty :" + topic);                 return;             }             log.debug("Check topic={}, queues={}", topic, msgQueues);             for (MessageQueue messageQueue : msgQueues) {                 long startTime = System.currentTimeMillis();                 // 一条预消息队列对应一个op队列(实际也就1条队列)                 MessageQueue opQueue = getOpQueue(messageQueue);                 // 获取事务topic和op topic的消费偏移                 long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);                 long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);                 log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);                 if (halfOffset < 0 || opOffset < 0) {                     log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,                         halfOffset, opOffset);                     continue;                 }                   List<Long> doneOpOffset = new ArrayList<>();                 HashMap<Long, Long> removeMap = new HashMap<>();                 PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);                 if (null == pullResult) {                     log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",                         messageQueue, halfOffset, opOffset);                     continue;                 }                 // single thread                 int getMessageNullCount = 1;                 long newOffset = halfOffset;                 long i = halfOffset;                 while (true) {                     if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {                         log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);                         break;                     }                     // 已经处理过的,没必要再处理一次                     if (removeMap.containsKey(i)) {                         log.debug("Half offset {} has been committed/rolled back", i);                         Long removedOpOffset = removeMap.remove(i);                         // op的队列偏移                         doneOpOffset.add(removedOpOffset);                     } else {                         // 获取当前要处理的half消息                         GetResult getResult = getHalfMsg(messageQueue, i);                         MessageExt msgExt = getResult.getMsg();                         if (msgExt == null) {                             if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {                                 break;                             }                             if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {                                 log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,                                     messageQueue, getMessageNullCount, getResult.getPullResult());                                 break;                             } else {                                 log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",                                     i, messageQueue, getMessageNullCount, getResult.getPullResult());                                 i = getResult.getPullResult().getNextBeginOffset();                                 newOffset = i;                                 continue;                             }                         }                           // 超过15次丢弃,或者消息过期了(超过了设置的文件保存时间)                         if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {                             // 默认实现是移动到TRANS_CHECK_MAXTIME_TOPIC这个topic里                             listener.resolveDiscardMsg(msgExt);                             newOffset = i + 1;                             i++;                             continue;                         }                         if (msgExt.getStoreTimestamp() >= startTime) {                             log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,                                 new Date(msgExt.getStoreTimestamp()));                             break;                         }                           long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();                         long checkImmunityTime = transactionTimeout;                         // 未找到写入这个属性的地方(除了test)                         String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);                         if (null != checkImmunityTimeStr) {                             checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);                             if (valueOfCurrentMinusBorn < checkImmunityTime) {                                 // 超过这个检查时间,重新写回半消息队列                                 if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {                                     newOffset = i + 1;                                     i++;                                     continue;                                 }                             }                         } else {                             // 新提交的半消息,暂不处理,估计是认为事务也可能没执行完,处理也没意义                             if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {                                 log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,                                     checkImmunityTime, new Date(msgExt.getBornTimestamp()));                                 break;                             }                         }                         List<MessageExt> opMsg = pullResult.getMsgFoundList();                         // checkImmunityTime默认是6秒,第一次可以检查的时间                         // 正常来说,每条提交/回滚就是已经处理过的消息,在op队列里都有一条消息,如果没有(第一次回查),或者已经有了,但是存放时间已经满足检查条件了,都得回查                         boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)                             || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))                             || (valueOfCurrentMinusBorn <= -1);                           if (isNeedCheck) {                             // 把这个消息重新写回half队列里                             if (!putBackHalfMsgQueue(msgExt, i)) {                                 continue;                             }                             // 事务回查,确认状态后,下次再处理上边这个写回的半消息                             listener.resolveHalfMsg(msgExt);                         } else {                             pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);                             log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,                                 messageQueue, pullResult);                             continue;                         }                     }                     newOffset = i + 1;                     i++;                 }                 if (newOffset != halfOffset) {                     transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);                 }                 //已经连接处理的偏移,如果2,3,4,6,7,则最偏移到4.                 long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);                 if (newOpOffset != opOffset) {                     transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);                 }             }         } catch (Throwable e) {             log.error("Check error", e);         }           // 总之,一条一条拉取,如果在op队列,就是已经commit或者rollback的,不用再管了,否则就检查是否需要回查,需要的话,这条写再写回half队列 复制代码

关于如何从op队列里确认事务半消息已经处理过了,主要就是根据op队列里拉取的消息的消息体(保存的是事务半消息的偏移值)来判断当前偏移的事务消息是否已经处理过了:

    /**      * Read op message, parse op message, and fill removeMap      *      * @param removeMap Half message to be remove, key:halfOffset, value: opOffset.      * @param opQueue Op message queue.      * @param pullOffsetOfOp The begin offset of op message queue.      * @param miniOffset The current minimum offset of half message queue.      * @param doneOpOffset Stored op messages that have been processed.      * @return Op message result.      */     private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap,         MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {         // 使用CID_RMQ_SYS_TRANS拉取op队列里的消息         PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);         if (null == pullResult) {             return null;         }         if (pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL             || pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) {             log.warn("The miss op offset={} in queue={} is illegal, pullResult={}", pullOffsetOfOp, opQueue,                 pullResult);             transactionalMessageBridge.updateConsumeOffset(opQueue, pullResult.getNextBeginOffset());             return pullResult;         } else if (pullResult.getPullStatus() == PullStatus.NO_NEW_MSG) {             log.warn("The miss op offset={} in queue={} is NO_NEW_MSG, pullResult={}", pullOffsetOfOp, opQueue,                 pullResult);             return pullResult;         }         List<MessageExt> opMsg = pullResult.getMsgFoundList();         if (opMsg == null) {             log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue, pullResult);             return pullResult;         }         // 对拉取的消息做过滤处理,判断一下这些op消息对应的half消息是否处理过了         for (MessageExt opMessageExt : opMsg) {             // 记录这条op消息对应在事务队列里的偏移值             Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));             log.debug("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(),                 opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset);             if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {                 // 找到的都是需要"删除"的半消息                 // miniOffset就是halfOffset,将要消费的最小偏移 ,这是处理完成待删除的op消息                 if (queueOffset < miniOffset) {                     doneOpOffset.add(opMessageExt.getQueueOffset());                 } else {                     // op消息保存的是half消息的偏移,这个值竟然大于当前half消息的偏移,这是已经处理过的,不需要再处理了                     removeMap.put(queueOffset, opMessageExt.getQueueOffset());                 }             } else {                 log.error("Found a illegal tag in opMessageExt= {} ", opMessageExt);             }         }         log.debug("Remove map: {}", removeMap);         log.debug("Done op list: {}", doneOpOffset);         return pullResult;     } 复制代码

broker在该生产组下找到一个生产者客户端发送回查请求:

    public void sendCheckMessage(MessageExt msgExt) throws Exception {         CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();         checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());         checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());         checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));         checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());         checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());         msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));         msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));         msgExt.setStoreSize(0);         String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);         // 根据生产组找到对应的生产者实例,发送一个回查请求         Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);         if (channel != null) {             brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);         } else {             LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);         }     } 复制代码

客户端收到请求后,执行事务回查逻辑,并将事务状态发回broker:

    // 事务反查     public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,         RemotingCommand request) throws RemotingCommandException {         final CheckTransactionStateRequestHeader requestHeader =             (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);         final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());         final MessageExt messageExt = MessageDecoder.decode(byteBuffer);         if (messageExt != null) {             if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {                 messageExt.setTopic(NamespaceUtil                     .withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));             }             String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);             if (null != transactionId && !"".equals(transactionId)) {                 // 就是客户端 msg Id,如果用户没有自定义设置这个值                 messageExt.setTransactionId(transactionId);             }             final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);             if (group != null) {                 MQProducerInner producer = this.mqClientFactory.selectProducer(group);                 if (producer != null) {                     final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());                     producer.checkTransactionState(addr, messageExt, requestHeader);                 } else {                     log.debug("checkTransactionState, pick producer by group[{}] failed", group);                 }             } else {                 log.warn("checkTransactionState, pick producer group failed");             }         } else {             log.warn("checkTransactionState, decode message failed");         }           return null;     } 复制代码

结语

关于rocketmq事务消息篇到此结束。


作者:不识君
链接:https://juejin.cn/post/7023193997627621390


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐