阅读 123

RocketMQ之事务消息

一 理论

RocketMQ 的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。 一个常见的场景:订单付款后给用户发优惠券,订单系统需要作为生产者发消息,优惠券系统作为消费者收消息。 在这个场景下,发送“用户已付款” 消息,要和“用户付款”这个操作同时成功或者同时失败。

RocketMQ 采用两阶段提交的方式实现事务消息。 TransactionMQProducer处理上面情况的流程是,先发一个“用户已付款”的半消息,发送成功后做用户付款的操作,根据操作结果是否成功,确定之前的“用户已付款”的消息是做commit 还是rollback。具体流程如下图所示:

image2021-10-6_22-42-40.png

严格的事务实现,需要实现ACID,那么rocketmq都实现了吗?还以【订单付款后给用户发优惠券】场景说明:

A(原子性):付款和发券同时发生或不发生,实现了原子性。

C(一致性):实现了最终一致性,在上图步骤4或者7未完成前是不一致的。

I(隔离性):存在读未提交的可能,因此未实现隔离性。

D(持久性):消息最后都会落盘,实现了持久性。

二 实战

(1)Create the transactional producer

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class TransactionProducer {     public static void main(String[] args) throws MQClientException, InterruptedException {         TransactionListener transactionListener = new TransactionListenerImpl();         TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");         ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {             @Override             public Thread newThread(Runnable r) {                 Thread thread = new Thread(r);                 thread.setName("client-transaction-msg-check-thread");                 return thread;             }         });         producer.setExecutorService(executorService);         producer.setTransactionListener(transactionListener);         producer.start();         String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};         for (int i = 0; i < 10; i++) {             try {                 Message msg =                     new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,                         ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));                 SendResult sendResult = producer.sendMessageInTransaction(msg, null);                 System.out.printf("%s%n", sendResult);                 Thread.sleep(10);             } catch (MQClientException | UnsupportedEncodingException e) {                 e.printStackTrace();             }         }         for (int i = 0; i < 100000; i++) {             Thread.sleep(1000);         }         producer.shutdown();     } } 复制代码

(2)Implement the TransactionListener interface

import ... public class TransactionListenerImpl implements TransactionListener {     private AtomicInteger transactionIndex = new AtomicInteger(0);     private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();     @Override     public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {         int value = transactionIndex.getAndIncrement();         int status = value % 3;         localTrans.put(msg.getTransactionId(), status);         return LocalTransactionState.UNKNOW;     }     @Override     public LocalTransactionState checkLocalTransaction(MessageExt msg) {         Integer status = localTrans.get(msg.getTransactionId());         if (null != status) {             switch (status) {                 case 0:                     return LocalTransactionState.UNKNOW;                 case 1:                     return LocalTransactionState.COMMIT_MESSAGE;                 case 2:                     return LocalTransactionState.ROLLBACK_MESSAGE;             }         }         return LocalTransactionState.COMMIT_MESSAGE;     } }


作者:junex
链接:https://juejin.cn/post/7020401999396995103


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐