阅读 228

RabbitMQ学习之路(rabbitmq入门教程)

一、概述

image.png

1、 大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力

2、 消息服务中两个重要概念: 消息代理(message broker)目的地(destination) 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目 的地。

3、 消息队列主要有两种形式的目的地

  • 队列(queue) :点对点消息通信(point-to-point)

  • 主题(topic) :发布(publish)/订阅(subscribe)消息通信

4、点对点式

  • 息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列

  • 消息只有唯一的发送者和接受者,但并不是说只能有一个接收者

5、 发布订阅式

  • 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么 就会在消息到达时同时收到消息

6、 JMS (Java Message Service)JAVA消息服务

  • 基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现

7、AMQP Advanced Message Queuing Protocol)

  • 高级消息队列协议,也是一个消息代理的规范,兼容JMS

  • RabbitMQ是AMQP的实现

image.png

8、 Spring支持

  • spring-jms提供了对JMS的支持

  • spring-rabbit提供了对AMQP的支持

  • 需要ConnectionFactory的实现来连接消息代理

  • 提供JmsTemplateRabbitTemplate来发送消息

  • @JmsListener(JMS)@RabbitListener(AMQP注解在方法上监听消息代理发 布的消息

  • @EnableJms@EnableRabbit开启支持

9、 Spring Boot自动配置

  • JmsAutoConfiguration

  • RabbitAutoConfiguration

二、RabbitMQ简介

1、简介

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。

2、核心概念

Message

消息,消息是不具名的,它由消息头消息体组成。消息体不透明的,而消息头则由一系列的可选属性组 成,这些属性包括routing-key(路由键) 、priority(相对于其他消息的优先权)、delivery-mode(指出 该消息可能需要持久性存储)等。

Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列

Exchange有4种类型:direct(默认)fanout, topic, 和headers,不同类型的Exchange转发消息的策略有 所区别

Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息 可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连 接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 Exchange 和Queue的绑定可以是多对多的关系。

Connection

网络连接,比如一个TCP连接。

Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚 拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这 些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所 以引入了信道的概念,以复用一条 TCP 连接。

Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加 密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有 自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定, RabbitMQ 默认的 vhost 是 / 。

Broker

表示消息队列服务器实体

image.png

三、Docker安装RabbitMQ

docker run -d --name rabbitmq \ -p 5671:5671 \ -p 5672:5672 \ -p 4369:4369 \ -p 25672:25672 \ -p 15671:15671 \ -p 15672:15672 \ rabbitmq:management 复制代码

4369、25672 (Erlang发现&集群端口)
5672、5671 (AMQP端口)
15672 (web管理后台端口)
61613、61614 (STOMP协议端口)
1883、8883 (MQTT协议端口)

查看文档:www.rabbitmq.com/networking.…

四、RabbitMQ运行机制

1、AMQP 中的消息路由

AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 ExchangeBinding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被 消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。

image.png

2、Exchange 类型

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型: directfanouttopicheadersheaders 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多, 目前几乎用不到了,所以直接看另外三种类型:

image.png

五、RabbitMQ整合

1、引入spring-boot-starter-amqp

       <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter-amqp</artifactId>         </dependency> 复制代码

2、开启注解@EnableRabbit

3、配置文件

spring:   rabbitmq:     host: 192.168.56.10     password: guest     username: guest     port: 5672     virtual-host: / 复制代码

4、创建Exchange、Queue、Binding测试

package com.test.order; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; /**  *  RabbitMq给容器自动配置了  *  RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate  *  所有的属性都是spring.rabbitmq  *  @ConfigurationProperties(prefix = "spring.rabbitmq")  *  public class RabbitProperties {}  *  */ @RunWith(SpringRunner.class) @SpringBootTest @Slf4j public class ApplicationTests {     @Autowired     private AmqpAdmin amqpAdmin;     /**      * 创建 exchange      */     @Test     public void createExchane() {         //String name, 【交换机名称】         // boolean durable, 【是否持久化】         // boolean autoDelete【是否自动删除】         DirectExchange directExchange = new DirectExchange("hello-java-exchabge", true, false);         amqpAdmin.declareExchange(directExchange);         log.info("exchange创建成功:{}", "hello-java-exchabge");     }     /**      * 创建Queue      */     @Test     public void createQueue() {         //String name, 【队列名称】         // boolean durable,【是否持久化】         // boolean exclusive, 【是否排他】         // boolean autoDelete 【是否自动删除】         Queue queue = new Queue("hello-java", true, false, false);         amqpAdmin.declareQueue(queue);         log.info("Queue创建成功:{}", "hello-java");     }     /**      * 创建绑定关系      */     @Test     public void createBinding() {         //  String destination, 【目的地】         //  Binding.DestinationType destinationType, 【目的地类型】         //  String exchange, 【交换机】         //  String routingKey, 【路由键】         //  Map<String, Object> arguments) 【参数】         Binding binding = new Binding("hello-java",                 Binding.DestinationType.QUEUE,                 "hello-java-exchabge", "hello.java", null);         amqpAdmin.declareBinding(binding);         log.info("Binding创建成功:{}", "hello-binding");     } } 复制代码

5、收发消息测试

  @Autowired     private RabbitTemplate rabbitTemplate;         /**      * 发送消息      */     @Test     public  void sendMsg(){         String msg = "Hello World";         //如果发送的消息是对象,一定要序列化 ,或者发送的对象消息转换为json         OrderReturnReasonEntity entity = new OrderReturnReasonEntity();         entity.setId(1L);         entity.setName("测试");         entity.setSort(1);         entity.setStatus(1);         entity.setCreateTime(new Date());         rabbitTemplate.convertAndSend("hello-java-exchabge","hello.java",entity);         log.info("消息发送完成");     } 复制代码

消息json转化配置

package com.test.gulimall.order.config; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MyRabbitConfig {     /**      * 配置RabbitMQ的消息转换机制为json格式      *      * @return      */     @Bean     public MessageConverter messageConverter() {         return new Jackson2JsonMessageConverter();     } } 复制代码

6、监听消息

必须在容器内才生效

 @RabbitListener(queues = {"hello-java-queue"}) @Service("orderItemService") public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {    /**      * queues:声明需要监听的所有队列      * <p>      * org.springframework.amqp.core.Message      * <p>      * 参数可以写一下类型      * 1、Message message:原生消息详细信息。头+体      * 2、T<发送的消息的类型> OrderReturnReasonEntity content;      * 3、Channel channel:当前传输数据的通道      * <p>      * Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息      * 场景:      * 1)、订单服务启动多个;同一个消息,只能有一个客户端收到      * 2)、 只有一个消息完全处理完,方法运行结束,我们就可以接收到下一个消息      * 监听消息:使用@RabbitListener ;必须有@EnableRabbit      * @RabbitListener : 类+方法上 【监听那些队列即可】      * @RabbitHandler : 标在方法上 【重载区分消息】      */ //    @RabbitListener(queues = {"hello-java-queue"})       @RabbitHandler     public void recieveMessage(Message message, OrderReturnReasonEntity content, Channel channel) {         //消息体         byte[] body = message.getBody();         //消息头         MessageProperties messageProperties = message.getMessageProperties();         System.err.println("接收消息:" + message);         System.err.println("content:" + content);         System.err.println("channel:" + channel);     }      } 复制代码

六、RabbitMQ消息确认机制-可靠抵达

  • 保证消息不丢失、可靠抵达, 可以使用事务消息 , 性能下降250倍

  • publisher confirmCallback 确认模式

  • publisher returnCallback 未投递到queue退回模式

  • consumer ack机制

image.png

官方文档:www.rabbitmq.com/reliability…

1、可靠抵达-ConfirmCallback

spring.rabbitmq.publisher-confirms=true 复制代码

  • 在创建ConnectionFactory的时候PublishConfirms(true) 选项 开启ConfirmCallback

  • CorrelationData : 用来表示当前消息唯一性

  • 消息只要被broker接收就会执行ConfirmCallback ,如果是Cluster模式,需要所有Broker接收带才会调用ConfirmCallback

  • 被Broker接收到只能表示Message已经到达服务器 ,并不能保证消息一定会被投递到目标Queue里 , 所以需要用到接下来的ReturnCallback

package com.order.config; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; @Configuration public class MyRabbitConfig {     @Autowired     private RabbitTemplate rabbitTemplate;     /**      * 定制RabbitTemplate      */     @PostConstruct  //MyRabbitConfig对象创建完后执行这个方法     public void initRabbitTemplate() {         rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {             /**              * 只要消息抵达Broker 就ack=true              * @param correlationData 当前消息的唯一关联数据(这个消息的唯一id)              * @param ack  消息是否成功收到              * @param cause  失败的原因              */             @Override             public void confirm(CorrelationData correlationData, boolean ack, String cause) {                 System.out.println("correlationData[" + correlationData + "]====>" +                         "ack[" + ack + "]====>" +                         "cause[" + cause + "]" );             }         });     } } 复制代码

当我们只发送不消费情况下控制台打印:

第四个参数指定: correlationData 当前消息的唯一关联数据(这个消息的唯一id)

         rabbitTemplate.convertAndSend("hello-java-exchabge",                     "hello1.java",                     entity,                     new CorrelationData($.getUUID())); 复制代码

结果: correlationData保证消息可靠抵达

image.png

2、可靠抵达-ReturnCallback

spring.rabbitmq.publisher-returns=true spring.rabbitmq.template. mandatory= true 复制代码

  • Confirm 模式只能保证消息到达Broker,不能保证消息准确投递到目标Queue里。在有些业务场景下,我们需要保证一定都到目标Queue里,此刻就需要用到return退回模式

  • 这样如果未能投递到目标Queue里 将盗用ReturnCallback, 可以记录下详细到投递数据,定期巡检或者地洞纠错需要这些数据

package com.order.config; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; @Configuration public class MyRabbitConfig {     @Autowired     private RabbitTemplate rabbitTemplate;     /**      * 定制RabbitTemplate      */     @PostConstruct  //MyRabbitConfig对象创建完后执行这个方法     public void initRabbitTemplate() {         rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {             /**              * 只要消息抵达Broker 就ack=true              * @param correlationData 当前消息的唯一关联数据(这个消息的唯一id)              * @param ack  消息是否成功收到              * @param cause  失败的原因              */             @Override             public void confirm(CorrelationData correlationData, boolean ack, String cause) {                 System.out.println("confirm correlationData[" + correlationData + "]====>" +                         "ack[" + ack + "]====>" +                         "cause[" + cause + "]");             }         });         /**          * 设置消息抵达队列的回调          */         rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {             /**              *              * @param message  投递失败的消息详细信息              * @param replyCode  回复的状态码              * @param replyText 回复的文本内容              * @param exchange 消息发送的交换机              * @param routingKey 消息指定的路由键              */             @Override             public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {                 System.out.println("Fail Message[" + message + "]====>" +                         "replyCode[" + replyCode + "]=====>" +                         "replyText[" + replyText + "]====>" +                         "exchange[" + exchange + "]====>" +                         "routingKey[" + routingKey + "]");             }         });     } } 复制代码

发送消息是指定错误的routingKey

 rabbitTemplate.convertAndSend("hello-java-exchabge", "errorRoutingKey", entity); 复制代码

结果:

image.png

3、可靠抵达-Ack消息确认机制

消息端确认(保证每个消息被正确消费,此时才可以broker删除这个消息)默认是自动确认的,只要消息收到,客户端会自动确认,服务端会移除这个消息
问题: 当发送很多消息,自动回复给服务器ack,只有一个消息处理成功,宕机了!
发生消息丢失;
解决: 手动确认只要我们没有明确告诉MQ,消息被签收,没有ACK,消息就一直是unacked状态,即使Consumer宕机。消息不会丢失,会重新变为Ready,下一次新的Consumer连接进来就发给它

## 手动ack模式 spring.rabbitmq.listener.direct.acknowledge-mode=manual 复制代码

  • 消费者获得消息,成功处理,可以回复Ack给Broker

    • basic.ack 用于肯定确认 ; broker将移除此详细

    • basic.nack 用于否定确认 ;可以指定broker是否丢弃此消息,可以批量

    • basic.reject 用于否定确认 ; 同时,但不能批量

  • 默认。消息被消费者收到,就会从brokerqueue中移除

  • queue无消费者,消息依然会被存储,直到消费者消费

  • 消费者收到消息,默认会自动ack,但是如果无法确定此消息是否被处理完成,或者处理成功。可以开启手动ack模式

    • 消息处理成功 ack() 界都下一个消息,此消息broker就会被移除

    • 消息处理失败,nack()/reject() ,从新发送给其他人进行处理,或者容错处理后ack

    • 消息一致没有调用ack/nack,broker认为此消息正在被处理,不会投递给被人,此时客户端断开,消息不会被broker异常,会投递给别人

//签收 非批量 channel.basicAck(deliveryTag, false); //拒收 非批量  退回   channel.basicNack(deliveryTag, false,true); 复制代码

七、RabbitMQ 延时队列(实现定时任务)

场景: 比如为付款订单,超过一定时间后,系统自动取消订单,并释放占有物品

常用解决方案:

  • 缺点:   消耗系统内存、增加数据库的压力、存在较大的时间误差

  • 解决:   rabbitmqde 消息TTL 和死信Exchange结合

1、消息的TTL(Time To Live)

消息的TTL就是 消息的存活时间。 RabbitMQ可以对 队列 和 消息 分别设置TTL。

  • 对队列设置就是队列没有消费者连着的保留时间, 也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。

  • 如果队列设置了,消息也设置了,那么会 取小的 。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x- message-ttl属性来设置时间,两者是一样的效果。

2、Dead Letter Exchanges(DLX)

一个消息在满足如下条件下,会进 死信路由 ,记住这里是路由而不是队列, 一个路由可以对应很多队列。(什么是死信)

  • 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不 会被再次放在队列里,被其他消费者使用。(basic.reject/ basic.nack)requeue=false

  • 上面的消息的TTL到了,消息过期了。

  • 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上

  • Dead Letter Exchange其实就是一种普通的exchange,和创建其他 exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有 消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中 去

  • 我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息 被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列

手动ack&异常消息统一放在一个队列处理建议的两种方式

  • catch异常后, 手动发送到指定队列 ,然后使用 channel 给 rabbitmq 确认消息已消费

  • 给 Queue 绑定死信队列,使用 nack ( requque 为false)确认消息消费失败

image.png

3、延时队列实现-1

image.png

4、延时队列实现-2


image.png

image.png

image.png

5、SpringBoot中使用延时队列

1、Queue、Exchange、Binding可以@Bean进去

2、监听消息的方法可以有三种参数(不分数量,顺序)

Object content, Message message, Channel channel

3、channel可以用来拒绝消息,否则自动ack;

代码演示:

1、创建exchange、queue、binding

package com.order.config; import com.order.entity.OrderEntity; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.io.IOException; import java.util.HashMap; import java.util.Map; @Configuration public class MyMqConfig {     //@Bean Binding,Queue,Exchange     /**      * 容器中的 Binding,Queue,Exchange 都会自动创建(RabbitMQ没有的情况)      * RabbitMQ 只要有。@Bean声明属性发生变化也不会覆盖      *      * @return      */     @Bean     public Queue orderDelayQueue() {         Map<String, Object> arguments = new HashMap<>();         /**          * x-dead-letter-exchange: order-event-exchange          * x-dead-letter-routing-key: order.release.order          * x-message-ttl: 60000          */         arguments.put("x-dead-letter-exchange", "order-event-exchange");         arguments.put("x-dead-letter-routing-key", "order.release.order");         arguments.put("x-message-ttl", 60000);         //String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments         Queue queue = new Queue("order.delay.queue", true, false, false, arguments);         return queue;     }     @Bean     public Queue orderReleaseOrderQueue() {         Queue queue = new Queue("order.release.order.queue", true, false, false);         return queue;     }     @Bean     public Exchange orderEventExchange() {         //String name, boolean durable, boolean autoDelete, Map<String, Object> arguments         return new TopicExchange("order-event-exchange", true, false);     }     @Bean     public Binding orderCreateOrderBingding() {         //String destination, DestinationType destinationType, String exchange, String routingKey,         // Map<String, Object> arguments         return new Binding("order.delay.queue",                 Binding.DestinationType.QUEUE,                 "order-event-exchange",                 "order.create.order",                 null);     }     @Bean     public Binding orderReleaseOrderBingding() {         return new Binding("order.release.order.queue",                 Binding.DestinationType.QUEUE,                 "order-event-exchange",                 "order.release.order",                 null);     } } 复制代码

2、监听过期 消息

      /**      * 监听消息过期的消息      *      * @param entity      * @param channel      * @param message      * @throws IOException      */     @RabbitListener(queues = "order.release.order.queue")     public void listener(OrderEntity entity, Channel channel, Message message) throws IOException {         System.err.println("收到过期的消息" + entity.getOrderSn());         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);     } 复制代码

3、测试发送消息

   @Autowired     RabbitTemplate rabbitTemplate;     @ResponseBody     @GetMapping("/test/createOrder")     public String createOrderTest() {         OrderEntity orderEntity = new OrderEntity();         orderEntity.setOrderSn($.getUUID());         //给mq发送消息         rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", orderEntity);         return "success";     } 复制代码

结果:一分钟打印结果

image.png

6、如何保证消息可靠性-消息重复

消息丢失

  • 消息发送出去,由于网络问题没有抵达服务器

    • 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机 制,可记录到数据库,采用定期扫描重发的方式

    • 做好日志记录,每个消息状态是否都被服务器收到都应该记录

    • 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进 行重发

  • 消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚 未持久化完成,宕机

    • publisher也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。

  • 自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机

    • 一定开启手动ACK,消费成功才移除,失败或者没来得及处理就noAck并重 新入队

7、如何保证消息可靠性-消息重复

消息重复

  • 消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息 重新由unack变为ready,并发送给其他消费者

  • 消息消费失败,由于重试机制,自动又将消息发送出去

  • 成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送

    • 消费者的业务消费接口应该设计为幂等性的。比如扣库存有 工作单的状态标志

    • 使用防重表(redis/mysql),发送消息每一个都有业务的唯 一标识,处理过就不用处理

    • rabbitMQ的每一个消息都有redelivered字段,可以获取是否 是被重新投递过来的,而不是第一次投递过来的

8、如何保证消息可靠性-消息积压

消息积压

  • 消费者宕机积压

  • 消费者消费能力不足积压

  • 发送者发送流量太大

    • 上线更多的消费者,进行正常消费

    • 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理

八、自定义配置RabbitTemplate[循环引用时]

package com.order.config; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import javax.annotation.PostConstruct; @Configuration public class MyRabbitConfig {    @Primary    @Bean    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);        this.rabbitTemplate = rabbitTemplate;        rabbitTemplate.setMessageConverter(messageConverter());        initRabbitTemplate();        return rabbitTemplate;    }    /**     * 使用JSON序列化机制,进行消息转换     */    @Bean    public MessageConverter messageConverter(){        return new Jackson2JsonMessageConverter();    }    /**     * 定制RabbitTemplate     * 1、服务器收到消息就回调     *      1、spring.rabbitmq.publisher-confirms=true     *      2、设置确认回调ConfirmCallback     * 2、消息正确抵达队列进行回调     *      1、 spring.rabbitmq.publisher-returns=true     *          spring.rabbitmq.template.mandatory=true     *      2、设置确认回调ReturnCallback     *     * 3、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息)。     *      spring.rabbitmq.listener.simple.acknowledge-mode=manual 手动签收     *      1、默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息     *          问题:     *              我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,宕机了。就会发生消息丢失;     *              消费者手动确认模式。只要我们没有明确告诉MQ,货物被签收。没有Ack,     *                  消息就一直是unacked状态。即使Consumer宕机。消息不会丢失,会重新变为Ready,下一次有新的Consumer连接进来就发给他     *      2、如何签收:     *          channel.basicAck(deliveryTag,false);签收;业务成功完成就应该签收     *          channel.basicNack(deliveryTag,false,true);拒签;业务失败,拒签     */ //    @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法    public void initRabbitTemplate(){        //设置确认回调        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {            /**             *             * 1、只要消息抵达Broker就ack=true             * @param correlationData 当前消息的唯一关联数据(这个是消息的唯一id)             * @param ack  消息是否成功收到             * @param cause 失败的原因             */            @Override            public void confirm(CorrelationData correlationData, boolean ack, String cause) {                /**                 * 1、做好消息确认机制(pulisher,consumer【手动ack】)                 * 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再次发送一遍                 */                //服务器收到了;                //修改消息的状态                System.out.println("confirm...correlationData["+correlationData+"]==>ack["+ack+"]==>cause["+cause+"]");            }        });        //设置消息抵达队列的确认回调        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {            /**             * 只要消息没有投递给指定的队列,就触发这个失败回调             * @param message   投递失败的消息详细信息             * @param replyCode 回复的状态码             * @param replyText 回复的文本内容             * @param exchange  当时这个消息发给哪个交换机             * @param routingKey 当时这个消息用哪个路由键             */            @Override            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {                //报错误了。修改数据库当前消息的状态->错误。                System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]==>replyText["+replyText+"]===>exchange["+exchange+"]===>routingKey["+routingKey+"]");            }        });    } }


作者:小小明de烦恼
链接:https://juejin.cn/post/7051388628454359077

文章分类
代码人生
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐