阅读 151

RabbitMQ官方教程及理解-第二章(Worker Queues)

简介 Work Queues

什么是Work Queues,和前面的Queue有什么不同?细心的同学会发现.这里的Queues用了复数形式,前面还加了Work.那就说明这个Queue是支持多个work的.官方图,最为致命.

image.png

上代码 (don't talk to me, show me the code!)

新建项目 WorkQueues

image.png

依赖

<dependencies>     <dependency>         <groupId>com.rabbitmq</groupId>         <artifactId>amqp-client</artifactId>         <version>5.6.0</version>     </dependency>     <dependency>         <groupId>org.slf4j</groupId>         <artifactId>slf4j-api</artifactId>         <version>1.7.25</version>     </dependency>     <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->     <dependency>         <groupId>org.slf4j</groupId>         <artifactId>slf4j-simple</artifactId>         <version>1.7.25</version>     </dependency> </dependencies> 复制代码

发送者 NewTask

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; /**  * @author caiqian  */ public class NewTask {     private static final String TASK_QUEUE_NAME = "task_queue";     public static void main(String[] argv) throws Exception {         argv = new String[]{"second msg"};         ConnectionFactory factory = new ConnectionFactory();         factory.setHost("127.0.0.1");         factory.setUsername("admin");         factory.setPassword("123456");         try (Connection connection = factory.newConnection();              Channel channel = connection.createChannel()) {             channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);             String message = String.join(" ", argv);             channel.basicPublish("", TASK_QUEUE_NAME,                     MessageProperties.PERSISTENT_TEXT_PLAIN,                     message.getBytes("UTF-8"));             System.out.println(" [x] Sent '" + message + "'");         }     } } 复制代码

接收者Worker

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; /**  * @author caiqian  */ public class Worker {     private static final String TASK_QUEUE_NAME = "task_queue";     public static void main(String[] argv) throws Exception {         ConnectionFactory factory = new ConnectionFactory();         factory.setHost("127.0.0.1");         factory.setUsername("admin");         factory.setPassword("123456");         final Connection connection = factory.newConnection();         final Channel channel = connection.createChannel();         channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");         channel.basicQos(1);         DeliverCallback deliverCallback = (consumerTag, delivery) -> {             String message = new String(delivery.getBody(), "UTF-8");             System.out.println(" [x] Received '" + message + "'");             try {                 doWork(message);             } finally {                 System.out.println(" [x] Done");                 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);             }         };         channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {         });     }     private static void doWork(String task) {         for (char ch : task.toCharArray()) {             if (ch == '.') {                 try {                     Thread.sleep(1000);                 } catch (InterruptedException _ignored) {                     Thread.currentThread().interrupt();                 }             }         }     } } 复制代码

运行结果

NewTask, 这里先运行两个实例的worker , 再运行NewTask, 从1st,2nd,3rd,4th,5th一直发送

image.png

Worker 2 实例

实例1

image.png

实例2

image.png

结论

  • 1 worker queues支持多实例

  • 2 两个worker是以后的顺序轮流获取信息

消息确认(Message acknowledgment) 我更喜欢称之为消息回执

  • 当消息发给worker后, mq不知道worker处理的时间是否过长,不知道worker是否死掉, 不知道是否有消息遗失.于是,就有了Message acknowledgment.

  • 如果一个消费者在没有发送ack的情况下死亡(通道关闭,连接关闭,或者TCP连接丢失),RabbitMQ会明白消息没有被完全处理,并将其重新排队。如果有其他消费者在同一时间在线,然后它将迅速重新交付给另一个消费者。这样你就可以确保没有信息丢失,即使worker偶尔死亡。

  • 默认是30分钟时间

上代码

channel.basicQos(1); // accept only one unack-ed message at a time (see below) DeliverCallback deliverCallback = (consumerTag, delivery) -> {   String message = new String(delivery.getBody(), "UTF-8");   System.out.println(" [x] Received '" + message + "'");   try {     doWork(message);   } finally {     System.out.println(" [x] Done");     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);   } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); 复制代码

ack需要注意的地方

如果忘记/丢失回执,则mq将会一直保留此消息,长时间下来, 会影响MQ甚至崩溃.

消息持久化

我们已经学习了如何确保即使用户死亡,任务也不会丢失。但是如果RabbitMQ服务器停止了,我们的任务仍然会丢失。 于是便有了持久化

首先,我们要确定队列中的消息是否存在,其次,我们要标记持久化的状态

boolean durable = true; channel.queueDeclare("hello", durable, false, false, null); 复制代码

但是,已经存的队列是无法修改的,所以我们可以重新定义一个队列

将消息标记为persistent并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是RabbitMQ接受消息后仍然有一个很短的时间窗口尚未保存。此外,RabbitMQ不会对每条消息执行fsync(2)操作——它可能只是被保存到缓存中,而不是真正写入磁盘。持久性保证并不强,但对于我们的简单任务队列来说已经足够了。如果你需要更有力的保证,你可以使用发行商确认。

公平调度

从之前的情形来看,调度仍然没有完全按照我们希望的那样工作。例如,在有两个worker的情况下,当有的消息很重,有的消息又很轻时,一个工作人员将一直很忙,而另一个几乎不做任何工作。好吧,RabbitMQ对此一无所知,仍然会均匀地发送消息。所以这里就需要代码中控制

int prefetchCount = 1; channel.basicQos(prefetchCount);


作者:三重罗生门
链接:https://juejin.cn/post/7039130209328562212


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐