RabbitMQ官方教程及理解-第二章(Worker Queues)
简介 Work Queues
什么是Work Queues,和前面的Queue有什么不同?细心的同学会发现.这里的Queues用了复数形式,前面还加了Work.那就说明这个Queue是支持多个work的.官方图,最为致命.
上代码 (don't talk to me, show me the code!)
新建项目 WorkQueues
依赖
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.25</version> </dependency> </dependencies> 复制代码
发送者 NewTask
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; /** * @author caiqian */ public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { argv = new String[]{"second msg"}; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("admin"); factory.setPassword("123456"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = String.join(" ", argv); channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } } 复制代码
接收者Worker
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; /** * @author caiqian */ public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("admin"); factory.setPassword("123456"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } } 复制代码
运行结果
NewTask, 这里先运行两个实例的worker , 再运行NewTask, 从1st,2nd,3rd,4th,5th一直发送
Worker 2 实例
实例1
实例2
结论
1 worker queues支持多实例
2 两个worker是以后的顺序轮流获取信息
消息确认(Message acknowledgment) 我更喜欢称之为消息回执
当消息发给worker后, mq不知道worker处理的时间是否过长,不知道worker是否死掉, 不知道是否有消息遗失.于是,就有了
Message acknowledgment
.如果一个消费者在没有发送ack的情况下死亡(通道关闭,连接关闭,或者TCP连接丢失),RabbitMQ会明白消息没有被完全处理,并将其重新排队。如果有其他消费者在同一时间在线,然后它将迅速重新交付给另一个消费者。这样你就可以确保没有信息丢失,即使worker偶尔死亡。
默认是30分钟时间
上代码
channel.basicQos(1); // accept only one unack-ed message at a time (see below) DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); 复制代码
ack需要注意的地方
如果忘记/丢失回执,则mq将会一直保留此消息,长时间下来, 会影响MQ甚至崩溃.
消息持久化
我们已经学习了如何确保即使用户死亡,任务也不会丢失。但是如果RabbitMQ服务器停止了,我们的任务仍然会丢失。 于是便有了持久化
首先,我们要确定队列中的消息是否存在,其次,我们要标记持久化的状态
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null); 复制代码
但是,已经存的队列是无法修改的,所以我们可以重新定义一个队列
将消息标记为persistent并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是RabbitMQ接受消息后仍然有一个很短的时间窗口尚未保存。此外,RabbitMQ不会对每条消息执行fsync(2)操作——它可能只是被保存到缓存中,而不是真正写入磁盘。持久性保证并不强,但对于我们的简单任务队列来说已经足够了。如果你需要更有力的保证,你可以使用发行商确认。
公平调度
从之前的情形来看,调度仍然没有完全按照我们希望的那样工作。例如,在有两个worker的情况下,当有的消息很重,有的消息又很轻时,一个工作人员将一直很忙,而另一个几乎不做任何工作。好吧,RabbitMQ对此一无所知,仍然会均匀地发送消息。所以这里就需要代码中控制
int prefetchCount = 1; channel.basicQos(prefetchCount);
作者:三重罗生门
链接:https://juejin.cn/post/7039130209328562212