阅读 18 SEO

RabbitMQ高级特性

0. 前言

本文内容分为如下三部分
RabbitMQ高级特性

消息可靠性投递
Consumer ACK
消费端限流
TTL
死信队列
延迟队列
日志与监控
消息可靠性分析与追踪
管理

RabbitMQ应用问题

消息可靠性保障
消息幂等性处理

1. 高级特性

1.1 消息的可靠投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
confirm 确认模式
return 退回模式

rabbitmq 整个消息投递的路径为:
producer--->rabbitmq broker--->exchange--->queue--->consumer
消息从 producer 到 exchange 则会返回一个 confirmCallback
消息从 exchange-->queue 投递失败则会返回一个 returnCallback
我们将利用这两个 callback 控制消息的可靠性投递

confirm模式

在上一篇中的最后我们用spring-boot配置了rabbitMQ,
在这里在原来的基础上继续进行,在生产者的application.yml添加

使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理
#配置RabbitMQ的基本信息 ip 端口 username password
spring:
  rabbitmq:
    host: xxx
    port: 5672
    username: root
    password: root
    virtual-host: /example
    #生产端配置
    #开启发送确认,此配置在Springboot2.3.0版本中已经@Deprecated了,默认就是
    # publisher-confirms: true
    #
    publisher-confirm-type: simple
    #开启发送失败退回
    publisher-returns: true

然后新增一个test

@Test
    public void testConfirm() {
        // 1. 设置ConnectionFactory的publisher-confirms="true" 开启 确认模式。

        // 2. 使用rabbitTemplate.setConfirmCallback设置回调函数。
        //      当消息发送到exchange后回调confirm方法。
        //      在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData 相关配置信息
             * @param ack   exchange交换机 是否成功收到了消息。true 成功,false代表失败
             * @param cause 失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm方法被执行了");
                if (ack) {
                    //接收成功
                    System.out.println("接收成功消息" + cause);
                } else {
                    //接收失败
                    System.out.println("接收失败消息" + cause);
                    //做一些处理,让消息再次发送。
                }
            }
        });
        // 3. 发生消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.confirm", "confirm mq hello~~~~~~~~");

    }

启动测试,结束后控制台成功打印

登陆rabbitmq管理后台也可以看到消息已经写入队列中了,只是还没有被消费。
其中回调函数中 confirm的第一个参数correlationData会在发送消息的函数convertAndSend的重载函数中会使用,这里没有使用这个参数。
ack比较重要,可以判断交换机是否收到消息
cause失败原因

return模式

使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。

application.yml添加

#配置RabbitMQ的基本信息 ip 端口 username password
spring:
  rabbitmq:
    host: asjunor.site
    port: 5672
    username: root
    password: root
    virtual-host: /example
    #生产端配置
    #开启发送确认,此配置在Springboot2.3.0版本中已经@Deprecated了,默认就是
    # publisher-confirms: true
    #
    publisher-confirm-type: simple
    #开启发送失败退回
    publisher-returns: true
    #开启执行return回调
    template:
      mandatory: true

编写测试

 /**
     * 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败时候才会执行 ReturnCallBack
     * 步骤:
     * 1. 开启回退模式:publisher-returns="true"
     * 2. 设置ReturnCallBack
     * 3. 设置Exchange处理消息的模式:
     *      1. 如果消息没有路由到Queue,则丢弃消息(默认)
     *      2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
     */
    @Test
    public void testReturn(){
        // 设置交换机处理失败消息的模式
        // rabbitTemplate.setMandatory(true);
        // 2.设置ReturnCallBack
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             *
             * @param message   消息对象
             * @param replyCode 错误码
             * @param replyText 错误信息
             * @param exchange  交换机
             * @param routingKey 路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("return 被执行了");
                System.out.println(message);
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);
            }
        });
        // 3. 发送消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boott.treturn", "return mq hello~~~~~~~~");

    }

控制台打印

不知道因为什么原因,return回调经常会不打印信息,有待研究

在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。
使用channel下列方法,完成事务控制:
txSelect(), 用于将当前channel设置成transaction模式
txCommit(),用于提交事务
txRollback(),用于回滚事务

1.2 Consumer Ack

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:

  • 自动确认:acknowledge="none"
  • 手动确认:acknowledge="manual"
  • 根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

配置consumer的监听器

对于消费者,配置application.yml为

#配置RabbitMQ的基本信息 ip 端口 username password
spring:
  rabbitmq:
    host: asjunor.site
    port: 5672
    username: root
    password: root
    virtual-host: /example
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual

我们在原来RabbitMQListener上修改
新建一个监听器ackListener

@Component
@RabbitListener(queues = "boot_queue")
public class AckListener  {
    @RabbitHandler
    public void process(String hello,Channel channel, Message message) throws IOException, InterruptedException {
        Thread.sleep(1000);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 1. 接受转换消息
            System.out.println("ackListener收到的消息为:" + new String(message.getBody()));

            // 2. 处理业务逻辑
            System.out.println("处理业务逻辑");
            int i =3/0;
            // 3. 手动签收
            channel.basicAck(deliveryTag,true);
        }
        catch (Exception e){
            // 4. 拒绝签收
            /*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */
            channel.basicNack(deliveryTag,true,true);
//            channel.basicReject(deliveryTag,true); 单条数据
           
        }
            //消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
            //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            //ack返回false,并重新回到队列,api里面解释得很清楚
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            //拒绝消息
            //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

    }
}

@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型

设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认

如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息,签收成功,消息就被消费了。

如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。

消息可靠性小结

  1. 持久化
    • exchange要持久化
    • queue要持久化
    • message要持久化
  2. 生产方确认Confirm
  3. 消费方确认Ack
  4. Broker高可用,后面集群会讲到

1.3 消费端限流

回顾一下这个图,我们说过MQ有个很重要的作用就是削峰填谷

接下来就学习如何实现限流
新建一个QosListener,这时候要把之前的AckListener注释掉,
修改application.yml

#配置RabbitMQ的基本信息 ip 端口 username password
spring:
  rabbitmq:
    host: asjunor.site
    port: 5672
    username: root
    password: root
    virtual-host: /example
    listener:
      direct:
        acknowledge-mode: manual
        #每次限流1条消息
        prefetch: 1
      simple:
        acknowledge-mode: manual
        prefetch: 1

其中新增了perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。

/**
 * Consumer 限流机制
 * 1. 确保ack机制为手动确认。
 * 2. listener-container配置属性
 * perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
 */
@Component
@RabbitListener(queues = "boot_queue")
public class QosListener {
    @RabbitHandler
    public void process(String hello, Channel channel, Message message) throws IOException, InterruptedException {
        Thread.sleep(1000);
        //1.获取消息
        System.out.println("Qos:"+new String(message.getBody()));

        //2. 处理业务逻辑

        //3. 签收
//        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }
}

在这里我们把最后一行 channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);注释掉
启动消费者
发现只打印了一次

小结

配置 prefetch属性设置消费端一次拉取多少消息

消费端的确认模式一定为手动确认。acknowledge="manual"

1.4 TTL

TTL 全称 Time To Live(存活时间/过期时间)。

当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
比如我们有个订单系统,下订单时候如果30分钟内未被支付,那么这条消息就失效了。


控制台测试

打开我们的控制台,新建队列的时候可以看到最下面的arguments,我们先设置一个message ttl为10秒

再创建一个交换机

将交换机和队列绑定

再在这个页面下面手动发消息

十秒钟这个消息就会消失

编写代码测试

删除在控制台创建的交换机和对垒
producer端新增一个TTLConfig

@Configuration
public class TTLConfig {

    public static final String EXCHANGE_NAME = "test_exchange_ttl";
    public static final String QUEUE_NAME = "test_queue_ttl";

    //1.交换机
    @Bean("ttlExchange")
    public Exchange ttlExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }


    //2.Queue 队列
    @Bean("ttlQueue")
    public Queue ttlQueue(){
        return QueueBuilder.durable(QUEUE_NAME).ttl(20000).build();
    }

    //3. 队列和交互机绑定关系 Binding
    /*
        1. 知道哪个队列
        2. 知道哪个交换机
        3. routing key
     */
    @Bean
    public Binding bindTTLQueueExchange(@Qualifier("ttlQueue") Queue queue, @Qualifier("ttlExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();
    }

}

之后运行测试文件就能创建exchange和queue,其中创建队列时候设置了参数ttl为20秒,指队列的过期时间。
再编写测试函数

/**
     * TTL:过期时间
     *  1. 队列统一过期
     *
     *  2. 消息单独过期
     *
     *
     * 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
     * 队列过期后,会将队列所有消息全部移除。
     * 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)
     *
     */
    @Test
    public void testTTL() {
        // 消息后处理对象,设置一些消息的参数信息
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){

            /**
             * Change (or replace) the message.
             *
             * @param message the message.
             * @return the message.
             * @throws AmqpException an exception.
             */
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //1.设置message的信息
                message.getMessageProperties().setExpiration("10000");//消息的过期时间
                //2.返回该消息
                return message;
            }
        };


     
        //消息单独过期
        //rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);


        for (int i = 0; i < 10; i++) {
            if(i == 5){
                //消息单独过期
                rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
            }else{
                //不过期的消息
                rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");

            }

        }
    }

在这里,其实消息不会看到它过期了,因为5不是在队列的头部,只有在队列头部的才会被移除掉。

小结

设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。

设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。

如果两者都进行了设置,以时间短的为准。

1.5 死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

在前面的ttl例子中,当我们的消息过期后,会被丢弃,但如果这个队列绑定了死信交换机,则消息不会被丢弃,而是发送到死信交换机,而死信交换机又可以绑定其他队列,从而可以重新被消费者消费。

考虑两个问题:

  • 第一个队列如何绑定死信交换机?
  • 消息什么时候成为死信

消息成为死信的三种情况

  1. 队列消息长度到达限制;

  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

tip:
死信交换机和死信队列和正常的交换机和队列没有任何区别

  1. 声明正常的队列(redirect_queue)和交换机(redirect_exchange)
  2. 声明死信队列(dlx_queue)和死信交换机(dlx_exchange)
  3. 正常队列绑定死信交换机
    设置两个参数:
    • x-dead-letter-exchange:死信交换机名称
    • x-dead-letter-routing-key:发送给死信交换机的routingkey

修改application.yml

#配置RabbitMQ的基本信息 ip 端口 username password
spring:
  rabbitmq:
    host: asjunor.site
    port: 5672
    username: root
    password: root
    virtual-host: /example
    #生产端配置
    #开启发送确认,此配置在Springboot2.3.0版本中已经@Deprecated了,默认就是
    # publisher-confirms: true
    #
    publisher-confirm-type: simple
    #开启发送失败退回
    publisher-returns: true
    #开启执行return回调
    template:
      mandatory: true
      retry:
        # 允许消息消费失败的重试
        enabled: true
        # 消息最多消费次数3次
        max-attempts: 3
        # 消息多次消费的间隔1秒
        initial-interval: 1000
    listener:
      direct:
        #  设置为false,会丢弃消息或者重新发布到死信队列
        default-requeue-rejected: false

编写DeadLetterConfig

package org.example.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @ClassName DeadLetterConfig
 * @Description TODO
 * @Author Patrick Star
 * @Date 2020/12/7 9:37 下午
 */
@Configuration
public class DeadLetterConfig {
    public static final String DL_EXCHANGE = "dl_exchange";

    public static final String DL_QUEUE = "dl_queue";
    public static final String REDIRECT_QUEUE = "redirect_queue";
    public static final String REDIRECT_EXCHANGE = "redirect_exchange";
    public static final String DL_REDIRECT_ROUTING_KEY = "dlx.hehe";


    /**
     * 死信队列跟交换机类型没有关系 不一定为directExchange  不影响该类型交换机的特性.
     */
    @Bean("dlExchange")
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.topicExchange(DL_EXCHANGE).durable(true).build();
    }

    @Bean("dlQueue")
    public Queue deadLetterQueue() {
        // 设置正常队列的长度限制和ttl
        return QueueBuilder.durable(DL_QUEUE).build();
    }

    @Bean("redirectQueue")
    public Queue redirectQueue() {
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    声明  死信队列Exchange
        args.put("x-dead-letter-exchange", DL_EXCHANGE);
//       x-dead-letter-routing-key    声明 死信队列抛出异常重定向队列的routingKey("dlx.hehe")
        args.put("x-dead-letter-routing-key", DL_REDIRECT_ROUTING_KEY);
        args.put("x-message-ttl", 10000);
        args.put("x-max-length", 10);
        return QueueBuilder.durable(REDIRECT_QUEUE).withArguments(args).build();
    }

    //1.交换机
    @Bean("redirectExchange")
    public Exchange redirectExchange() {
        return ExchangeBuilder.topicExchange(REDIRECT_EXCHANGE).durable(true).build();
    }


    /**
     * 死信队列绑定到死信交换器上.
     *
     * @return the binding
     */
    @Bean
    public Binding dlxBinding(Queue dlQueue, Exchange dlExchange) {
        return BindingBuilder
                .bind(dlQueue)
                .to(dlExchange)
                .with("dlx.#")
                .noargs();

    }

    /**
     * 将重定向队列通过routingKey(“dlx.hehe”)绑定到死信队列的Exchange上
     *
     * @return the binding
     */
    @Bean
    public Binding redirectToDLBinding(@Qualifier("redirectQueue") Queue queue, @Qualifier("dlExchange") Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(DL_REDIRECT_ROUTING_KEY)
                .noargs();

    }

    /**
     * 绑定正常的交换机和队列
     *
     * @return the binding
     */
    @Bean
    public Binding redirectBinding() {
        return BindingBuilder
                .bind(redirectQueue())
                .to(redirectExchange())
                .with("test.dlx.#")
                .noargs();

    }
}


其中分别使用了几种方法来绑定队列,在Binding函数中

  1. 我们可以直接写队列的函数和交换机的函数,如最后一个
  2. 我们可以用@Qualifier注解指定队列和交换机,如倒数第二个
  3. 我们可以在传递参数时,将参数名字和定义的队列和交换机匹配

对消息产生死信的三种情况进行测试

  /**
     * 发送测试死信消息:
     *  1. 过期时间
     *  2. 长度限制
     *  3. 消息拒收
     */
    @Test
    public void testDlx(){
        //1. 测试过期时间,死信消息
        rabbitTemplate.convertAndSend(DeadLetterConfig.REDIRECT_EXCHANGE,"test.dlx.hehe","我是一条消息,我会死吗?");

        //2. 测试长度限制后,消息死信
       /* for (int i = 0; i < 20; i++) {
            rabbitTemplate.convertAndSend(DeadLetterConfig.REDIRECT_EXCHANGE,"test.dlx.haha","我是一条消息,我会死吗?");
        }*/

        //3. 测试消息拒收
//        rabbitTemplate.convertAndSend(DeadLetterConfig.REDIRECT_EXCHANGE,"test.dlx.haha","我是一条消息,我会死吗?");

    }

对第一种情况,消息过期后会自动转到死信队列
对第二种情况,消息长度超过了限制,超过的会自动转到死信队列
对第三种情况,我们要编写一个消费者来监听正常的队列,让消息拒绝接收

新建一个DlxListener,注意,这里是监听我们的正常队列,而不是死信队列

@Component
@RabbitListener(queues = "redirect_queue")
public class DlxListener {
    @RabbitHandler
    public void process(String hello, Channel channel, Message message) throws IOException, InterruptedException {
        Thread.sleep(1000);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 1. 接受转换消息
            System.out.println("DlxListener收到的消息为:" + new String(message.getBody()));

            // 2. 处理业务逻辑
            System.out.println("处理业务逻辑");
            int i = 3 / 0;
            // 3. 手动签收
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {

            /*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */
            System.out.println("出现异常,拒绝接收");

            // 4. 拒绝签收,不重回队列 requeue = false
            channel.basicNack(deliveryTag, true, false);
//            channel.basicReject(deliveryTag,true); 单条数据

            //消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
            //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            //ack返回false,并重新回到队列,api里面解释得很清楚
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            //拒绝消息
            //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }


    }


}

之后运行Consumer,将test的第三个注释打开,进行测试,控制台打印


小结

  1. 死信交换机和死信队列和普通的没有区别

  2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

  3. 消息成为死信的三种情况:

    1. 队列消息长度到达限制;
    2. 消费者拒接消费消息,并且不重回队列;
    3. 原队列存在消息过期设置,消息到达超时时间未被消费;

1.6 延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

应用场景有:
比如用户下单后,30分钟未支付,取消订单,回滚库存。再比如新用户注册成功7天后,发送短信问候。

实现方式:
1. 定时器
2. 延迟队列

但是很可惜,在RabbitMQ中并未提供延迟队列功能。

但是可以使用:TTL+死信队列 组合实现延迟队列的效果

image.png

如何实现呢,加入我们给订单设定30分钟的支付时间,订单系统一开始将消息发送到正常队列,30分钟后转发到死信队列,有一个专门的库存系统保存去获取这条消息,来判断订单是支付了还是未支付。

具体步骤

  1. 定义正常的交换机( order_exchange )和队列( order_queue )
  2. 定义死信交换机及和队列
  3. 绑定设置正常队列过期时间为10秒钟,测试的时候30分钟太久了

新建一个OrderCondig,我们直接复制并修改之前的死信队列和死信交换机,将order_queue过期时间设置为30分钟,将正常交换机到DLX的routing key改为order.dlx.cancel,将DLX和死信队列的routing key改为order.dlx.#,正常交换机和正常队列的routing key 就为order.#

package org.example.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @ClassName DeadLetterConfig
 * @Description TODO
 * @Author Patrick Star
 * @Date 2020/12/7 9:37 下午
 */
@Configuration
public class OrderConfig {
    public static final String ORDER_DL_EXCHANGE = "order_dl_exchange";

    public static final String ORDER_DL_QUEUE = "order_dl_queue";
    public static final String ORDER_QUEUE = "order_queue";
    public static final String ORDER_EXCHANGE = "order_exchange";
    public static final String ORDER_DL_ORDER_ROUTING_KEY = "order.dlx.cancel";
    public static final String ORDER_ROUTING_KEY = "order.#";
    public static final String DLX_ROUTING_KEY = "order.dlx.#";

    /**
     * 死信队列跟交换机类型没有关系 不一定为directExchange  不影响该类型交换机的特性.
     */
    @Bean("orderDLExchange")
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.topicExchange(ORDER_DL_EXCHANGE).durable(true).build();
    }

    @Bean("orderDLQueue")
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(ORDER_DL_QUEUE).build();
    }
    // order 队列
    @Bean("orderQueue")
    public Queue orderQueue() {
        Map<String, Object> args = new HashMap<>(2);
                // x-dead-letter-exchange    声明  死信队列Exchange
        args.put("x-dead-letter-exchange", ORDER_DL_EXCHANGE);
                // x-dead-letter-routing-key    声明 死信队列抛出异常重定向队列的routingKey("order.dlx.cancel")
        args.put("x-dead-letter-routing-key", ORDER_DL_ORDER_ROUTING_KEY);
        args.put("x-message-ttl", 10000);
        args.put("x-max-length", 30);
        return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();
    }

    // order交换机
    @Bean("orderExchange")
    public Exchange orderExchange() {
        return ExchangeBuilder.topicExchange(ORDER_EXCHANGE).durable(true).build();
    }


    /**
     * 死信队列绑定到死信交换器上.
     *
     * @return the binding
     */
    @Bean
    public Binding newDLBinding(Queue orderDLQueue, Exchange orderDLExchange) {
        return BindingBuilder
                .bind(orderDLQueue)
                .to(orderDLExchange)
                .with(DLX_ROUTING_KEY)
                .noargs();

    }

    /**
     * 将重定向队列通过routingKey(“order.dlx.cancel”)绑定到死信队列的Exchange上
     *
     * @return the binding
     */
    @Bean
    public Binding orderToDLBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderDLExchange") Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ORDER_DL_ORDER_ROUTING_KEY)
                .noargs();

    }

    /**
     * 绑定正常的交换机和队列
     *
     * @return the binding
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder
                .bind(orderQueue())
                .to(orderExchange())
                .with(ORDER_ROUTING_KEY)
                .noargs();

    }
}

Consumer端新建一个orderListener,一定要监听死信队列

package org.example.rabbitmq.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @ClassName AckListener
 * @Description TODO
 * @Author Patrick Star
 * @Date 2020/12/7 5:50 下午
 */
@Component
@RabbitListener(queues = "order_dl_queue")
public class OrderListener {
    @RabbitHandler
    public void process(String hello, Channel channel, Message message) throws IOException, InterruptedException {
        Thread.sleep(1000);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 1. 接受转换消息
            System.out.println("orderListener收到的消息为:" + new String(message.getBody()));

            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            System.out.println("根据订单id查询其状态...");
            System.out.println("判断状态是否为支付成功");
            System.out.println("取消订单,回滚库存....");
            // 3. 手动签收
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            // 4. 拒绝签收
            /*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */
            channel.basicNack(deliveryTag, true, true);
//            channel.basicReject(deliveryTag,true); 单条数据

            //消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
            //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            //ack返回false,并重新回到队列,api里面解释得很清楚
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            //拒绝消息
            //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }


    }


}

我们主要看消费者是不是延迟10秒后收到消息,在消费者控制台打印消息。

延迟队列小结

  1. 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
  2. RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果

1.7 日志与监控

我的rabbitmq运行在linux服务器上的一个docker容器中,我们可以进入该容器查看日志信息,对于不是docker运行的服务,就直接在服务器上查找。

RabbitMQ默认日志存放路径: /var/log/rabbitmq/rabbit@xxx.log

我们使用docker ps查找我们运行的rabbitmq的容器id

再使用

docker logs 47b

查看docker的输出日志,其中47b是我rabbitmq容器id的前几个字母,里面就可以看到我们的rabbitmq的日志了。

进入容器内,使用命令rabbitmqctl status可以看到我们的Log文件是输出到了输出流,而没有使用文件保存,如果需要查看log文件,参考下面的解决方法

https://blog.csdn.net/fvdfsdafdsafs/article/details/110097643

也可以打开web控制台,也可以看到很多的的参数信息

点击name,可以看到一些负载参数

如果绿色的接近红色,就应该注意了。

也可以通过rabbitmq的控制命令查看

查看队列

rabbitmqctl list_queues

查看exchanges

rabbitmqctl list_exchanges

查看用户

rabbitmqctl list_users

查看连接

rabbitmqctl list_connections

查看消费者信息

rabbitmqctl list_consumers

查看环境变量

rabbitmqctl environment

查看未被确认的队列

rabbitmqctl list_queues name messages_unacknowledged

查看单个队列的内存使用

rabbitmqctl list_queues name memory

查看准备就绪的队列

rabbitmqctl list_queues name messages_ready

1.8 消息追踪

在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。

在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。

firehose的机制是将生产者投递给rabbitmq的消息,rabbitmq投递给消费者的消息按照指定的格式发送到默认的exchange上。这个默认的exchange的名称为amq.rabbitmq.trace,它是一个topic类型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename 和 deliver.queuename。其中exchangename和queuename为实际exchange和queue的名称,分别对应生产者投递到exchange的消息,和消费者从queue上获取的消息。

如何使用呢,我们可以将一个队列绑定到默认交换机,routing key 就为test_trace好了,然后往这个队列发消息,默认交换机会把消息转发到队列,同时,队列还收到了两条消息,是trace交换机发的详细的日志消息。

注意:打开 trace 会影响消息写入功能,适当打开后请关闭。

  • rabbitmqctl trace_on:开启Firehose命令

  • rabbitmqctl trace_off:关闭Firehose命令

rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一层GUI的包装,更容易使用和管理。

我们要做的就是启用插件:rabbitmq-plugins enable rabbitmq_tracing

首先进入 docker 容器内部,执行rabbitmq-plugins list命令

root@47b96c4e50ef:/# rabbitmq-plugins list
Listing plugins with pattern ".*" ...
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@47b96c4e50ef
 |/
[  ] rabbitmq_amqp1_0                  3.8.9
[  ] rabbitmq_auth_backend_cache       3.8.9
[  ] rabbitmq_auth_backend_http        3.8.9
[  ] rabbitmq_auth_backend_ldap        3.8.9
[  ] rabbitmq_auth_backend_oauth2      3.8.9
[  ] rabbitmq_auth_mechanism_ssl       3.8.9
[  ] rabbitmq_consistent_hash_exchange 3.8.9
[  ] rabbitmq_event_exchange           3.8.9
[  ] rabbitmq_federation               3.8.9
[  ] rabbitmq_federation_management    3.8.9
[  ] rabbitmq_jms_topic_exchange       3.8.9
[E*] rabbitmq_management               3.8.9
[e*] rabbitmq_management_agent         3.8.9
[  ] rabbitmq_mqtt                     3.8.9
[  ] rabbitmq_peer_discovery_aws       3.8.9
[  ] rabbitmq_peer_discovery_common    3.8.9
[  ] rabbitmq_peer_discovery_consul    3.8.9
[  ] rabbitmq_peer_discovery_etcd      3.8.9
[  ] rabbitmq_peer_discovery_k8s       3.8.9
[E*] rabbitmq_prometheus               3.8.9
[  ] rabbitmq_random_exchange          3.8.9
[  ] rabbitmq_recent_history_exchange  3.8.9
[  ] rabbitmq_sharding                 3.8.9
[  ] rabbitmq_shovel                   3.8.9
[  ] rabbitmq_shovel_management        3.8.9
[  ] rabbitmq_stomp                    3.8.9
[  ] rabbitmq_top                      3.8.9
[  ] rabbitmq_tracing                  3.8.9
[  ] rabbitmq_trust_store              3.8.9
[e*] rabbitmq_web_dispatch             3.8.9
[  ] rabbitmq_web_mqtt                 3.8.9
[  ] rabbitmq_web_mqtt_examples        3.8.9
[  ] rabbitmq_web_stomp                3.8.9
[  ] rabbitmq_web_stomp_examples       3.8.9

查看我们启用的插件,带*的就是启用了

然后输入

rabbitmq-plugins enable rabbitmq_tracing

然后在web控制台,刷新一下,点击admin标签,可以看到右边多了一个tracing标签

可以在这里添加新的tracing

Virtual host:虚拟主机名

Name :tracing 的名称,将来可以在这个tracing中记录很多的日志信息

Format:日志信息的格式,一种是text,一种是json,text是给我们程序员看的,是明文的,json便于计算机解析,经过base64编码了

Max payload bytes:不填就把所有的消息体记录起来,填个10就表示取前10个字节

Pattern:#表示的接收所有的消息,不管是发过来的还是消费的都可,如果只想接受发过来的,就填publish.#,如果想接收消费的,就填deliver.#

添加完后上边就多了一些东西,点进去右边的mytrace.log现在是啥都没有,我们往里面发一些消息,就可以看到一些日志信息了。

点开队列列表可以看到多了一个队列记录我们的日志消息

这个队列绑定的是amq.rabbitmq.trace交换机

2. RabbitMQ应用问题

2.1 消息可靠性保障

我们想要消息100%发送成功似乎是不可能的,但是我们最起码可以保证消息99.9%能发送成功吧。其中就用到了消息补偿。

来看一张图,其中producer 和 consumer都有自己对应的数据库,正常情况下,producer将业务数据入库,发送消息到Q1,consumer监听Q1,接收消息去消费,完成相应的DB操作。

考虑不正常的情况,producer操作数据库成功了,但是第2步发送消息失败了,这样consumer收不到消息,业务操作也会失败,这时候怎么办呢,producer发送消息完成之后,到第3步,延迟发送消息到Q3,也就是说,发送一条消息之后等待一段时间再发一条消息到Q3,这两个消息一模一样,如果消息到Q1发送成功,consumer消费成功后要向Q2发送确认消息,相当于consumer转换了一次角色,变成了生产端,我们有个回调检查服务,监听了Q2的确认消息,收到确认消息,将消息写入消息数据库中。而回调检查服务也监听着Q3,Q1、Q2、Q3中的消息ID是一样的,收到Q3的消息后,回调检查服务要去比对当前这条消息是否和刚才写入MDB的消息是否一致,检查是否被消费过,没有被消费过得话,MDB一定不存在记录,则转到第8步,producer重新发送消息。最后还有个问题,如果第2步第3步都失败了呢,我们还有个最后的保障,也就是定时检查服务,检查业务数据库DB和消息MDB是否能匹配,检查DB是不是比MDB的数据多了,或者匹配不上了,再去调用producer,重发那些多的消息。

2.2 消息幂等性保障

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

举个栗子,我花了500买衣服,所以服务器会发消息到MQ去扣款,下订单付款的时候,可能由于网络的原因,不管是什么原因,总之我发了两条扣款500的消息,总不能扣我1000块吧,RabbitMQ采取了用数据库乐观锁的机制来保障消息的幂等性。

这里的乐观锁,就是给消息加了个版本号,由上图的例子,consumer第一次执行消息写入数据库,version是1,乐观锁会将id与version绑定,且version取出来加1,下一条消息来了的时候,consumer想写数据库的时候判断条件 id = 1 and version = 1便不成立了。

幂等性其实有很多的保障机制,这里只介绍了乐观锁的机制。

文章分类
后端
文章标签
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 gxwowoo@163.com 举报,一经查实,本站将立刻删除。
相关推荐