阅读 108

RabbitMQ基础知识

RabbitMQ简介

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是给予AMQP协议(Advanced Message Queuing Protocol 高级消息队列协议,是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计)的。

AMQP核心概念

  1. Server:又称Broker,接收客户端的连接,实现AMQP实体服务;

  2. Connection:连接,应用程序和Broker之间的网络连接;

  3. Channel:网络信道,几乎所有的操作都是在Channel中进行的,Channel是进行消息读写的通道。客户端可以建立多个Channel,每个Channel代表一个会话任务,有点类似于数据中的session;

  4. Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性,Body则就是消息体内容;

  5. Virtual Host:虚拟主机,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或Queue,有点类似于Redis中的16个db,是逻辑层面的隔离;

  6. Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列(Producer生产消息后都是直接投递到Exchange中);

  7. Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key;

  8. Routing Key:一个路由规则,虚拟机可以用它来确定如何路由一个特定的消息;

  9. Queue:也被称为Message Queue,消息队列,保存消息并将它们转发给消费者。

RabbitMQ架构图

rabbitmq架构图.jpeg Producer生产消息之后直接将消息投递到Exchange中,在投递的时候需要指定两个重要的信息,一个是消息需要被投递到哪个Exchange上,另一个是Routing Key,也就是将消息路由到哪个Message Queue上。

RabbitMQ安装

参考官网的安装,已经非常详细了,官网推荐的安装是将RabbitMQ和Erlang一起安装了,如果要单独安装的话,需要注意RabbitMQ和Erlang之间的版本需要对应。 www.rabbitmq.com/install-rpm…

RabbitMQ基本使用

  1. 服务的启动:rabbitmq-server start &

  2. 服务的停止:rabbitmqctl stop_app

  3. 管理插件:rabbitmq-plugins enable rabbitmq_management(启动管控台插件,方便图形化管理rabbitmq)

  4. 访问地址:http://localhost:15672

RabbitMQ常用命令-基础操作

  1. rabbitmqctl stop_app: 关闭应用

  2. rabbitmqctl start_app: 启动应用

  3. rabbitmqctl status: 查看节点状态

  4. rabbitmqctl add_user username password: 添加用户

  5. rabbitmqctl list_users: 列出所有用户

  6. rabbitmqctl delete_user username: 删除用户

  7. rabbitmqctl clear_permissions -p vhostpath username: 清除用户权限

  8. rabbitmqctl list_user_permissions username: 列出用户权限

  9. rabbitmqctl change_password username newpassword: 修改密码

  10. rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*": 设置用户权限(权限分别为configure write read,也就是可以配置、可写、可读)

  11. rabbitmqctl add_vhost vhostpath: 创建虚拟主机

  12. rabbitmqctl list_vhosts: 列出所有虚拟主机

  13. rabbitmqctl list_permissions -p vhostpath: 列出虚拟主机上所有权限

  14. rabbitmqctl list_queues: 查看所有队列信息

  15. rabbitmqctl -p vhostpath purge_queue blue: 清楚队列中的消息

RabbitMQ常用命令-高级操作

  1. rabbitmqctl reset: 移除所有数据,要在rabbitmqctl stop_app之后使用

  2. rabbitmqctl join_cluster <clusternode> [--ram]: 组成集群命令

  3. rabbitmqctl change_cluster_node_type <clusternode> disc | ram: 修改集群节点的存储形式,disc为磁盘存储,消息数据是存储在磁盘上的,可靠性高,但是持久化时间长,ram是内存存储,消息是存储在内存中,性能好,但是可能存在丢失

  4. rabbitmqctl forget_cluster_node [--offline]: 忘记节点(摘除节点)

  5. rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2]...: 修改节点名称

生产者消费者模型构建

  1. 创建好一个SpringBoot或者Spring或者普通的Java项目

  2. 安装RabbitMQ相关依赖

<dependency>     <groupId>com.rabbitmq</groupId>     <artifactId>amqp-client</artifactId>     <version>3.6.5</version> </dependency> 复制代码

public class Producer {     public static void main(String[] args) throws Exception {         // 1. 创建一个ConnectionFactory,并且进行相关连接配置         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         // 2. 通过连接工厂创建一个连接         Connection connection = connectionFactory.newConnection();         // 3. 通过Connection创建一个Channel         Channel channel = connection.createChannel();         // 4. 通过Channel发送数据         /*          * basicPublish的四个参数为别为:          * exchange: 交换机,如果为空的,routingKey的规则就是routingKey需要和消息队列的名称一样,不然就发送失败          * routingKey: 路由规则          * properties: 消息的额外修饰          * body: 消息体,也就是消息的主要内容          */         for (int i = 0; i < 5; i++) {             String msg = "Hello, RabbitMQ!";             channel.basicPublish("", "test001", null, msg.getBytes());         }         // 5. 关闭连接         channel.close();         connection.close();     } } 复制代码

public class Consumer {     public static void main(String[] args) throws Exception {         // 1. 创建一个ConnectionFactory,并且进行相关连接配置         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         // 2. 通过连接工厂创建一个连接         Connection connection = connectionFactory.newConnection();         // 3. 通过Connection创建一个Channel         Channel channel = connection.createChannel();         // 4. 声明一个队列         /*          * queueDeclare方法的五个参数          * queue: 队列的名称          * durable: 是否是持久化,也就是RabbitMQ服务重启之后消息队列是否被保存,为true就是持久化,服务重启消息队列不会被删除          * exclusive: 是否独占,有点类似于独占锁          * autoDelete: 是否开启自动删除,也就是当该消息队列没有被绑定到任何一个Exchange上时是否自动删除          * arguments: 额外的参数          */         String queueName = "test001";         channel.queueDeclare(queueName, true, true, false, null);         // 5. 创建消费者         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);         // 6. 设置Channel         /*          * basicConsume的三个参数的函数          * queue: 队列的名称          * autoAck: 是否自动签收,为true表示当Consumer收到消息之后自动发送ACK确定给Broker          * callback: 指定消费者          */         channel.basicConsume(queueName, true, queueingConsumer);         // 7. 获取消息         while (true) {             Delivery delivery = queueingConsumer.nextDelivery();             String msg = new String(delivery.getBody());             System.out.println("消费端:" + msg);         }     } } 复制代码

交换机Exchange详解

交换机属性

  1. Name:交换机名称

  2. Type:交换机类型,大致有direct、topic、fanout、headers四种

  3. Durability:是否需要持久化,true为持久化

  4. AutoDelete:当最后一个绑定到Exchange上的队列被删除后,是否自动删除该Exchange

  5. Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false

  6. Arguments:扩展参数,用于扩展AMQP协议定制化使用

交换机类型 - Direct Exchange

所有发送到Direct Exchange上的消息都会被转发到RoutingKey中指定的Queue中,在Direct模式下可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作(默认的RoutingKey就是队列的名称),消息传递时,RoutingKey必须完全匹配(名称完全一样,不支持模糊匹配)才会被队列接收,否则该消息会被抛弃。

public class Producer {     public static void main(String[] args) throws Exception {         // 1. 创建一个ConnectionFactory,并且进行相关连接配置         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         // 2. 通过连接工厂创建一个连接         Connection connection = connectionFactory.newConnection();         // 3. 通过Connection创建一个Channel         Channel channel = connection.createChannel();         // 4. 声明Exchange的名称和RoutingKey         String exchangeName = "test_direct_exchange";         String routingKey = "test.direct";         // 5. 发送消息         String msg = "Hello RabbitMQ - Direct Exchange Message...";         channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());         // 6. 关闭连接         channel.close();         connection.close();     } } 复制代码

public class Consumer {     public static void main(String[] args) throws Exception {         // 1. 创建一个ConnectionFactory,并且进行相关连接配置         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         // 2. 通过连接工厂创建一个连接         Connection connection = connectionFactory.newConnection();         // 3. 通过Connection创建一个Channel         Channel channel = connection.createChannel();         // 4. 声明Exchange、Queue、RoutingKey         String exchangeName = "test_direct_exchange";         String exchangeType = "direct";         String queueName = "test_direct_queue";         String routingKey = "test.direct";         channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);         channel.queueDeclare(queueName, true, true, false, null);         channel.queueBind(queueName, exchangeName, routingKey);         // 5. 创建消费者         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);         // 6. 设置Channel         channel.basicConsume(queueName, true, queueingConsumer);         // 7. 获取消息         while (true) {             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();             String msg = new String(delivery.getBody());             System.out.println("消费端:" + msg);         }     } } 复制代码

交换机类型 - Topic Exchange

所有发送到Topic Exchange上的消息被转发到所有关系RoutingKey中指定Topic的Queue中,Exchange将RoutingKey和某个Topic进行模糊匹配,此时队列需要绑定一个Topic。 上面这句话有点拗口,其实简单来说,就是当Exchange的类型为topic时,RoutingKey是一组规则(不再仅仅表示一个规则,Direct Exchange中的RoutingKey就是一个规则,Producer传递的RoutingKey必须和Exchange中的RoutingKey名称完全一致才能发送成功),通过这组规则可以将多个RoutingKey和一个Queue进行关联,只要满足RoutingKey的规则就会被路由到相关的队列中(比如RoutingKey为log.#,只要符合这个规则的消息都会被路由到相关队列中)。 在制定RoutingKey时可以使用通配符进行模糊匹配,符号#表示匹配一个或多个词,*表示匹配一个词(注意这里是词,而不是字符),比如log.#可以匹配到log.info.oalog.*只能匹配到log.info,是匹配不到log.info.oa

public class Producer {     public static void main(String[] args) throws Exception {         // 1. 创建一个ConnectionFactory,并且进行相关连接配置         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         // 2. 通过连接工厂创建一个连接         Connection connection = connectionFactory.newConnection();         // 3. 通过Connection创建一个Channel         Channel channel = connection.createChannel();         // 4. 声明Exchange的名称和RoutingKey         String exchangeName = "test_topic_exchange";         String routingKey1 = "log.info.oa";         String routingKey2 = "log.error";         String routingKey3 = "log.debug";         // 5. 发送消息         String msg = "Hello RabbitMQ - Topic Exchange Message...";         channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());         channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());         channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());         // 6. 关闭连接         channel.close();         connection.close();     } } 复制代码

public class Consumer {     public static void main(String[] args) throws Exception {         // 1. 创建一个ConnectionFactory,并且进行相关连接配置         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         // 2. 通过连接工厂创建一个连接         Connection connection = connectionFactory.newConnection();         // 3. 通过Connection创建一个Channel         Channel channel = connection.createChannel();         // 4. 声明Exchange、Queue、RoutingKey         String exchangeName = "test_topic_exchange";         String exchangeType = "topic";         String queueName = "test_topic_queue";         // String routingKey = "log.*";         String routingKey = "log.#";         channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);         channel.queueDeclare(queueName, true, true, false, null);         channel.queueBind(queueName, exchangeName, routingKey);         // 5. 创建消费者         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);         // 6. 设置Channel         channel.basicConsume(queueName, true, queueingConsumer);         // 7. 获取消息         while (true) {             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();             String msg = new String(delivery.getBody());             System.out.println("消费端:" + msg);         }     } } 复制代码

交换机类型 - Fanout Exchange

该种交换机类型是不会处理RoutingKey的,只会简单地将队列绑定到交换机上,发送到交换机的消息都会被转发到与该交换机绑定的所有队列上,Fanout Exchange是转发消息最快的,因为不会处理路由相关的操作,即使指定了RoutingKey也不会理会

public class Producer {     public static void main(String[] args) throws Exception {         // 1. 创建一个ConnectionFactory,并且进行相关连接配置         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         // 2. 通过连接工厂创建一个连接         Connection connection = connectionFactory.newConnection();         // 3. 通过Connection创建一个Channel         Channel channel = connection.createChannel();         // 4. 声明Exchange的名称和RoutingKey         String exchangeName = "test_fanout_exchange";         // 指定了RoutingKey也没有作用         String routingKey = "log.debug";         // 5. 发送消息         String msg = "Hello RabbitMQ - Fanout Exchange Message...";         channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());         // 6. 关闭连接         channel.close();         connection.close();     } } 复制代码

public class Consumer {     public static void main(String[] args) throws Exception {         // 1. 创建一个ConnectionFactory,并且进行相关连接配置         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         // 2. 通过连接工厂创建一个连接         Connection connection = connectionFactory.newConnection();         // 3. 通过Connection创建一个Channel         Channel channel = connection.createChannel();         // 4. 声明Exchange、Queue、RoutingKey         String exchangeName = "test_fanout_exchange";         String exchangeType = "fanout";         String queueName = "test_fanout_queue";         String routingKey = "test";         channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);         channel.queueDeclare(queueName, true, true, false, null);         channel.queueBind(queueName, exchangeName, routingKey);         // 5. 创建消费者         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);         // 6. 设置Channel         channel.basicConsume(queueName, true, queueingConsumer);         // 7. 获取消息         while (true) {             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();             String msg = new String(delivery.getBody());             System.out.println("消费端:" + msg);         }     } } 复制代码

绑定、队列、消息、虚拟主机详解

绑定Binding

是指Exchange和Exchange、Exchange和Queue之间的连接关系

队列

是指消息队列,实际存储消息数据的。包含一些属性,比如Durability表示是否持久化,Durable就是持久化,Transient表示不持久化;Autodelete表示当最后一个监听被移除后,该Queue是否被自动删除。

Message

是指服务器和应用程序之间传送的数据,本质上就是一段数据,由Properties和Payload(Body)组成,也包含一些属性,比如delivery modeheaders(自定义属性)、content_typecontent_encodingprioritycorrelation_idreply_toexpirationmessage_idtimestamptypeuser_idapp_idcluster_id

如何发送携带Properties的Message呢?

public class Producer {     public static void main(String[] args) throws Exception {         // 1. 创建一个ConnectionFactory,并且进行相关连接配置         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         // 2. 通过连接工厂创建一个连接         Connection connection = connectionFactory.newConnection();         // 3. 通过Connection创建一个Channel         Channel channel = connection.createChannel();         // 4. 通过Channel发送数据         Map<String, Object> headers = new HashMap<>();         headers.put("name", "snow");         headers.put("sex", "man");                  // 设置Properties         AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()                 .deliveryMode(2)                 .expiration("15000")                 .contentEncoding("UTF-8")                 .headers(headers)                 .build();         for (int i = 0; i < 5; i++) {             String msg = "Hello, RabbitMQ!";             channel.basicPublish("", "test001", properties, msg.getBytes());         }         // 5. 关闭连接         channel.close();         connection.close();     } } 复制代码

public class Consumer {     public static void main(String[] args) throws Exception {         // 1. 创建一个ConnectionFactory,并且进行相关连接配置         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         // 2. 通过连接工厂创建一个连接         Connection connection = connectionFactory.newConnection();         // 3. 通过Connection创建一个Channel         Channel channel = connection.createChannel();         // 4. 声明一个队列         String queueName = "test001";         channel.queueDeclare(queueName, true, false, false, null);         // 5. 创建消费者         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);         // 6. 设置Channel         channel.basicConsume(queueName, true, queueingConsumer);         // 7. 获取消息         while (true) {             Delivery delivery = queueingConsumer.nextDelivery();             String msg = new String(delivery.getBody());             Map<String, Object> headers = delivery.getProperties().getHeaders();             System.out.println("消费端:" + msg);             System.out.println(headers.get("name"));         }     } } 复制代码

RabbitMQ高级特性

消息如何保证100%的投递成功方案-1

什么是生产端的可靠性投递?

  1. 保障消息的成功发出

  2. 保障MQ节点的成功接收

  3. 发送端收到MQ节点(Broker)的确认应答

  4. 完善的消息补偿机制(也就是消息投递失败或者未收到Broker的确认应答的补偿措施)

消息可靠性投递的解决方案

  1. 消息落库,对消息状态进行打标

  2. 消息的延迟投递,做二次确认,回调检查

消息可靠性投递方案一.jpg

  1. Producer端首先将业务信息入库,同时创建一条消息入库,设置消息的status为0(表示消息已经投递)

  2. Producer端生成一条消息Message投递到Broker

  3. Broker收到消息之后,发送确认Confirm返回给Producer

  4. Producer收到Broker发送过来的Confirm之后,就将消息数据库中消息的状态为1(表示消息已经投递成功)

  5. 因为步骤2和步骤3都有可能发生故障,也就是消息投递失败,或者网络等原因造成Producer未收到Broker发送过来的Confirm消息,所以需要开启一个分布式定时任务从消息数据库中抓取status为0的消息

  6. 将抓取出来的status为0的消息重新投递给Broker,重复上述动作

  7. 因为在极端状况下有些消息可能就是会投递失败,不能无休止地重新投递,可以设置一个投递上限,比如最大重新投递次数为3,如果3次投递均失败,就将消息数据库中的消息状态设置为3,之后再建立补偿措施来对status为3的消息进行处理

缺点:由于在最开始进行了两次入库的操作,所以在高并发的情况下其实会有性能上的问题。

消息如何保证100%的投递成功方案-2

消息可靠性投递方案二.jpg

  1. Producer端首先对业务消息进行入库,然后同时生成两条相同的消息,一条消息立即发出,另一条消息延迟一段时间再次发出

  2. Consumer端对消息队列进行监听,从中取出消息进行消费,在消费完一条消息之后,需要向Broker发送一个消费确认Confirm,表示该条消息已被消费

  3. Callback Service对Consumer端发送的消费确认消息进行监听,如果收到了Consumer端发送过来的消费确认,就将消息数据库中的消息进行入库

  4. 同时Callback还会对Producer端发送的另一条延迟消息进行监听,如果收到了Producer发送过来的延迟消息,就从消息数据库中查询该条消息是否已被消费,如果查询不到或者消息消费失败,Callback Service就通知Producer进行消息重发

优点:由于最开始只是进行了一次入库的操作,性能得到了较大的提升,而Callback Service是一个补偿措施,对业务的性能并不会产生实际的影响

具体的实现请参考:RabbitMQ之消息可靠性投递实现

幂等性概念及业界主流解决方案

什么是幂等性? 通俗来说,就是假如我们要对一件事进行操作,这个操作可能重复进行100次或者1000次,那么无论操作多少次,这些操作的结果都是一样的,就像数据库中的乐观锁机制,比如我们多个线程同时更新库存的SQL语句,不采用乐观锁的机制的话可能会存在线程安全问题导致数据不一致,update sku set count = count - 1, version = version + 1 where version = 1,加上一个乐观锁来保证线程安全,当然乐观锁的背后采用的原理是CAS(CompareAndSwap,也就是先比较然后再替换,保证操作的原子性)。

在海量订单产生的业务高峰期,如何避免消息的重复消费问题? 在业务高峰期,可能会存在网络原因或者其他原因导致Producer端的消息重发,消费端要实现幂等性,就意味着我们的消息永远不会消费多次,即使我们收到了多条一样的消息,解决方案大致有两种:

  1. 唯一ID + 指纹码 机制,利用数据库主键去重

  2. 利用Redis的原子性去实现

唯一ID + 指纹码 机制

  1. 唯一ID + 指纹码 机制,利用数据库进行主键去重

  2. select count(1) from order where id = 唯一ID + 指纹码,在消费的时候先进行查询,如果查询结果为1的话就表示已经被消费过了就不再重复进行消费了,没有查询出结果的话就说明没有被消费,就进行数据库的入库

  3. 好处:实现简单

  4. 坏处:高并发下有数据库写入的性能瓶颈

  5. 解决方案:根据ID进行分库分表,进行算法路由,比如对ID进行路由算法路由到不同的数据库中,分摊整个数据流量的压力

利用Redis原子特性实现

  1. 使用Redis实现消费端的幂等,有几个需要考虑的问题

  2. 第一:是否要进行数据库入库的操作,如果要入库的话,如何使得数据库和缓存的入库做到原子性,也就是如何实现数据库和缓存的数据一致性,因为有可能出现这样的情况,redis中保存了该order的数据,但是在保存到数据库的时候出现了问题,导致数据库中没有保存成功,然后如何保证数据准确地被同时保存在数据库中呢?

  3. 第二:如果不进行数据库入库的话,那么都存储到缓存redis中,又如何设置定时同步的策略呢,因为数据不可能一直保存在redis中,而且就算一直保存在redis中,redis服务也有可能会出现问题,这也是需要重点考虑的问题

Confirm确认消息详解

什么是Confirm消息确认机制? 消息的确认,是指Producer投递消息后,如果Broker收到消息,则会给我们Producer一个应答,Producer进行接收应答,用来确定这条消息是否正常地发送到了Broker,这种方式也是消息的可靠性投递的核心保障。

如何实现Confirm确认消息?

  1. 在channel上开启确认模式:channel.confirmSelect()

  2. 在channel上添加监听:addConfirmListener,监听成功或者失败的返回结果,根据具体的结果对消息进行重新发送或者日志记录等后续处理

public class Producer {     public static void main(String[] args) throws Exception {         // 1. 创建一个ConnectionFactory,并且进行相关连接配置         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         // 2. 通过连接工厂创建一个连接         Connection connection = connectionFactory.newConnection();         // 3. 通过Connection创建一个Channel         Channel channel = connection.createChannel();         // 4. 指定消息投递模式:消息的确认模式         channel.confirmSelect();         String exchangeName = "test_confirm_exchange";         String routingKey = "confirm.save";         // 5. 发送消息         String msg = "Hello RabbitMQ! Send a confirm message.";         channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());         // 6. 添加一个确认监听         channel.addConfirmListener(new ConfirmListener() {             @Override             public void handleAck(long deliveryTag, boolean multiple) throws IOException {                 System.out.println("------ACK!------");             }             @Override             public void handleNack(long deliveryTag, boolean multiple) throws IOException {                 System.out.println("------NO ACK!------");             }         });     } } 复制代码

public class Consumer {     public static void main(String[] args) throws Exception {         // 1. 创建一个ConnectionFactory,并且进行相关连接配置         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         // 2. 通过连接工厂创建一个连接         Connection connection = connectionFactory.newConnection();         // 3. 通过Connection创建一个Channel         Channel channel = connection.createChannel();         // 4. 声明Exchange、Queue、RoutingKey         String exchangeName = "test_confirm_exchange";         String routingKey = "confirm.*";         String queueName = "test_confirm_queue";         channel.exchangeDeclare(exchangeName, "topic", true);         channel.queueDeclare(queueName, true, false, false, null);         channel.queueBind(queueName, exchangeName, routingKey);         // 5. 创建消费者消费消息         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);         channel.basicConsume(queueName, true, queueingConsumer);         while (true) {             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();             String msg = new String(delivery.getBody());             System.out.println("消费端: " + msg);         }     } } 复制代码

Return返回消息详解

什么是Return返回消息机制? ReturnListener用于处理一些不可路由的消息,Producer生产一条消息之后,通过指定一个Exchange和RoutingKey,将消息送达到某一个队列中去,然后Consumer监听队列,进行消息的消费处理操作,但是在某些情况下,Producer在投递消息的时候,指定的Exchange不存在或者RoutingKey路由不到,就说明消息投递失败,这个时候如果需要监听这种不可达的消息,就需要使用ReturnListener。 在使用ReturnListener的基础API时有一个关键的配置项是Mandatory,该参数为true,则ReturnListener会接收到路由不可达的消息,然后进行后续的处理,如果为false,那么Broker端会自动删除该消息,ReturnListener是监听不到的。

public class Producer {     public static void main(String[] args) throws Exception {         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         Connection connection = connectionFactory.newConnection();         Channel channel = connection.createChannel();         String exchangeName = "test_return_exchange";         String routingKey = "return.save";         String routingKeyError = "snow.save";         String msg = "Hello RabbitMQ! Send a Return message.";         boolean mandatory = true;         channel.basicPublish(exchangeName, routingKeyError, mandatory, null, msg.getBytes());         channel.addReturnListener(new ReturnListener() {             @Override             public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {                 System.out.println("----handle return----");                 System.out.println("replyText: " + replyText);                 System.out.println("exchange: " + exchange);                 System.out.println("routingKey: " + routingKey);                 System.out.println("properties: " + properties);                 System.out.println("body: " + new String(body));             }         });     } } 复制代码

public class Consumer {     public static void main(String[] args) throws Exception {         // 1. 创建一个ConnectionFactory,并且进行相关连接配置         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         // 2. 通过连接工厂创建一个连接         Connection connection = connectionFactory.newConnection();         // 3. 通过Connection创建一个Channel         Channel channel = connection.createChannel();         // 4. 声明Exchange、Queue、RoutingKey         String exchangeName = "test_return_exchange";         String routingKey = "return.*";         String queueName = "test_return_queue";         channel.exchangeDeclare(exchangeName, "topic", true);         channel.queueDeclare(queueName, true, false, false, null);         channel.queueBind(queueName, exchangeName, routingKey);         // 5. 创建消费者消费消息         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);         channel.basicConsume(queueName, true, queueingConsumer);         while (true) {             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();             String msg = new String(delivery.getBody());             System.out.println("消费端: " + msg);         }     } } 复制代码

自定义消费者使用

如何自定义消费者进行消息消费? 在之前,我们都是采用默认的QueueingConsumer来创建一个消费者,之后再使用while循环来不停地取出消息,但是这种方式不是特别好,一般我们会自定义自己的Consumer,那么要实现自定义的Consumer有两种方式,一种是实现Consumer的接口,但是这种实现方式需要重写很多方法,另一种是继承DefaultConsumer,重写其中的

public class Producer {     public static void main(String[] args) throws Exception {         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         Connection connection = connectionFactory.newConnection();         Channel channel = connection.createChannel();         String exchangeName = "test_consumer_exchange";         String routingKey = "consumer.save";         String msg = "Hello RabbitMQ! Send a Consumer message.";         for (int i = 0; i < 5; i++) {             channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());         }     } } 复制代码

public class Consumer {     public static void main(String[] args) throws Exception { // 1. 创建一个ConnectionFactory,并且进行相关连接配置         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         // 2. 通过连接工厂创建一个连接         Connection connection = connectionFactory.newConnection();         // 3. 通过Connection创建一个Channel         Channel channel = connection.createChannel();         // 4. 声明Exchange、Queue、RoutingKey         String exchangeName = "test_consumer_exchange";         String routingKey = "consumer.*";         String queueName = "test_consumer_queue";         channel.exchangeDeclare(exchangeName, "topic", true);         channel.queueDeclare(queueName, true, false, false, null);         channel.queueBind(queueName, exchangeName, routingKey);         // 5. 创建消费者消费消息         channel.basicConsume(queueName, true, new MyConsumer(channel));     } } 复制代码

public class MyConsumer extends DefaultConsumer {     public MyConsumer(Channel channel) {         super(channel);     }     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {         System.out.println("----my consumer handle delivery----");         System.out.println("consumerTag: " + consumerTag);         System.out.println("envelope: " + envelope);         System.out.println("properties: " + properties);         System.out.println("body: " + new String(body));     } } 复制代码

消费端的限流策略

什么是消费端的限流? 假设一个场景,就是我们的RabbitMQ服务器有上万条未处理的消息,此时如果我们随便打开一个消费者客户端,会出现下面的情况,就是巨量的消息瞬间全部推送过来,但是我们的单个客户端无法同时处理这么多数据,就有可能造成服务器崩溃。 RabbitMQ提供了一种qos(Quality of Service 服务质量保证)功能,即在非自动确认消息(autoAck为false)的前提下,如果一定数目的消息(通过基于Consumer或者channel设置的Qos的值)未被确认前,不进行消费新的消息。 void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)中的prefetchSize表示单个消息的大小,为0表示不限制单个消息的大小,prefetchCount会告诉RabbitMQ不要同时给一个消费者推送超过N个消息,即一旦有N个消息还没有Ack,则该Consumer就将block阻塞住,直到有消息被Ack,global表示是否将前两个参数的设置应用于channel,简单点说就是前两个限制是channel级别还是Consumer级别的,一般设置为false,表示Consumer级别(prefetchCount只在autoAck为false的情况下才会生效,在自动Ack的情况下是无效的)

public class Producer {     public static void main(String[] args) throws Exception {         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         Connection connection = connectionFactory.newConnection();         Channel channel = connection.createChannel();         String exchangeName = "test_qos_exchange";         String routingKey = "qos.save";         String msg = "Hello RabbitMQ! Send a QOS message.";         for (int i = 0; i < 5; i++) {             channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());         }     } } 复制代码

public class Consumer {     public static void main(String[] args) throws Exception {                  ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         // 2. 通过连接工厂创建一个连接         Connection connection = connectionFactory.newConnection();         // 3. 通过Connection创建一个Channel         Channel channel = connection.createChannel();         // 4. 声明Exchange、Queue、RoutingKey         String exchangeName = "test_qos_exchange";         String routingKey = "qos.*";         String queueName = "test_qos_queue";         channel.exchangeDeclare(exchangeName, "topic", true);         channel.queueDeclare(queueName, true, false, false, null);         channel.queueBind(queueName, exchangeName, routingKey);         // 5. 限流,记得将basicConsume方法中的autoAck的值设置为false         channel.basicQos(0, 1, false);         channel.basicConsume(queueName, false, new MyConsumer(channel));     } } 复制代码

public class MyConsumer extends DefaultConsumer {     private Channel channel;     public MyConsumer(Channel channel) {         super(channel);         this.channel = channel;     }     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {         System.out.println("----my consumer handle delivery----");         System.out.println("consumerTag: " + consumerTag);         System.out.println("envelope: " + envelope);         System.out.println("properties: " + properties);         System.out.println("body: " + new String(body));         // 消息的Ack确认,basicAck的第一个参数为消息的deliveryTag,第二个参数为是否批量签收,如果限制的消息个数大于1,可以设置为true         this.channel.basicAck(envelope.getDeliveryTag(), false);     } } 复制代码

消费端ACK与重回队列机制

消费端的手工ACK和NACK为什么会存在?

  1. 消费端在进行消息消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿,如果采用自动ACK的话就达不到需求

  2. 如果由于服务器宕机等严重问题,我们也需要手工进行ACK来保障消费端消费成功,因为消费者宕机后,Broker收不到ACK或者NACK,就会重新发送消息给消费端再次消费,因为在自动ACK的机制下Broker发送消息给消费者时,自动确认消息被处理完毕

消费端的重回队列机制

  1. 消费端重回队列是为了将没有处理成功的消息重新投递给Broker

  2. 一般在实际应用中,都会关闭重回队列,也就是将requeue设置为false

public class Producer {     public static void main(String[] args) throws Exception {         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         Connection connection = connectionFactory.newConnection();         Channel channel = connection.createChannel();         String exchangeName = "test_ack_exchange";         String routingKey = "ack.save";         for (int i = 0; i < 5; i++) {             Map<String, Object> headers = new HashMap<>();             headers.put("num", i);             AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()                     .deliveryMode(2)                     .contentEncoding("UTF-8")                     .headers(headers)                     .build();             String msg = "Hello RabbitMQ! Send a ACK message." + i;             channel.basicPublish(exchangeName, routingKey, properties, msg.getBytes());         }     } } 复制代码

public class Consumer {     public static void main(String[] args) throws Exception {         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         // 2. 通过连接工厂创建一个连接         Connection connection = connectionFactory.newConnection();         // 3. 通过Connection创建一个Channel         Channel channel = connection.createChannel();         // 4. 声明Exchange、Queue、RoutingKey         String exchangeName = "test_ack_exchange";         String routingKey = "ack.*";         String queueName = "test_ack_queue";         channel.exchangeDeclare(exchangeName, "topic", true);         channel.queueDeclare(queueName, true, false, false, null);         channel.queueBind(queueName, exchangeName, routingKey);         // 将autoAck设置为false,手工Ack确认         channel.basicConsume(queueName, false, new MyConsumer(channel));     } } 复制代码

public class MyConsumer extends DefaultConsumer {     private final Channel channel;     public MyConsumer(Channel channel) {         super(channel);         this.channel = channel;     }     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {         System.out.println("----my consumer handle delivery----");         System.out.println("body: " + new String(body));         if ((Integer) properties.getHeaders().get("num") == 0) {             // 第三个参数requeue表示是否重回队列             this.channel.basicNack(envelope.getDeliveryTag(), false, false);         } else {             // 消息的Ack确认,basicAck的第一个参数为消息的deliveryTag,第二个参数为是否批量签收,如果限制的消息个数大于1,可以设置为true             this.channel.basicAck(envelope.getDeliveryTag(), false);         }     } } 复制代码

TTL消息详解

  1. TTL 是Time To Live的缩写,也就是生存时间

  2. RabbitMQ支持消息的过期时间,在消息发送的时候可以再Properties中指定expiration过期时间

  3. RabbitMQ支持队列的过期时间,从消息入队列开始计算,如果超过了队列设置的超时时间配置还没有被消费,该消息就会被自动清除

死信队列详解

死信队列 DLX Dead-Letter-Exchange

  1. 利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX

  2. DLX也是一个正常的Exchange,和一般的Exchange没有什么区别,它可以在任何队列上被指定(也就是需要设置队列的属性),这样的话只要这个队列中有死信就会被重新发布到DLX中

  3. 当设置了DLX的队列中有死信时,RabbitMQ就会自动将这个死信重新发布到设置的Exchange中去,从而被路由到另一个队列

  4. 可以监听这个队列中的消息做相应的处理,这个特性可以弥补RabbitMQ3.0版本以前支持的immediate参数的功能

消息变成死信的情况

  1. 消息被拒绝或消费失败(basicReject/basicNack)并且requeue为false(不重回队列)

  2. 消息TTL过期

  3. 队列达到最大长度

死信队列的设置 首先要设置死信队列的Exchange和Queue,然后进行绑定

  1. Exchange: dlx.exchange(名字可以任意取)

  2. Queue: dlx.queue(名字可以任意取)

  3. RoutingKey: # (为#表示任何消息都可以被路由到dlx.queue中)

然后再进行正常的交换机、队列声明和绑定,只不过需要再被设置死信队列的队列中加上一个参数:arguments.put("x-dead-letter-exchange", "dlx.exchange"),这样消息在过期、不重回队列、队列达到最大长度时被直接路由到死信队列中

public class Producer {     public static void main(String[] args) throws Exception {         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         Connection connection = connectionFactory.newConnection();         Channel channel = connection.createChannel();         String exchangeName = "test_dlx_exchange";         String routingKey = "dlx.save";         for (int i = 0; i < 1; i++) {             AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()                     .deliveryMode(2)                     .expiration("10000")                     .contentEncoding("UTF-8")                     .build();             String msg = "Hello RabbitMQ! Send a ACK message.";             channel.basicPublish(exchangeName, routingKey, properties, msg.getBytes());         }     } } 复制代码

public class Consumer {     public static void main(String[] args) throws Exception {         ConnectionFactory connectionFactory = new ConnectionFactory();         connectionFactory.setHost("localhost");         connectionFactory.setPort(5672);         connectionFactory.setVirtualHost("/");         // 2. 通过连接工厂创建一个连接         Connection connection = connectionFactory.newConnection();         // 3. 通过Connection创建一个Channel         Channel channel = connection.createChannel();         // 4. 声明Exchange、Queue、RoutingKey         String exchangeName = "test_dlx_exchange";         String routingKey = "dlx.*";         String queueName = "test_dlx_queue";         Map<String, Object> arguments = new HashMap<>();         arguments.put("x-dead-letter-exchange", "dlx.exchange");         channel.exchangeDeclare(exchangeName, "topic", true);         channel.queueDeclare(queueName, true, false, false, arguments);         channel.queueBind(queueName, exchangeName, routingKey);         // 死信队列的声明         channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);         channel.queueDeclare("dlx.queue", true, false, false, null);         channel.queueBind("dlx.queue", "dlx.exchange", "#");         // 将autoAck设置为false,手工Ack确认         channel.basicConsume(queueName, false, new MyConsumer(channel));     } } 复制代码

public class MyConsumer extends DefaultConsumer {     private final Channel channel;     public MyConsumer(Channel channel) {         super(channel);         this.channel = channel;     }     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {         System.out.println("----my consumer handle delivery----");         System.out.println("body: " + new String(body));         // 消息的Ack确认,basicAck的第一个参数为消息的deliveryTag,第二个参数为是否批量签收,如果限制的消息个数大于1,可以设置为true         this.channel.basicAck(envelope.getDeliveryTag(), false);     } }


作者:Yan拾月
链接:https://juejin.cn/post/7030704373881798663

文章分类
代码人生
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐