转载

原 荐 Spring事务实现原理详解

上文( Spring事务之切点解析详解 )中我们讲解了Spring是如何判断目标方法是否需要织入切面逻辑,其中讲解到事务逻辑的织入是通过 TransactionInterceptor 进行的,本文则主要讲解 TransactionInterceptor 是如何织入切面逻辑的。

1. 整体事务逻辑

Spring是通过Aop实现切面逻辑织入的,这里 TransactionInterceptor 实现了 MethodInterceptor 接口,这个接口则继承了 Advice 接口,也就是说,本质上 TransactionInterceptor 是只是Spring Aop中需要织入的切面逻辑的一部分。如下是 TransactionInterceptor 实现的 invoke() 方法的源码:

@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
    // 获取需要织入事务逻辑的目标类
    Class<?> targetClass = (invocation.getThis() != null ? 
        AopUtils.getTargetClass(invocation.getThis()) : null);

    // 进行事务逻辑的织入
    return invokeWithinTransaction(invocation.getMethod(), targetClass, 
        invocation::proceed);
}

这里事务逻辑的织入是封装到了 invokeWithinTransaction() 中,我们继续阅读其源码:

@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
       final InvocationCallback invocation) throws Throwable {
    // 获取@Transactional中的相关属性
    TransactionAttributeSource tas = getTransactionAttributeSource();
    final TransactionAttribute txAttr = (tas != null ? 
        tas.getTransactionAttribute(method, targetClass) : null);
    // 获取当前TransactionManager的配置,这个bean一般在配置文件中会进行配置
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    // 获取当前方法的一个签名
    final String joinpointIdentification = 
        methodIdentification(method, targetClass, txAttr);

    // 如果配置的TransactionManager不是CallbackPreferringPlatformTransactionManager类型的,
    // 则为当前方法的执行新建一个事务
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // 为当前方法的执行新建一个事务
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, 
             joinpointIdentification);
        Object retVal = null;
        try {
            // 执行目标方法
            retVal = invocation.proceedWithInvocation();
        } catch (Throwable ex) {
            // 在执行抛出异常时对异常进行处理,并织入异常处理逻辑
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        } finally {
            // 执行事务处理完成的逻辑,无论事务是需要提交还是回滚
            cleanupTransactionInfo(txInfo);
        }
        // 提交当前事务
        commitTransactionAfterReturning(txInfo);
        return retVal;
    } else {
        final ThrowableHolder throwableHolder = new ThrowableHolder();
        try {
            // 如果当前TransactionManager实现了CallbackPreferringPlatformTransactionManager,
            // 则通过其execute()方法进行事务处理。这里CallbackPreferringPlatform-
            // TransactionManager的作用在于其提供了一个execute()方法,用于供给实现了自定义
            // 的TransactionManager的类实现事务的相关处理逻辑
            Object result = ((CallbackPreferringPlatformTransactionManager) tm)
                .execute(txAttr, status -> {
                // 获取Transaction配置
                TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, 
                     joinpointIdentification, status);
                try {
                    // 调用目标方法
                    return invocation.proceedWithInvocation();
                } catch (Throwable ex) {
                    // 如果当前异常需要回滚,则进行回滚并抛出异常
                    if (txAttr.rollbackOn(ex)) {
                        if (ex instanceof RuntimeException) {
                            throw (RuntimeException) ex;
                        } else {
                            throw new ThrowableHolderException(ex);
                        }
                    } else {
                        throwableHolder.throwable = ex;
                        return null;
                    }
                } finally {
                    // 清除保存的Transaction信息
                    cleanupTransactionInfo(txInfo);
                }
            });

            // 如果执行异常,则将该异常抛出
            if (throwableHolder.throwable != null) {
                throw throwableHolder.throwable;
            }
            return result;
        } catch (ThrowableHolderException ex) {
            throw ex.getCause();
        } catch (TransactionSystemException ex2) {
            if (throwableHolder.throwable != null) {
                logger.error("Application exception overridden by commit exception", 
                             throwableHolder.throwable);
                ex2.initApplicationException(throwableHolder.throwable);
            }
            throw ex2;
        } catch (Throwable ex2) {
            if (throwableHolder.throwable != null) {
                logger.error("Application exception overridden by commit exception", 
                             throwableHolder.throwable);
            }
            throw ex2;
        }
    }
}

可以看到,这里Spring事务逻辑织入的主干逻辑就在 invokeWithinTransaction() 方法中,其首先会根据需要判断是否需要创建一个事务,无论是否存在事务,都将其封装为一个TransactionInfo对象。然后会调用目标方法,如果目标方法执行报错,则执行异常抛出时的操作,主要包括事务回滚和异常回调逻辑。接着会清理当前事务属性,如果当前事务是一个新的事务,则会直接提交,如果是使用保存点的事务,则会释放所持有的保存点,并且提交事务。

2. 创建事务

在进行事务逻辑织入的时候,首先进行的就是事务的创建,而Spring创建事务也不是随意创建的,其是通过事务的配置属性来判断需要创建什么类型的事务的。事务的创建主要在 createTransactionIfNecessary() 方法中,如下是该方法的源码:

protected TransactionInfo createTransactionIfNecessary(
    @Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, 
    final String joinpointIdentification) {
    // 如果TransactionAttribute的名称为空,则创建一个代理的TransactionAttribute,
    // 并且将其名称设置为需要织入事务的方法的名称
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            // 如果事务属性不为空,并且TransactionManager都存在,
            // 则通过TransactionManager获取当前事务状态的对象
            status = tm.getTransaction(txAttr);
        } else {
            if (logger.isDebugEnabled()) {
                logger.debug("Skipping transactional joinpoint [" 
                     + joinpointIdentification 
                     + "] because no transaction manager has been configured");
            }
        }
    }
    
    // 将当前事务属性和事务状态封装为一个TransactionInfo,这里主要做的工作是将事务属性绑定到当前线程
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

这里对事务的创建,首先会判断封装事务属性的对象名称是否为空,如果不为空,则以目标方法的标识符作为其名称,然后通过TransactionManager创建事务。如下是TransactionManager.getTransaction()方法的源码:

@Override
public final TransactionStatus getTransaction(
    @Nullable TransactionDefinition definition) throws TransactionException {
    // 对于Spring的BasicDataSource,这里就是创建一个DataSourceTransactionObject对象,
    // 其返回值是根据具体的实现类不一样的,比如hibernate返回的是一个HibernateTransactionObject对象,
    // 而Jpa则返回的是一个JpaTransactionObject对象
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    // 如果TransactionDefinition为空,则创建一个默认的TransactionDefinition
    if (definition == null) {
        definition = new DefaultTransactionDefinition();
    }

    // 判断当前方法调用是否已经在某个事务中,这里的判断方式就是判断当前的ConnectionHolder是否为空,
    // 并且当前存在的事务是否处于active状态,如果是,则说明当前存在事务。如果这里不存在事务,一般的,
    // 其ConnectionHolder是没有值的
    if (isExistingTransaction(transaction)) {
        // 判断当前方法的事务的传播性是否支持已经存在的事务属性,将封装后的事务属性返回
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }

    // 这里timeout的默认值是-1,用户设置的过期时间不能比-1要小
    if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", 
            definition.getTimeout());
    }
    
    // 如果当前方法的执行不在某个事务中,并且当前方法标注了事务传播性为PROPAGATION_MANDATORY,
    // 由于这种传播性要求当前方法执行必须处于某一事务中,并且该事务必须是已存在的,这里不存在,因而
    // 抛出异常
    if (definition.getPropagationBehavior() == 
        TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException("No existing transaction" 
            + " found for transaction marked with propagation 'mandatory'");
    } else if (definition.getPropagationBehavior() == 
               TransactionDefinition.PROPAGATION_REQUIRED 
        || definition.getPropagationBehavior() == 
               TransactionDefinition.PROPAGATION_REQUIRES_NEW 
        || definition.getPropagationBehavior() == 
               TransactionDefinition.PROPAGATION_NESTED) {
        // 判断当前方法的事务传播性是否为REQUIRED,REQUIRES_NEW或NESTED中的一种,
        // 由于当前方法走到这里说明是不存在事务的,因而需要为其创建一个新事务。这里suspend()方法
        // 调用时传了一个null进去,如果用户设置了事务事件回调的属性,则会将这些回调事件暂时挂起,
        // 并且封装到SuspendedResourcesHolder中,如果没有注册回调事件,该方法将会返回null
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" 
                + definition.getName() + "]: " + definition);
        }
        try {
            // 判断用户是否设置了永远不执行事务回调事件的属性
            boolean newSynchronization = 
                (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // 将当前事务属性的定义,已经存在的事务和挂起的事务回调事件都封装为一个
            // DefaultTransactionStatus对象
            DefaultTransactionStatus status = newTransactionStatus(definition, 
                 transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // 通过调用Jdbc的api开始一个事务
            doBegin(transaction, definition);
            // 将当前事务的一些属性,比如隔离级别,是否只读等设置到ThreadLocal变量中进行缓存
            prepareSynchronization(status, definition);
            return status;
        } catch (RuntimeException | Error ex) {
            // 如果当前事务抛出异常,则重新加载挂起的事务和事务事件,这里因为当前是不存在事务的,
            // 因而传入的需要加载的事务为null
            resume(null, suspendedResources);
            throw ex;
        }
    } else {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT 
            && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction " 
               + "initiated; isolation level will effectively be ignored: " + definition);
        }
        // 走到这一步说明事务的传播性为SUPPORTS,NOT_SUPPORTED或者NEVER,由于当前是不存在事务的,
        // 对于这几种传播性而言,其也不需要事务,因而这里不用做其他处理,直接封装一个空事务的
        // TransactionStatus即可
        boolean newSynchronization = 
            (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(definition, null, true, 
             newSynchronization, debugEnabled, null);
    }
}

这里getTransaction()方法主要可以分为两个方面:①当前方法调用已经存在于某个事务中;②当前方法调用前不存在某个事务。对于第一点,我们下面会讲到,而对于第二点,这里主要进行了三个分支的判断:①如果事务传播性为MANDATORY,则抛出异常,因为其要求必须存在一个事务;②如果事务传播性为REQUIRED,REQUIRES_NEW或NESTED,则新建一个事务供其使用;③对于其他情况,即SUPPORTS,NOT_SUPPORTED和NEVER,由于当前不存在事务,因而可以直接创建一个包含空事务的TransactionStatus返回即可。对于已有事务的处理,其主要在handleExistingTransaction()方法中进行,如下是该方法的源码:

private TransactionStatus handleExistingTransaction(TransactionDefinition definition, 
    Object transaction, boolean debugEnabled) throws TransactionException {
    // 走到这里说明已经存在了一个事务,而如果当前方法的执行不支持已有的事务,则抛出异常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
            "Existing transaction found for transaction marked with propagation 'never'");
    }

    // 对于NOT_SUPPORTED类型的传播性,如果当前已经存在了事务,则会将该事务挂起,然后保证当前方法
    // 的执行是不存在事务的;如果当前不存在事务,则不进行处理
    if (definition.getPropagationBehavior() == 
        TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }
        // 挂起已经存在的事务
        Object suspendedResources = suspend(transaction);
        // 判断用户是否设置了始终要执行事务事件回调函数
        boolean newSynchronization = 
            (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        // 封装事务结果,这里由于当前方法要求不在事务中执行,因而传入的transaction为null,而对于
        // 挂起的事务,其后面是需要进行重新加载的,因而将挂起的事务属性suspendedResources传入了
        return prepareTransactionStatus( definition, null, false, newSynchronization, 
            debugEnabled, suspendedResources);
    }

    // 如果当前事务的传播性为REQUIRES_NEW,则将当前事务挂起,这种传播性要求当前方法必须在一个新事务
    // 中进行,因而这里会创建一个新的事务,然后开启该事务
    if (definition.getPropagationBehavior() == 
        TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction " 
                + " with name [" + definition.getName() + "]");
        }
        // 挂起当前事务和事务事件回调函数
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            // 判断用户是否设置了永不执行事务事件回调函数
            boolean newSynchronization = 
                (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // 新建一个一个事务执行
            DefaultTransactionStatus status = newTransactionStatus(definition, 
                 transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // 开始新的事务
            doBegin(transaction, definition);
            // 将新事务的隔离性,是否只读等属性设置到ThreadLocal中
            prepareSynchronization(status, definition);
            return status;
        } catch (RuntimeException | Error beginEx) {
            // 如果新建的事务中抛出异常,则重新加载已有的事务,并且执行该事务
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }

    // 判断当前事务传播性是否为NESTED,如果是,然后判断当前数据源是否支持嵌套事务,如果不支持,
    // 则抛出异常;这里如果数据源指定了使用保存点的方式执行嵌套事务,则会使用保存点模拟嵌套事务,
    // 如果没有指定,则会使用用户自定义的方式执行嵌套事务
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // 如果不支持嵌套事务,则抛出异常
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                "Transaction manager does not allow nested transactions by default - " +
                "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" 
                         + definition.getName() + "]");
        }
        
        // 如果指定了使用保存点的方式执行嵌套事务,则创建一个保存点执行当前方法
        if (useSavepointForNestedTransaction()) {
            DefaultTransactionStatus status =
                prepareTransactionStatus(definition, transaction, false, false, 
                    debugEnabled, null);
            // 创建保存点
            status.createAndHoldSavepoint();
            return status;
        } else {
            // 如果没有指定使用保存点的方式,则由用户自行使用其自定义的方式,这里只是会开启一个新的事务
            boolean newSynchronization = 
                (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, true, newSynchronization, debugEnabled, null);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
    }

    if (debugEnabled) {
        logger.debug("Participating in existing transaction");
    }
    
    // 走到这一步,说明事务的传播性为SUPPORTS或REQUIRED,这两种事务传播性都只会继承当前已经存在的事务
    // 来执行当前方法。因而这里在进行继承的时候会判断已经存在的事务与当前方法的事务属性是否有冲突。这里的
    // 判断主要包含两个方面:事务隔离级别和是否只读。如果当前方法的事务隔离级别和只读属性与集成来的事务
    // 不一致,则抛出异常
    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = 
                TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            // 判断事务隔离级别是否一致,不一致则抛出异常
            if (currentIsolationLevel == null 
                || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction " 
                    + " with definition [" + definition + "] specifies isolation " 
                    + " level which is incompatible with existing transaction: " 
                    + (currentIsolationLevel != null ?
                       isoConstants.toCode(currentIsolationLevel, 
                       DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            // 判断只读属性是否一致,如果不一致,则抛出异常
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction " 
                    + " with definition [" + definition + "] is not marked as " 
                    + " read-only but existing transaction is");
            }
        }
    }
    
    // 由于SUPPORTS和REQUIRED都是继承父事务来执行当前方法,因而这里只是对父事务进行封装,然后返回
    boolean newSynchronization = 
        (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, 
         debugEnabled, null);
}

这里handleExistingTransaction()方法主要是在当前已经存在事务的情况下对是否需要新建事务的判断。这里判断的依据主要是根据当前方法的事务传播性来进行的:①如果传播性为NEVER,则直接抛出异常;②如果传播性为NOT_SUPPORTED,则会挂起当前事务,并且在无事务的状态下执行当前方法;③如果为REQUIRES_NEW,则会挂起当前事务,并且会新建一个事务执行当前方法;④如果是嵌套事务,则会判断当前数据源是否支持嵌套事务,并且会判断是否需要使用保存点的方式执行嵌套事务;⑤如果为SUPORTS或REQUIRED,则会直接复用当前事务执行目标方法。

3. 开启事务

关于事务的开启,在前面已经涉及到了,也就是这里的doBegin()方法,这里doBegin()的主要工作是开启一个数据库的Connection,并且设置autoCommit,isolation和readOnly等属性。这里需要说明的是,doBegin()方法是一个模板方法,不同的TransactionManager是有不同的实现方式的,这里主要以DataSourceTransactionManager的实现原理进行讲解。如下是doBegin()方法的源码:

@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    // 对于DataSourceTransactionManager,其transaction对象就是DataSourceTransactionObject类型
    // 的,因而这里可以直接对其进行强转
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        // 对于初次开启的事务,这里是没有ConnectionHolder的,因而会走这里的if逻辑
        if (!txObject.hasConnectionHolder() ||
            txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            // 通过DataSource对象获取一个数据库Connection
            Connection newCon = obtainDataSource().getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            // 新建一个ConnectionHolder,这里第二个参数表示是否为新建的ConnectionHolder
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        // 设置同步状态
        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        // 这里prepareConnectionForTransaction()方法的主要工作是读取definition中的readOnly和
        // isolation属性,并将其设置到ConnectionHolder中,如果ConnectionHolder中之前已经存在了
        // isolation数值,并且不和definition中的相同,则将其设置到previousIsolationLevel中
        Integer previousIsolationLevel = 
            DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        // 设置autoCommit为false,以此控制事务的提交
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            con.setAutoCommit(false);
        }

        // 这里主要是从Statement层面对readOnly属性进行设置
        prepareTransactionalConnection(con, definition);
        txObject.getConnectionHolder().setTransactionActive(true);

        // 判断definition中的timeout属性是否存在有效值,存在则将其设置到ConnectionHolder中
        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // 如果是新建的ConnectionHolder,则将其绑定到ThreadLocal对象中
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(obtainDataSource(), 
                txObject.getConnectionHolder());
        }
    } catch (Throwable ex) {
        // 如果上述执行过程中抛出了异常,并且是新建的ConnectionHolder,则将Connection释放
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            txObject.setConnectionHolder(null, false);
        } throw new CannotCreateTransactionException("Could not open JDBC Connection " 
            + " for transaction", ex);
    }
}

4. 异常处理

对于异常处理,Spring事务主要会分两种情况:①如果当前异常是可以略过的异常,则直接提交;②如果当前异常不能略过,则进行回滚。具体的实现源码在 completeTransactionAfterThrowing() 方法中,如下是该方法的源码:

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, 
          Throwable ex) {
    // 如果当前存在事务信息,并且事务状态不为空,则进行处理
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" 
               + txInfo.getJoinpointIdentification() + "] after exception: " + ex);
        }
        // 如果当前事务配置了需要在当前异常类型进行回滚,则进行回滚。
        if (txInfo.transactionAttribute != null 
            && txInfo.transactionAttribute.rollbackOn(ex)) {
            try {
                // 这里在进行回滚的时候主要是借助于Connection对象进行回滚的,
                // 另外,在进行回滚的时候也会调用事务事件函数
                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
            } catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by rollback exception",ex);
                ex2.initApplicationException(ex);
                throw ex2;
            } catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by rollback exception",ex);
                throw ex2;
            }
        } else {
            try {
                // 如果当前异常不是需要回滚的类型,则不进行回滚,而直接提交当前事务
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
            } catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            } catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                throw ex2;
            }
        }
    }
}

可以看到,这里进行回滚的操作主要在TransactionManager.rollback()方法中,如下是该方法的源码:

@Override
public final void rollback(TransactionStatus status) throws TransactionException {
    // 如果当前事务已经完成,则抛出异常
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
            "Transaction is already completed - do not call commit or rollback" 
            + " more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    // 进行事务回滚
    processRollback(defStatus, false);
}

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    try {
        boolean unexpectedRollback = unexpected;
        try {
            // 这里会触发before completion时间,需要注意的是,
            // 只有事务状态是新建的事务事件同步器时才会触发
            triggerBeforeCompletion(status);

            // 如果当前事务是一个保存点类型的事务,则回滚到保存点处
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Rolling back transaction to savepoint");
                }
                status.rollbackToHeldSavepoint();
            } else if (status.isNewTransaction()) {
                // 如果当前事务是一个新建的事务,此时才会进行回滚
                if (status.isDebug()) {
                    logger.debug("Initiating transaction rollback");
                }
                doRollback(status);
            } else {
                // 如果不是上面两种情况,则说明当前事务是在一个大的事务中,此时会设置回滚状态,
                // 异常则由外部调用直接抛出
                if (status.hasTransaction()) {
                    // 如果设置了仅仅局部回滚,或者是设置了全局的异常回滚,则设置回滚状态
                    if (status.isLocalRollbackOnly() 
                        || isGlobalRollbackOnParticipationFailure()) {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed -" 
                                + " marking existing transaction as rollback-only");
                        }
                        doSetRollbackOnly(status);
                    } else {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - " 
                                + "letting transaction originator decide on rollback");
                        }
                    }
                } else {
                    logger.debug("Should roll back transaction but cannot - " 
                                 + "no transaction available");
                }
                if (!isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = false;
                }
            }
        } catch (RuntimeException | Error ex) {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw ex;
        }

        // 触发after completion事件,这里也只有新建的事务事件同步器才会触发
        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
        if (unexpectedRollback) {
            throw new UnexpectedRollbackException(
                "Transaction rolled back because it has been marked as rollback-only");
        }
    } finally {
        // 清理当前线程中保存的事务状态,如果当前事务外层有挂起的事务,则重新加载该事务
        cleanupAfterCompletion(status);
    }
}

这里对于事务的回滚,可以看出,只有最外层的事务(其会被标记为新建的事务)才会回滚,而保存点事务,或者是内部事务,都不会回滚,其只是回滚到保存点处或者是标记回滚状态。

5. 提交事务

对于事务的提交,其主要在 commitTransactionAfterReturning() 方法中,在该方法中,Spring首先会判断该事务是否已经设置了全局的回滚,或者是前面由于异常而需要局部回滚,那么就会执行回滚相关的策略;如果不需要回滚,就会判断该事务是否为保存点事务,如果是,则释放当前保存点,如果不是,则判断当前事务是否为最外层的事务,如果是,此时才会进行提交动作。另外,在事务提交之前和之后,会分别触发before completion和after completion事件。如下是 commitTransactionAfterReturning() 方法的源码:

protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" 
                         + txInfo.getJoinpointIdentification() + "]");
        }
        // 如果事务信息和事务状态不为空,则执行事务提交策略
        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
}
@Override
public final void commit(TransactionStatus status) throws TransactionException {
    // 如果当前事务还未完成,则抛出异常
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
            "Transaction is already completed - do not call commit or rollback" 
            + " more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    // 如果当前事务由于异常而设置了需要局部回滚,则进行回滚
    if (defStatus.isLocalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        processRollback(defStatus, false);
        return;
    }

    // 如果当前事务设置了全局的需要回滚事务,那么就进行事务回滚
    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but" 
                         + " transactional code requested commit");
        }
        processRollback(defStatus, true);
        return;
    }

    // 执行事务提交命令
    processCommit(defStatus);
}

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        // 标记before completion事件师傅触发完成
        boolean beforeCompletionInvoked = false;

        try {
            boolean unexpectedRollback = false;
            // 准备事务提交相关的操作,这里只是一个hook方法,用于供给子类进行事务的自定义
            prepareForCommit(status);
            // 触发before commit事件
            triggerBeforeCommit(status);
            // 触发before completion事件
            triggerBeforeCompletion(status);
            // 标识before completion事件是否调用过
            beforeCompletionInvoked = true;

            // 如果当前事务是一个保存点事务,则释放当前的保存点,以使外层事务继续执行
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                status.releaseHeldSavepoint();
            } else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                // 如果当前事务是一个最外层的事务,则对该事务进行提交
                unexpectedRollback = status.isGlobalRollbackOnly();
                doCommit(status);
            } else if (isFailEarlyOnGlobalRollbackOnly()) {
                // 如果标识了全局的需要回滚,则进行回滚
                unexpectedRollback = status.isGlobalRollbackOnly();
            }

            // 通过抛出异常的方式进行回滚
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                    "Transaction silently rolled back because it has been"  
                    + " marked as rollback-only");
            }
        } catch (UnexpectedRollbackException ex) {
            // 在抛出异常时触发after completion事件
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        } catch (TransactionException ex) {
            // 判断如果需要在commit失败时进行回滚,则进行回滚,否则触发after completion事件
            if (isRollbackOnCommitFailure()) {
                doRollbackOnCommitException(status, ex);
            } else {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        } catch (RuntimeException | Error ex) {
            // 如果before completion事件没有触发,则对其进行再次触发
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);
            }
            doRollbackOnCommitException(status, ex);
            throw ex;
        }
        
        try {
            // 触发after commit事件
            triggerAfterCommit(status);
        } finally {
            // 触发after completion事件
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }

    } finally {
        // 清理当前线程中保存的事务状态信息
        cleanupAfterCompletion(status);
    }
}

6. 小结

本文首先讲解了Spring事务整体的结构,然后讲解了Spring是如何处理事务传播性的,接着对事务的创建,回滚和提交原理进行了详细介绍。

原文  https://my.oschina.net/zhangxufeng/blog/1973493
正文到此结束
Loading...