TX-LCN分布式事务之TCC模式
什么是TCC模式
TCC
模式是TX-LCN
分布式事务模式的一种,T-try
-尝试执行业务、C-confirm
-确认执行业务、 C-cancel
-取消执行业务
原理
TCC
事务机制相对于传统事务机制(X/Open XA Two-Phase-Commit
),其特征在于它不依赖资源管理器(RM
)对XA
的支持, 而是通过对(由业务系统提供的)业务逻辑的调度来实现分布式事务。主要由三步操作,Try
: 尝试执行业务、 Confirm
:确认执行业务、 Cancel
: 取消执行业务。
模式特点
该模式对代码的嵌入性高,要求每个业务需要写三种步骤的操作。
该模式对有无本地事务控制都可以支持使用面广。
数据一致性控制几乎完全由开发者控制,对业务开发难度要求高。
源码解读
首先我们来看几个关键的类TransactionAspect
事务切面类、DTXLogicWeaver
分布式事务调度器、DTXServiceExecutor
分布式事务执行器
TransactionAspect 作用
源码
@Aspect @Component public class TransactionAspect implements Ordered { private static final Logger log = LoggerFactory.getLogger(TransactionAspect.class); private final TxClientConfig txClientConfig; private final DTXLogicWeaver dtxLogicWeaver; public TransactionAspect(TxClientConfig txClientConfig, DTXLogicWeaver dtxLogicWeaver) { this.txClientConfig = txClientConfig; this.dtxLogicWeaver = dtxLogicWeaver; } @Pointcut("@annotation(com.codingapi.txlcn.tc.annotation.TccTransaction)") public void tccTransactionPointcut() { } @Around("tccTransactionPointcut() && !lcnTransactionPointcut()&& !txcTransactionPointcut() && !txTransactionPointcut()") public Object runWithTccTransaction(ProceedingJoinPoint point) throws Throwable { DTXInfo dtxInfo = DTXInfo.getFromCache(point); TccTransaction tccTransaction = (TccTransaction)dtxInfo.getBusinessMethod().getAnnotation(TccTransaction.class); dtxInfo.setTransactionType("tcc"); dtxInfo.setTransactionPropagation(tccTransaction.propagation()); DTXLogicWeaver var10000 = this.dtxLogicWeaver; point.getClass(); return var10000.runTransaction(dtxInfo, point::proceed); } public int getOrder() { return this.txClientConfig.getDtxAspectOrder(); }复制代码
由该类的源码,我们能够明白,通过解析
@TccTransaction
注解进行相应的操作。代码会调用到DTXLogicWeaver
类
DTXLogicWeaver 作用
public Object runTransaction(DTXInfo dtxInfo, BusinessCallback business) throws Throwable { if (Objects.isNull(DTXLocalContext.cur())) { DTXLocalContext.getOrNew(); log.debug("<---- TxLcn start ---->"); DTXLocalContext dtxLocalContext = DTXLocalContext.getOrNew(); TxContext txContext; if (this.globalContext.hasTxContext()) { txContext = this.globalContext.txContext(); dtxLocalContext.setInGroup(true); log.debug("Unit[{}] used parent's TxContext[{}].", dtxInfo.getUnitId(), txContext.getGroupId()); } else { txContext = this.globalContext.startTx(); } if (Objects.nonNull(dtxLocalContext.getGroupId())) { dtxLocalContext.setDestroy(false); } dtxLocalContext.setUnitId(dtxInfo.getUnitId()); dtxLocalContext.setGroupId(txContext.getGroupId()); dtxLocalContext.setTransactionType(dtxInfo.getTransactionType()); TxTransactionInfo info = new TxTransactionInfo(); info.setBusinessCallback(business); info.setGroupId(txContext.getGroupId()); info.setUnitId(dtxInfo.getUnitId()); info.setPointMethod(dtxInfo.getBusinessMethod()); info.setPropagation(dtxInfo.getTransactionPropagation()); info.setTransactionInfo(dtxInfo.getTransactionInfo()); info.setTransactionType(dtxInfo.getTransactionType()); info.setTransactionStart(txContext.isDtxStart()); boolean var15 = false; Object var6; try { var15 = true; var6 = this.transactionServiceExecutor.transactionRunning(info); var15 = false; } finally { if (var15) { if (dtxLocalContext.isDestroy()) { synchronized(txContext.getLock()) { txContext.getLock().notifyAll(); } if (!dtxLocalContext.isInGroup()) { this.globalContext.destroyTx(); } DTXLocalContext.makeNeverAppeared(); TracingContext.tracing().destroy(); } log.debug("<---- TxLcn end ---->"); } } if (dtxLocalContext.isDestroy()) { synchronized(txContext.getLock()) { txContext.getLock().notifyAll(); } if (!dtxLocalContext.isInGroup()) { this.globalContext.destroyTx(); } DTXLocalContext.makeNeverAppeared(); TracingContext.tracing().destroy(); } log.debug("<---- TxLcn end ---->"); return var6; } else { return business.call(); } }复制代码
以上代码是该类的核心逻辑,可以看出来
TX-LCN
事务的处理全部都是走的这个类的该方法,最终会调用到DTXServiceExecutor
分布式事务执行器
DTXServiceExecutor 作用
/** * 事务业务执行 * * @param info info * @return Object * @throws Throwable Throwable */ public Object transactionRunning(TxTransactionInfo info) throws Throwable { // 1. 获取事务类型 String transactionType = info.getTransactionType(); // 2. 获取事务传播状态 DTXPropagationState propagationState = propagationResolver.resolvePropagationState(info); // 2.1 如果不参与分布式事务立即终止 if (propagationState.isIgnored()) { return info.getBusinessCallback().call(); } // 3. 获取本地分布式事务控制器 DTXLocalControl dtxLocalControl = txLcnBeanHelper.loadDTXLocalControl(transactionType, propagationState); // 4. 织入事务操作 try { // 4.1 记录事务类型到事务上下文 Set<String> transactionTypeSet = globalContext.txContext(info.getGroupId()).getTransactionTypes(); transactionTypeSet.add(transactionType); dtxLocalControl.preBusinessCode(info); // 4.2 业务执行前 txLogger.txTrace( info.getGroupId(), info.getUnitId(), "pre business code, unit type: {}", transactionType); // 4.3 执行业务 Object result = dtxLocalControl.doBusinessCode(info); // 4.4 业务执行成功 txLogger.txTrace(info.getGroupId(), info.getUnitId(), "business success"); dtxLocalControl.onBusinessCodeSuccess(info, result); return result; } catch (TransactionException e) { txLogger.error(info.getGroupId(), info.getUnitId(), "before business code error"); throw e; } catch (Throwable e) { // 4.5 业务执行失败 txLogger.error(info.getGroupId(), info.getUnitId(), Transactions.TAG_TRANSACTION, "business code error"); dtxLocalControl.onBusinessCodeError(info, e); throw e; } finally { // 4.6 业务执行完毕 dtxLocalControl.postBusinessCode(info); } }复制代码
通过以上代码可以看出,该类是整个事务执行关键类。
以上就是TCC模式比较核心的代码,其他的分支代码就不一一赘述了。可以看到和LCN模式有一些代码是重复的,整体就是通过解析不同的注解走不同的事务模式
实战
由第一篇分布式事务之TX-LCN 我们规划了俩个TC
分别是lcn-order
服务和lcn-pay
服务,我们的思路是订单服务调用支付服务,分别在订单服务表t_order
和支付服务表t_pay
中插入插入数据。
订单服务核心代码和数据表脚本
代码
/** * @author:triumphxx * @Date:2021/10/24 * @Time:2:13 下午 * @微信公众号:北漂码农有话说 * @网站:http://blog.triumphxx.com.cn * @GitHub https://github.com/triumphxx * @Desc: **/ @RestController public class TccOrderController { @Autowired TOrderDao tOrderDao; @Autowired private RestTemplate restTemplate; //保存主键信息 private static Map<String,Integer> maps = new HashMap<>(); @PostMapping("/add-order-tcc") @Transactional(rollbackFor = Exception.class) @TccTransaction public String add(){ TOrder bean = new TOrder(); Long no = Math.round((Math.random()+1) * 1000); bean.setTId(no.intValue()); bean.setTName("order"+no.intValue()); JSONObject date = new JSONObject(); date.put("tId",bean.getTId()); date.put("tName",bean.getTName()+"pay"); restTemplate.postForEntity("http://lcn-pay/add-pay-tcc",date,String.class); // int i = 1/0; tOrderDao.insert(bean); maps.put("a",no.intValue()); System.out.println("本次新增订单号为" + no.intValue()); return "新增订单成功"; } public String confirmAdd(){ System.out.println("order confirm "); System.out.println("订单新增成功 id为:"+maps.get("a")); maps.clear(); return "新增订单成功"; } /** * 逆sql * @param * @return */ public String cancelAdd(){ Integer a = maps.get("a"); System.out.println("order cancel "); System.out.println("删除的订单号为 :"+a); tOrderDao.deleteByPrimaryKey(a); maps.clear(); return "新增订单成功"; } }复制代码
代码解读:若使用TCC模式只需要在你的方法上添加注解@TccTransaction,相对于lcn模式,我们可以看到多出来很多代码,所以tcc模式的数据一致性依赖于开发者来进行控制,对于开发要求高, 需要注意的是confirm的操作,方法名必须是confirm开头,cancel操作同样的道理,在confirm中我们可以做一些业务执行 成功后的操作,cancel操作的是try的逆操作,若try是新增数据,那么cancel就是删除数据。注意代码中有一个map的数据结果, 就是进行try、confirm、cancel操作业务的唯一标识进行相应的操作。
脚本
CREATE TABLE `t_order` ( `t_id` int(11) NOT NULL, `t_name` varchar(45) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=latin1复制代码
支付服务核心代码和数据表脚本
代码
/** * @author:triumphxx * @Date:2021/10/24 * @Time:2:26 下午 * @微信公众号:北漂码农有话说 * @网站:http://blog.triumphxx.com.cn * @GitHub https://github.com/triumphxx * @Desc: **/ @RestController public class TccPayController { @Autowired TPayDao tPayDao; //保存主键信息 private static Map<String,Integer> maps = new HashMap<>(); @PostMapping("/add-pay-tcc") @Transactional(rollbackFor = Exception.class) @TccTransaction public String addPay(@RequestBody Map map){ TPay tPay = new TPay(); tPay.setTId((Integer) map.get("tId")); tPay.setTName((String) map.get("tName")); tPayDao.insert(tPay); maps.put("a",(Integer) map.get("tId")); // int i = 1/0; System.out.println("本次新增支付号为" + (Integer) map.get("tId")); return "新增支付成功"; } public String confirmAddPay(Map map){ System.out.println("pay confirm"); System.out.println("本次新增支付号为" + maps.get("a")); maps.clear(); return "新增支付成功"; } /** * 逆sql * @param map * @return */ public String cancelAddPay(Map map){ Integer a = maps.get("a"); System.out.println("pay cancel"); System.out.println("本次删除支付号为" + maps.get("a")); tPayDao.deleteByPrimaryKey(a); maps.clear(); return "取消支付成功"; } }复制代码
脚本
CREATE TABLE `t_pay` ( `t_id` int(11) NOT NULL, `t_name` varchar(45) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=latin1复制代码
测试流程
启动
Redis
启动
TM
启动注册中心
eureka-server
启动服务
lcn-order
启动服务
lcn-pay
请求接口
http://localhost:8001/add-order
代码搞一个异常看数据是否进行回滚
作者:北漂码农有话说
链接:https://juejin.cn/post/7023362727343030302