阅读 159

rabbitmq的死信队列

1.业务背景

如果有有错误消息,如果手动nack同时将消息放回到队列中,那么这条消息会反复消费,留在队列中 。

如果nack后将消息丢弃,那么如果碰到网络抖动,消息也会丢失 。所以 通过建立死信队列避免消息丢失。

2.实现

文件目录如下: 在这里插入图片描述

1.原理

我们额外建立一条队列。当消息进入进入业务队列后,如果收到nack那么就将这条消息放入这条条队列中 。

2.修改pom文件

       <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter-amqp</artifactId>         </dependency> 复制代码

3.修改配置文件

server:   port: 8088 spring:   rabbitmq:     host: 192.168.*.*     port: 5672     username: root     password: root     virtual-host: /     listener:       simple:         acknowledge-mode: manual  #手动应答         prefetch: 1 # 每次只处理一个信息     publisher-confirms: true #开启消息确认机制     publisher-returns: true #支持消息发送失败返回队列 复制代码

4.rabbitmq的配置

@Configuration public class RabbitMqConfig {     /**      * 连接工厂      */     @Autowired     private ConnectionFactory connectionFactory;     /**      * 定制化amqp模版      *      * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调 即消息发送到exchange ack      * ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调 即消息发送不到任何一个队列中 ack      */     @Bean     public RabbitTemplate rabbitTemplate() {         Logger logger = LoggerFactory.getLogger(RabbitTemplate.class);         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);         // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true         rabbitTemplate.setMandatory(true);         // 发送消息确认, yml需要配置 publisher-confirms: true         rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());         // 消息返回, yml需要配置 publisher-returns: true         rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {             String correlationId = message.getMessageProperties().getCorrelationId().toString();             logger.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange,                 routingKey);         });         return rabbitTemplate;     }     /**      * 确认发送消息是否成功(调用util方法)      *      * @return      */     @Bean     public MsgSendConfirmCallBack msgSendConfirmCallBack() {         return new MsgSendConfirmCallBack();     } } 复制代码

5.util类

发送是否成功的回调方法。

public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {     /**      * 回调方法      * @param correlationData      * @param ack      * @param cause      */     @Override     public void confirm(CorrelationData correlationData, boolean ack, String cause) {         System.out.println("MsgSendConfirmCallBack  , 回调id:" + correlationData);         if (ack) {             System.out.println("消息发送成功");         } else {             //可以将消息写入本地,使用定时任务重新发送             System.out.println("消息发送失败:" + cause + "\n重新发送");         }     } } 复制代码

这里有一个点,如果想做实现消息失败重新发送,在注释处可以实现。需要将消息写入本地,如果失败从本地读取,然后发送,如果成功删除本地信息。

6.业务队列(如:订单业务)

这里声明了一个业务队列 ,关键点在于x-dead-letter-exchange,x-dead-letter-routing-key 两个参数。

@Configuration public class BusinessConfig {     /**      * 业务1模块direct交换机的名字      */     public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";     /**      * 业务1 demo业务的队列名称      */     public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";     /**      * 业务1 demo业务的routekey      */     public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";          @Bean     public Queue yewu1DemoDeadQueue() {         // 将普通队列绑定到死信队列交换机上         Map<String, Object> args = new HashMap<>(2);         args.put(RetryConfig.RETRY_LETTER_QUEUE_KEY, DeadConfig.FAIL_EXCHANGE_NAME);         args.put(RetryConfig.RETRY_LETTER_ROUTING_KEY, DeadConfig.FAIL_ROUTING_KEY);         return new Queue("yewu1_demo_dead_queue", true, false, false, args);     }     /**      * 将消息队列和交换机进行绑定      */     @Bean     public Binding binding_one() {         return BindingBuilder.bind(yewu1DemoDeadQueue()).to(yewu1Exchange())             .with("yewu1_demo_dead_key");     } } 复制代码

这里有一个点如果想持久化消息到磁盘,需要新建队列时,new Queue将第二个参数输入为true,但是面对大并发时效率会变低 。

7.死信队列

这里声明死信队列与绑定关系。

@Configuration public class DeadConfig {     /**      * 死信队列      */     public final static String FAIL_QUEUE_NAME = "fail_queue";     /**      * 死信交换机      */     public final static String FAIL_EXCHANGE_NAME = "fail_exchange";     /**      * 死信routing      */     public final static String FAIL_ROUTING_KEY = "fail_routing";     /**      * 创建配置死信队列      *      */     @Bean     public Queue deadQueue() {         return new Queue(FAIL_QUEUE_NAME, true, false, false);     }     /**      * 死信交换机      *      * @return      */     @Bean     public DirectExchange deadExchange() {         DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);         return directExchange;     }     /**      * 绑定关系      *       * @return      */     @Bean     public Binding failBinding() {         return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);     } } 复制代码

8.生产者消费者

生产者与消费者的代码实现。

public enum RabbitEnum {     /**      * 处理成功      */     ACCEPT,     /**      * 可以重试的错误      */     RETRY,     /**      * 无需重试的错误      */     REJECT @RequestMapping("/sendDirectDead")         String sendDirectDead(@RequestBody String message) throws Exception {         System.out.println("开始生产");         CorrelationData data = new CorrelationData(UUID.randomUUID().toString());         rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, "yewu1_demo_dead_key",                 message, data);         System.out.println("结束生产");         System.out.println("发送id:" + data);         return "OK,sendDirect:" + message;     }     @RabbitListener(queues = "yewu1_demo_dead_queue")     protected void consumerDead(Message message, Channel channel) throws Exception {         RabbitEnum ackSign = RabbitEnum.RETRY;         try {             int i = 10 / 0;             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);         } catch (Exception e) {             ackSign = RabbitEnum.RETRY;             throw e;         } finally {             // 通过finally块来保证Ack/Nack会且只会执行一次             if (ackSign == RabbitEnum.ACCEPT) {                 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);             } else if (ackSign == RabbitEnum.RETRY) {                 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);             }         }     } 复制代码

9.实验

当发送yewu1_demo_dead_queue队列时,如果抛出异常,会放入死信队列中。


作者:小黄鸡1992
链接:https://juejin.cn/post/7028099901489840164


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐