阅读 251

RocketMQ顺序消息

18RocketMQ顺序消息

顺序消息的发送

@GetMapping(value = "/orderly") public String orderly() {     List<String> typeList = Arrays.asList("创建", "支付", "退款");     for (String type : typeList) {         Order order = new Order("123", type);         MessageBuilder builder = MessageBuilder.withPayload(order);         Message message = builder.build();         SendResult sendResult = rocketMQTemplate.syncSendOrderly("TopicTest", message, order.getOrderId());         System.out.println("MsgId = " + sendResult.getMsgId() + ", QueueId = " + sendResult.getMessageQueue().getQueueId());     }     return "OK"; } 复制代码

发送顺序消息相比发送普通消息:

  • 在配置文件中把默认异步发送改为同步发送

  • 设置Header信息头,将消息固定发送给同一个消息队列

接收顺序消息相比接收普通消息:

把默认并发消费改为顺序消费

RockeMQ顺序消息分为两种:

  • 局部有序:发送同一队列的消息有序,可以在发送消息时指定队列,在消费消息时按顺序消费。例如同一订单ID的消费要保证有序,不同订单ID的消费互不影响,并行处理

  • 全局有序:设置Topic只有一个队列实现全局有序,创建Topic时手动设置,这种性能差不推荐使用

RocketMQ消息发送三种方式:同步、异步、单向。

  • 同步:发送网络请求后会同步等待Broker服务器的返回结果,支持发送失败重试

  • 异步:异步发送网络请求,不会阻塞当前线程,不支持失败重试

  • 单向:原理与异步一致,不支持回调

顺序消息发送原理很简单,同一类消息发送到相同队列即可。为了保证先发送的消息先存储到消息队列,必须使用同步发送的方式

RocketMQTemplate的syncSend()方法:

public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {     if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {         log.error("syncSend failed. destination:{}, message is null ", destination);         throw new IllegalArgumentException("`message` and `message.payload` cannot be null");     }     try {         long now = System.currentTimeMillis();         org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,             charset, destination, message);         if (delayLevel > 0) {             rocketMsg.setDelayTimeLevel(delayLevel);         }         SendResult sendResult = producer.send(rocketMsg, timeout);         long costTime = System.currentTimeMillis() - now;         log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());         return sendResult;     } catch (Exception e) {         log.error("syncSend failed. destination:{}, message:{} ", destination, message);         throw new MessagingException(e.getMessage(), e);     } } 复制代码

MessageQueueSelector的实现类SelectMessageQueueByHash

public class SelectMessageQueueByHash implements MessageQueueSelector {     @Override     public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {         int value = arg.hashCode();         if (value < 0) {             value = Math.abs(value);         }         value = value % mqs.size();         return mqs.get(value);     } } 复制代码

  1. 根据hashKey计算hash值

  2. 然后用hash值和队列大小取模,得到一个索引值,结果小于队列值

  3. 根据索引值从队列列表中取出一个队列,hash值相同则队列相同

普通消息的发送

普通消息有两种机制:轮询和故障规避机制

轮询原理就是路由信息TopicPublishInfo维护一个计数器sendWhichQueue,每发送一次消息需要查询一次路由,计数器就进行+1,通过计数器的值inde与队列的数量取模计算出实现轮询算法。

package org.apache.rocketmq.client.impl.producer; import java.util.ArrayList; import java.util.List; import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; public class TopicPublishInfo {     private boolean orderTopic = false;     private boolean haveTopicRouterInfo = false;     private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();     private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();     private TopicRouteData topicRouteData;     public boolean isOrderTopic() {         return orderTopic;     }     public void setOrderTopic(boolean orderTopic) {         this.orderTopic = orderTopic;     }     public boolean ok() {         return null != this.messageQueueList && !this.messageQueueList.isEmpty();     }     public List<MessageQueue> getMessageQueueList() {         return messageQueueList;     }     public void setMessageQueueList(List<MessageQueue> messageQueueList) {         this.messageQueueList = messageQueueList;     }     public ThreadLocalIndex getSendWhichQueue() {         return sendWhichQueue;     }     public void setSendWhichQueue(ThreadLocalIndex sendWhichQueue) {         this.sendWhichQueue = sendWhichQueue;     }     public boolean isHaveTopicRouterInfo() {         return haveTopicRouterInfo;     }     public void setHaveTopicRouterInfo(boolean haveTopicRouterInfo) {         this.haveTopicRouterInfo = haveTopicRouterInfo;     }     public MessageQueue selectOneMessageQueue(final String lastBrokerName) {         if (lastBrokerName == null) {             return selectOneMessageQueue();         } else {             int index = this.sendWhichQueue.getAndIncrement();             for (int i = 0; i < this.messageQueueList.size(); i++) {                 int pos = Math.abs(index++) % this.messageQueueList.size();                 if (pos < 0)                     pos = 0;                 MessageQueue mq = this.messageQueueList.get(pos);                 if (!mq.getBrokerName().equals(lastBrokerName)) {                     return mq;                 }             }             return selectOneMessageQueue();         }     }     public MessageQueue selectOneMessageQueue() {         int index = this.sendWhichQueue.getAndIncrement();         int pos = Math.abs(index) % this.messageQueueList.size();         if (pos < 0)             pos = 0;         return this.messageQueueList.get(pos);     }     public int getQueueIdByBroker(final String brokerName) {         for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) {             final QueueData queueData = this.topicRouteData.getQueueDatas().get(i);             if (queueData.getBrokerName().equals(brokerName)) {                 return queueData.getWriteQueueNums();             }         }         return -1;     }     @Override     public String toString() {         return "TopicPublishInfo [orderTopic=" + orderTopic + ", messageQueueList=" + messageQueueList             + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]";     }     public TopicRouteData getTopicRouteData() {         return topicRouteData;     }     public void setTopicRouteData(final TopicRouteData topicRouteData) {         this.topicRouteData = topicRouteData;     } } 复制代码

轮询算法可能轮询选择的队列在宕机的Broker上,导致消息发送失败,于是就有了鼓掌规避机制


作者:周杰倫本人
链接:https://juejin.cn/post/7018479164122480648


文章分类
后端
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐