RocketMQ Producer重试机制,何时重试,何时不重试?
版本
4.7.1,以下说明参照该版本实现
功能特性
官网说明
Producer的send方法本身支持内部重试,重试逻辑如下:
至多重试2次。
如果同步模式发送失败,则轮转到下一个Broker,如果异步模式发送失败,则只会在当前Broker进行重试。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
如果本身向broker发送消息产生超时异常,就不会再重试。
以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。
注意
异步发送在发送过程中出现异常进行重试,在解析请求结果时,发现响应状态码有其它异常(消息可能未正确被broker处理)会继续进行重试,重试依然选择当前broker。但是如果响应结果不为空的话,即使处理响应时发生异常也不会进行重试。
同步发送时,如果发送过程中没有异常,但是发送结果不OK,也会选择另一个broker继续进行重试(需要打开该配置项:retryAnotherBrokerWhenNotStoreOK)
顺序消息发送失败不进行重试,顺序消息指的是同步+指定消息队列的方式发送
异步+指定消息队列的方式发送,不认为是顺序发送,因为异步发送无法保证消息顺序,尽管在发送过程中出现异常不进行重试,但是发送完成后的处理逻辑和异步方式一样,如果发送状态不满足指定条件,会继续进行重试,同异步发送流程
one-way方式比较特别,因为不等待结果,所以和其它方式不一样,不存在等结果返回后再考虑是否继续重试,但发送时和同步发送一样,异常会进行重试
重试逻辑代码
重试逻辑在:源码org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout)
消息的发送流程:发送消息->sendDefaultImpl->sendKernelImpl,重试的逻辑在sendDefaultImpl,发送失败进行重试,会再次调用sendKernelImpl发送消息 顺序消息不重试的原因:顺序发送流程:发送消息->sendKernelImpl,所以没有重试逻辑 消息发送重试在进行队列选择的时候,如果有多个broker master,下次重试的broker不同于上次的
生产者流控不重试
因流控发送失败的消息,不进行重试,下面是摘自官方文档说明,哪些场景进行流控:
生产者流控,因为broker处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。 生产者流控:
commitLog文件被锁时间超过osPageCacheBusyTimeOutMills时,参数默认为1000ms,返回流控。
如果开启transientStorePoolEnable == true,且broker为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,返回流控。
broker每隔10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,返回流控。
broker通过拒绝send 请求方式实现流量控制。 注意,生产者流控,不会尝试消息重投。 消费者流控:
消费者本地缓存消息数超过pullThresholdForQueue时,默认1000。
消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB。
消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。 消费者流控的结果是降低拉取频率。
发送时允许重试的异常
• RemotingException • MQClientException • MQBrokerException,如下响应码不重试: case ResponseCode.TOPIC_NOT_EXIST: case ResponseCode.SERVICE_NOT_AVAILABLE: case ResponseCode.SYSTEM_ERROR: case ResponseCode.NO_PERMISSION: case ResponseCode.NO_BUYER_ID: case ResponseCode.NOT_IN_CURRENT_UNIT:复制代码
MQBrokerException是在消息发送结果中,发送状态不OK(且状态码不是ResponseCode.FLUSH_DISK_TIMEOUT、ResponseCode.FLUSH_SLAVE_TIMEOUT、ResponseCode.SLAVE_NOT_AVAILABLE、ResponseCode.SUCCESS),就会抛出这个异常。
broker流控返回的状态码:RemotingSysResponseCode.SYSTEM_BUSY(不重试)
作者:不识君
链接:https://juejin.cn/post/7023941534873550878