RocketMQ之事务消息
一 理论
RocketMQ 的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。 一个常见的场景:订单付款后给用户发优惠券,订单系统需要作为生产者发消息,优惠券系统作为消费者收消息。 在这个场景下,发送“用户已付款” 消息,要和“用户付款”这个操作同时成功或者同时失败。
RocketMQ 采用两阶段提交的方式实现事务消息。 TransactionMQProducer处理上面情况的流程是,先发一个“用户已付款”的半消息,发送成功后做用户付款的操作,根据操作结果是否成功,确定之前的“用户已付款”的消息是做commit 还是rollback。具体流程如下图所示:
严格的事务实现,需要实现ACID,那么rocketmq都实现了吗?还以【订单付款后给用户发优惠券】场景说明:
A(原子性):付款和发券同时发生或不发生,实现了原子性。
C(一致性):实现了最终一致性,在上图步骤4或者7未完成前是不一致的。
I(隔离性):存在读未提交的可能,因此未实现隔离性。
D(持久性):消息最后都会落盘,实现了持久性。
二 实战
(1)Create the transactional producer
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); } } 复制代码
(2)Implement the TransactionListener interface
import ... public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } }
作者:junex
链接:https://juejin.cn/post/7020401999396995103