阅读 140

Transactional注解原理解析(spring transaction注解原理)

什么是@Transactional? @Transactional是Spring这种用于处理事务的注解,基于拦截器进行commit或rollback

使用例子

下面举一个加了@Transactional注解的方法addUser(),并且调用了另一个隔离级别为NESTED的addUser2()方法

 @Service public class UserService {     // .. 省略其他     @Transactional     public void addUser() {         userMapper.addItem("nA", "cA");         // 调用的addUser2()方法是嵌套模式         ((UserService) applicationContext.getBean(UserService.class)).addUser2();         userMapper.addItem("nB", "cB");     }     @Transactional(propagation = Propagation.NESTED)     public void addUser2() {         userMapper.addItem("nC", "cC");     } } 复制代码

执行流程时序图

Transactional事务注解执行时区图.png

实际上上面的例子是两个事务。是怎么实现的,见步骤如下

上述例子调用执行步骤

  1. 拦截器intercept拦截,由于service类没继承接口,进入CglibAopProxy的intercept拦截器中

  2. 调用getInterceptorsAndDynamicInterceptionAdvice()方法,然后工厂生成拦截chain集合(拦截器列表), 然后将拦截器、target等封装成一个对象CglibMethodInvocation,然后调用proceed执行事务逻辑

  3. 走到这步,开始执行事务逻辑。通过TransactionInterceptor类执行invokeWithinTransaction(), 然后走到TransactionAspectSupport类(重点类!!含事务处理逻辑)

  4. 走到TransactionAspectSupport类,其具体逻辑如下(重点)

  • 4.1 getTransaction()获取事务管理器,然后doGetTransaction()获取事务连接上下文。保存线程上下文在ThreadLocal

  • 4.2 判断是否存在事务,存在则:

以下是各个隔离级别的处理方式   - PROPAGATION_NEVER:抛异常 - PROPAGATION_NOT_SUPPORTED:不加事务 - PROPAGATION_REQUIRES_NEW:创建新事务 - PROPAGATION_NESTED:创建保存点 - PROPAGATION_SUPPORTS 或 PROPAGATION_REQUIRED:合并已有的事务 复制代码

  • 4.3 不存在事务走这个逻辑:

以下是各个隔离级别的处理方式   - PROPAGATION_MANDATORY:抛异常 - PROPAGATION_REQUIRED 或 PROPAGATION_REQUIRES_NEW 或 PROPAGATION_NESTED:创建一个新事务。会将mysql自动提交关闭,然后标记当前事务开启。 - 其他传播级别:创建一个空事务 复制代码

  • 4.4 prepareTransactionInfo()把事务信息TransactionInfo绑定到线程上下文

  1. 然后执行invocation.proceedWithInvocation()执行到切面业务代码逻辑,会执行到invokeJoinpoint()真正调用业务代码逻辑 ,就是上面使用例子的逻辑

 @Service public class UserService {     // .. 省略其他     @Transactional     public void addUser() {         userMapper.addItem("nA", "cA");         // 调用的addUser2()方法是嵌套模式         ((UserService) applicationContext.getBean(UserService.class)).addUser2();         userMapper.addItem("nB", "cB");     }     @Transactional(propagation = Propagation.NESTED)     public void addUser2() {         userMapper.addItem("nC", "cC");     } } 复制代码

  1. 执行完之后回到TransactionAspectSupport,执行commit或rollback逻辑

步骤如下: [正常执行,提交事务]- 如果有保存点,释放保存点 - 新事务的话,获取数据库连接并提交事务con.commit() [正常执行,回滚事务] - 如果有保存点,回滚保存点 - 新事务的话,回滚事务con.rollback() 复制代码

源码流程

class CglibAopProxy implements AopProxy, Serializable {     // 省略其他逻辑。。。。。     @Override     public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {         // 调用Gglib动态代理    省略其他逻辑。。。。。         retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();         // 省略其他逻辑。。。。。     } } public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {     // 省略其他逻辑。。。。。     // 创建事务(重要!!!!重要!!!!)     TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);     // 创建事务的核心方法====================================     protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,                                                            @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {         // 省略其他逻辑。。。         TransactionStatus status = null;         if (txAttr != null) {             if (tm != null) {                 status = tm.getTransaction(txAttr);             }         }         // prepareTransactionInfo()把事务信息TransactionInfo绑定到线程上下文   省略其他逻辑。。。         return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);     }     // 获取事务     public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) {         // 在spring中,doXxx都是干正事的,所以这里才是调用事务的方法         Object transaction = doGetTransaction();         // 处理事务已经存在的情况,比如例子中调用addUser2就会调到这里来         if (isExistingTransaction(transaction)) {             return handleExistingTransaction(def, transaction, debugEnabled);         }         // 事务超时,默认没超时限制         if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {             throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());         }         // PROPAGATION_MANDATORY 不允许有事务,有的话直接抛异常         if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {             throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");         } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||                 def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||                 def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {             // 走到这步,说明 可以创建新事务的,共三种:PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED             SuspendedResourcesHolder suspendedResources = suspend(null);             // 省略其他代码。。。。             return startTransaction(def, transaction, debugEnabled, suspendedResources);         } else {             // 创建空事务        Create "empty" transaction: no actual transaction, but potentially synchronization.             boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);             return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);         }     }     // 启动一个新事务(通过上面方法的 startTransaction() 可调用到)     protected void doBegin(Object transaction, TransactionDefinition definition) {         DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;// 获取 事务数据源         Connection con = null;         try {             if (!txObject.hasConnectionHolder() ||                     txObject.getConnectionHolder().isSynchronizedWithTransaction()) {                 Connection newCon = obtainDataSource().getConnection();                 if (logger.isDebugEnabled()) {                     logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");                 }                 // 没有数据库连接connection,就创建一个新的放进去                 txObject.setConnectionHolder(new ConnectionHolder(newCon), true);             }             // 设置事务同步,并且获取 数据库connection,并设置其他参数             txObject.getConnectionHolder().setSynchronizedWithTransaction(true);             con = txObject.getConnectionHolder().getConnection();             Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);             txObject.setPreviousIsolationLevel(previousIsolationLevel);             txObject.setReadOnly(definition.isReadOnly());             // 如果当前mysql开启了自动提交,关闭mysql自动提交,并且给当前事务对象autoCommit提交设置为true(就是改为当前spring自己控制)             if (con.getAutoCommit()) {                 txObject.setMustRestoreAutoCommit(true);                 con.setAutoCommit(false);             }             prepareTransactionalConnection(con, definition);             txObject.getConnectionHolder().setTransactionActive(true);// 设置事务激活             int timeout = determineTimeout(definition);             if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {                 txObject.getConnectionHolder().setTimeoutInSeconds(timeout);             }             // Bind the connection holder to the thread.             if (txObject.isNewConnectionHolder()) {                 TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());             }         }         // 省略其他代码。。。     }     // 上面getTransaction()方法如果判断到已存在事务,会走这个逻辑     private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled) {         // TransactionDefinition.PROPAGATION_NEVER:不能存在事务,抛异常         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {             throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");         }         // 不支持事务,所以会挂起当前事务         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {             if (debugEnabled) {                 logger.debug("Suspending current transaction");             }             // 事务挂起的作用是什么?            // - 方法A调用方法B,方法A打开的 Transaction将挂起,方法B中任何数据库操作,都不在该Transaction的管理之下            // - 是怎么操作挂起的?其实就是把他从threadLocal中移除除去(ThreadLocal<Map<Object, Object>> resources),并保存在另一个数据中,避免被提交了             Object suspendedResources = suspend(transaction);// 挂起             boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);             return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources);         }         // 支持创建新事务,这里会搞一个新事务         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {             if (debugEnabled) {                 logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]");             }             SuspendedResourcesHolder suspendedResources = suspend(transaction);// 档期当前事务,并创建一个新事物             try {                 // 启动新事物                 return startTransaction(definition, transaction, debugEnabled, suspendedResources);             } catch (RuntimeException | Error beginEx) {                 resumeAfterBeginException(transaction, suspendedResources, beginEx);                 throw beginEx;             }         }         // 支持嵌套事务         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {             if (!isNestedTransactionAllowed()) {// 这个一般不会不支持                 throw new NestedTransactionNotSupportedException(                         "Transaction manager does not allow nested transactions by default - " +                                 "specify 'nestedTransactionAllowed' property with value 'true'");             }             if (debugEnabled) {                 logger.debug("Creating nested transaction with name [" + definition.getName() + "]");             }             if (useSavepointForNestedTransaction()) {                 // Create savepoint within existing Spring-managed transaction,                 // through the SavepointManager API implemented by TransactionStatus.                 // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.                 DefaultTransactionStatus status =                         prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);                 status.createAndHoldSavepoint();// 会创建一个保存点。后面提交事务的时候,会释放保存点                 return status;             } else {                 // 只有用JTA才会走到这步骤,没必要管                 return startTransaction(definition, transaction, debugEnabled, null);             }         }         // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.         if (debugEnabled) {             logger.debug("Participating in existing transaction");         }         if (isValidateExistingTransaction()) {             if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {                 Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();                 if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {                     Constants isoConstants = DefaultTransactionDefinition.constants;                     throw new IllegalTransactionStateException("Participating transaction with definition [" +                             definition + "] specifies isolation level which is incompatible with existing transaction: " +                             (currentIsolationLevel != null ?                                     isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :                                     "(unknown)"));                 }             }             if (!definition.isReadOnly()) {                 if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {                     throw new IllegalTransactionStateException("Participating transaction with definition [" +                             definition + "] is not marked as read-only but existing transaction is");                 }             }         }         boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);         return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);     } } // 执行完后,会调用到con.commit()或回滚(回滚就不看了,差不多的)操作 public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {     // 省略其他代码。。。。。。。。     private void processCommit(DefaultTransactionStatus status) throws TransactionException {         boolean unexpectedRollback = false;         // 省略其他代码         prepareForCommit(status);         triggerBeforeCommit(status);         triggerBeforeCompletion(status);         beforeCompletionInvoked = true;         // 如果有保存点,就是说嵌套事务的时候,会创建一个保存点.然后释放         if (status.hasSavepoint()) {             if (status.isDebug()) {                 logger.debug("Releasing transaction savepoint");             }             unexpectedRollback = status.isGlobalRollbackOnly();             status.releaseHeldSavepoint();// 释放保存点         } else if (status.isNewTransaction()) {             if (status.isDebug()) {                 logger.debug("Initiating transaction commit");             }             unexpectedRollback = status.isGlobalRollbackOnly();             doCommit(status);// 提交事务         } else if (isFailEarlyOnGlobalRollbackOnly()) {// 只读事务             unexpectedRollback = status.isGlobalRollbackOnly();         }         if (unexpectedRollback) {             throw new UnexpectedRollbackException(                     "Transaction silently rolled back because it has been marked as rollback-only");         }     } } 复制代码

总结

这个事务注解很简单。其实就是通过拦截器拦截注解,然后加载注解参数,根据隔离级别来判断是否需要创建事务, 如果有多个事务隔离级别,就会涉及到事务保存点, 或者是不同事务之间还会涉及掉事务挂起

  • 事务保存点:依托mysql数据库机制,用于回滚部分逻辑。比如事务有三个时间点(A->B1->B2->C),其中B1->B2是事务保存点,

如果这时候B失败,并没有抛异常到上一个方法,这时候只会回滚B1->B2执行的sql逻辑 -- 嵌套事务NESTED就是依托于事务保存点来处理,这就是为什么出异常子方法回滚了,而调用它的方法事务不会回滚(正常执行的时候子方法执行完会移除事务保存点)

-- 用法如下: BEGIN TRANSACTION A; SELECT 2; INSERT INTO TABLE1 (xx) VALUES ("xxx"); SAVE TRANSACTION B Point;  // B1  启动事务保存点 INSERT INTO TABLE (xx) VALUES ("xxx"); ROLLBACK TRANSACTION B Point; // B2  回滚事务保存点的数据 SELECT 1; INSERT INTO TABLE2 (xx) VALUES ("xxx"); COMMIT TRANSACTION A; 复制代码

  • 事务挂起:事务挂起很简单,就是移除treadLocal中数据暂存到其他地方,避免自动提交


作者:山同学
链接:https://juejin.cn/post/7048593598350098446

文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐