阅读 134

RocketMQ Producer重试机制,何时重试,何时不重试?

版本

4.7.1,以下说明参照该版本实现

功能特性

image.png

官网说明

Producer的send方法本身支持内部重试,重试逻辑如下:

  • 至多重试2次。

  • 如果同步模式发送失败,则轮转到下一个Broker,如果异步模式发送失败,则只会在当前Broker进行重试。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。

  • 如果本身向broker发送消息产生超时异常,就不会再重试。

以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。

注意

  • 异步发送在发送过程中出现异常进行重试,在解析请求结果时,发现响应状态码有其它异常(消息可能未正确被broker处理)会继续进行重试,重试依然选择当前broker。但是如果响应结果不为空的话,即使处理响应时发生异常也不会进行重试。

  • 同步发送时,如果发送过程中没有异常,但是发送结果不OK,也会选择另一个broker继续进行重试(需要打开该配置项:retryAnotherBrokerWhenNotStoreOK)

  • 顺序消息发送失败不进行重试,顺序消息指的是同步+指定消息队列的方式发送

  • 异步+指定消息队列的方式发送,不认为是顺序发送,因为异步发送无法保证消息顺序,尽管在发送过程中出现异常不进行重试,但是发送完成后的处理逻辑和异步方式一样,如果发送状态不满足指定条件,会继续进行重试,同异步发送流程

  • one-way方式比较特别,因为不等待结果,所以和其它方式不一样,不存在等结果返回后再考虑是否继续重试,但发送时和同步发送一样,异常会进行重试

重试逻辑代码

重试逻辑在:源码org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout)

消息的发送流程:发送消息->sendDefaultImpl->sendKernelImpl,重试的逻辑在sendDefaultImpl,发送失败进行重试,会再次调用sendKernelImpl发送消息 顺序消息不重试的原因:顺序发送流程:发送消息->sendKernelImpl,所以没有重试逻辑 消息发送重试在进行队列选择的时候,如果有多个broker master,下次重试的broker不同于上次的

生产者流控不重试

因流控发送失败的消息,不进行重试,下面是摘自官方文档说明,哪些场景进行流控:

生产者流控,因为broker处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。 生产者流控:

  • commitLog文件被锁时间超过osPageCacheBusyTimeOutMills时,参数默认为1000ms,返回流控。

  • 如果开启transientStorePoolEnable == true,且broker为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,返回流控。

  • broker每隔10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,返回流控。

  • broker通过拒绝send 请求方式实现流量控制。 注意,生产者流控,不会尝试消息重投。 消费者流控:

  • 消费者本地缓存消息数超过pullThresholdForQueue时,默认1000。

  • 消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB。

  • 消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。 消费者流控的结果是降低拉取频率。

发送时允许重试的异常

• RemotingException
• MQClientException
• MQBrokerException,如下响应码不重试:
            case ResponseCode.TOPIC_NOT_EXIST:
            case ResponseCode.SERVICE_NOT_AVAILABLE:
            case ResponseCode.SYSTEM_ERROR:
            case ResponseCode.NO_PERMISSION:
            case ResponseCode.NO_BUYER_ID:
            case ResponseCode.NOT_IN_CURRENT_UNIT:复制代码

MQBrokerException是在消息发送结果中,发送状态不OK(且状态码不是ResponseCode.FLUSH_DISK_TIMEOUT、ResponseCode.FLUSH_SLAVE_TIMEOUT、ResponseCode.SLAVE_NOT_AVAILABLE、ResponseCode.SUCCESS),就会抛出这个异常。

broker流控返回的状态码:RemotingSysResponseCode.SYSTEM_BUSY(不重试)


作者:不识君
链接:https://juejin.cn/post/7023941534873550878


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐