RabbitMq如何实现可靠传输
默认情况下如果一个 Message 被消费者所正确接收则会被从 Queue 中移除
如果一个 Queue 没被任何消费者订阅,那么这个 Queue 中的消息会被 Cache(缓存),当有消费者订阅时则会立即发送,当 Message 被消费者正确接收时,就会被从 Queue 中移除
消息发送确认
发送的消息怎么样才算失败或成功?如何确认?
当消息无法路由到队列时,确认消息路由失败。消息成功路由时,当需要发送的队列都发送成功后,进行确认消息,对于持久化队列意味着写入磁盘,对于镜像队列意味着所有镜像接收成功
ConfirmCallback
通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
@Componentpublic class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); //指定 ConfirmCallback } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("消息唯一标识:"+correlationData); System.out.println("确认结果:"+ack); System.out.println("失败原因:"+cause); }复制代码
还需要在配置文件添加配置
spring: rabbitmq: publisher-confirms: true复制代码
ReturnCallback
通过实现 ReturnCallback 接口,启动消息失败返回,比如路由不到队列时触发回调
@Componentpublic class RabbitTemplateConfig implements RabbitTemplate.ReturnCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setReturnCallback(this); //指定 ReturnCallback } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("消息主体 message : "+message); System.out.println("消息主体 message : "+replyCode); System.out.println("描述:"+replyText); System.out.println("消息使用的交换器 exchange : "+exchange); System.out.println("消息使用的路由键 routing : "+routingKey); } }
作者:HILLJAY
链接:https://juejin.cn/post/7018904022698426398