RocketMQ顺序消息
18RocketMQ顺序消息
顺序消息的发送
@GetMapping(value = "/orderly") public String orderly() { List<String> typeList = Arrays.asList("创建", "支付", "退款"); for (String type : typeList) { Order order = new Order("123", type); MessageBuilder builder = MessageBuilder.withPayload(order); Message message = builder.build(); SendResult sendResult = rocketMQTemplate.syncSendOrderly("TopicTest", message, order.getOrderId()); System.out.println("MsgId = " + sendResult.getMsgId() + ", QueueId = " + sendResult.getMessageQueue().getQueueId()); } return "OK"; } 复制代码
发送顺序消息相比发送普通消息:
在配置文件中把默认异步发送改为同步发送
设置Header信息头,将消息固定发送给同一个消息队列
接收顺序消息相比接收普通消息:
把默认并发消费改为顺序消费
RockeMQ顺序消息分为两种:
局部有序:发送同一队列的消息有序,可以在发送消息时指定队列,在消费消息时按顺序消费。例如同一订单ID的消费要保证有序,不同订单ID的消费互不影响,并行处理
全局有序:设置Topic只有一个队列实现全局有序,创建Topic时手动设置,这种性能差不推荐使用
RocketMQ消息发送三种方式:同步、异步、单向。
同步:发送网络请求后会同步等待Broker服务器的返回结果,支持发送失败重试
异步:异步发送网络请求,不会阻塞当前线程,不支持失败重试
单向:原理与异步一致,不支持回调
顺序消息发送原理很简单,同一类消息发送到相同队列即可。为了保证先发送的消息先存储到消息队列,必须使用同步发送的方式
RocketMQTemplate的syncSend()方法:
public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) { if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { log.error("syncSend failed. destination:{}, message is null ", destination); throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); } try { long now = System.currentTimeMillis(); org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, message); if (delayLevel > 0) { rocketMsg.setDelayTimeLevel(delayLevel); } SendResult sendResult = producer.send(rocketMsg, timeout); long costTime = System.currentTimeMillis() - now; log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId()); return sendResult; } catch (Exception e) { log.error("syncSend failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); } } 复制代码
MessageQueueSelector的实现类SelectMessageQueueByHash
public class SelectMessageQueueByHash implements MessageQueueSelector { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int value = arg.hashCode(); if (value < 0) { value = Math.abs(value); } value = value % mqs.size(); return mqs.get(value); } } 复制代码
根据hashKey计算hash值
然后用hash值和队列大小取模,得到一个索引值,结果小于队列值
根据索引值从队列列表中取出一个队列,hash值相同则队列相同
普通消息的发送
普通消息有两种机制:轮询和故障规避机制
轮询原理就是路由信息TopicPublishInfo维护一个计数器sendWhichQueue,每发送一次消息需要查询一次路由,计数器就进行+1,通过计数器的值inde与队列的数量取模计算出实现轮询算法。
package org.apache.rocketmq.client.impl.producer; import java.util.ArrayList; import java.util.List; import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; public class TopicPublishInfo { private boolean orderTopic = false; private boolean haveTopicRouterInfo = false; private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private TopicRouteData topicRouteData; public boolean isOrderTopic() { return orderTopic; } public void setOrderTopic(boolean orderTopic) { this.orderTopic = orderTopic; } public boolean ok() { return null != this.messageQueueList && !this.messageQueueList.isEmpty(); } public List<MessageQueue> getMessageQueueList() { return messageQueueList; } public void setMessageQueueList(List<MessageQueue> messageQueueList) { this.messageQueueList = messageQueueList; } public ThreadLocalIndex getSendWhichQueue() { return sendWhichQueue; } public void setSendWhichQueue(ThreadLocalIndex sendWhichQueue) { this.sendWhichQueue = sendWhichQueue; } public boolean isHaveTopicRouterInfo() { return haveTopicRouterInfo; } public void setHaveTopicRouterInfo(boolean haveTopicRouterInfo) { this.haveTopicRouterInfo = haveTopicRouterInfo; } public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); } else { int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } } public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); } public int getQueueIdByBroker(final String brokerName) { for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) { final QueueData queueData = this.topicRouteData.getQueueDatas().get(i); if (queueData.getBrokerName().equals(brokerName)) { return queueData.getWriteQueueNums(); } } return -1; } @Override public String toString() { return "TopicPublishInfo [orderTopic=" + orderTopic + ", messageQueueList=" + messageQueueList + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]"; } public TopicRouteData getTopicRouteData() { return topicRouteData; } public void setTopicRouteData(final TopicRouteData topicRouteData) { this.topicRouteData = topicRouteData; } } 复制代码
轮询算法可能轮询选择的队列在宕机的Broker上,导致消息发送失败,于是就有了鼓掌规避机制
作者:周杰倫本人
链接:https://juejin.cn/post/7018479164122480648