在上篇文章 Spring 事务初始化源码分析 中分析了 Spring 事务初始化的一个过程,当初始化完成后,Spring 是如何去获取事务,当目标方法异常后,又是如何进行回滚的,又或是目标方法执行成功后,又是怎么提交的呢?此外,事务的提交和回滚由底层数据库进行控制,而在 Spring 事务使用详解中知道,Spring 事务行为可以传播,这个传播方式由 Spring 来进行控制,它是怎么控制的呢?这篇文章就来分析下 Spring 事务提交回滚的源码。
还记得在 Spring 事务初始化源码分析 中注册了一个 bean,名字为 TransactionInterceptor 吗?,它就是用来执行事务功能的,它是一个方法拦截器,如下所示:
它实现了 MethodInterceptor 接口,而该接口只有一个 invoke 方法,用来执行目标方法
public Object invoke(MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ?AopUtils.getTargetClass(invocation.getThis()) : null); // 调用父类的方法 return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); }
父类的 invokeWithinTransaction 方法定义了一个事务方法执行的框架,而每一步再细分为方法进行实现,代码如下:
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation){ // 1. 获取事务属性 TransactionAttributeSource tas = getTransactionAttributeSource(); final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); // 2. 获取事务管理器 final PlatformTransactionManager tm = determineTransactionManager(txAttr); // 3. 获取需要事务的方法名称:类目.方法名 final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); // 4. 声明式事务 if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // 5. 获取该方法上事务的信息 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // 6. 目标方法执行,它是一个拦截器链 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // 7. 事务回滚 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { // 8. 清除事务信息 cleanupTransactionInfo(txInfo); } // 9. 事务提交 commitTransactionAfterReturning(txInfo); return retVal; } else { // 10. 编程式事务,流程和声明式事务一致 } }
一个事务方法执行流程大概有以下几个步骤:
1. 获取事务属性
2. 获取事务管理器
3. 获取需要事务的方法名称
5. 获取该方法上事务的信息
6. 目标方法执行
7. 事务回滚
8. 清除事务信息
9. 事务提交
首先去获取方法上面 Translational 注解的属性,在 Spring 事务初始化源码分析 中已经分析过了,即在 AnnotationTransactionAttributeSource.computeTransactionAttribute 中进行获取。
每个事务都由对应的事务管理器,所以在事务开始钱需要获取对应的事务管理器
protected PlatformTransactionManager determineTransactionManager(TransactionAttribute txAttr) { if (txAttr == null || this.beanFactory == null) { return getTransactionManager(); } // 事务管理器名称 String qualifier = txAttr.getQualifier(); if (StringUtils.hasText(qualifier)) { return determineQualifiedTransactionManager(this.beanFactory, qualifier); } else if (StringUtils.hasText(this.transactionManagerBeanName)) { return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName); } else { // 默认事务管理器 PlatformTransactionManager defaultTransactionManager = getTransactionManager(); defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class); // ..... return defaultTransactionManager; } }
这里主要去获取名称的名称,为 全限定类名+方法名的方式:method.getDeclaringClass().getName() + '.' + method.getName();
该部分是 Spring 事务最复杂的部分,比如说去创建一个事务,设置事务的隔离级别,超时时间,对事务传播方式的处理,事务的挂起和恢复等;事务信息 TransactionInfo 包含了目标方法执行前的所有状态信息,如果方法执行失败,则会根据该信息来进行回滚。
对应方法为:
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
代码如下所示:
protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) { // 设置事务的名称,为方法全限定名joinpointIdentification if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { // 获取事务 status = tm.getTransaction(txAttr); } } // 创建事务信息 return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
在方法 getTransaction 中获取事务,是最为复杂的逻辑,在其中处理隔离级别,超时时间和传播方式等。
public final TransactionStatus getTransaction(TransactionDefinition definition){ // 获取事务 Object transaction = doGetTransaction(); // ... // 如果已经存在事务了,则处理事务的传播方式,如挂起存在的事务,新建事务等 if (isExistingTransaction(transaction)) { return handleExistingTransaction(definition, transaction, debugEnabled); } // ..... // 如果不存在事务,且事务的传播方式为 mandatory, 则抛出异常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException("...."); } else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); // 如果事务的传播方式为 requested, requestes_new,nested,则会新建一个事务 try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); // 第三个参数为true表示新建事务 DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 构造 transaction,包括隔离级别,timeout,如果是新连接,则绑定到当前线程 doBegin(transaction, definition); // 同步新事务 prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } else { boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }
获取事务 doGetTransaction(),在该方法中,会根据 DataSource 获取一个连接,如下:
protected Object doGetTransaction() { DataSourceTransactionObject txObject = new DataSourceTransactionObject(); //如果设置了允许嵌套事务,则开启保存点;只有嵌套事务才有保存点 txObject.setSavepointAllowed(isNestedTransactionAllowed()); // 根据 DataSource 获取连接,ConnectionHolder为一个数据库连接 ConnectionHolder conHolder = TransactionSynchronizationManager.getResource(obtainDataSource()); txObject.setConnectionHolder(conHolder, false); return txObject; }
之后,判断当前线程是否存在事务,如果存在事务,则根据事务的传播方式来处理已存在的事务,这里先不看。
如果不存在事务且事务的传播方式为 requested, requestes_new,nested,则会新建一个事务:
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//definition事务属性 //transaction事务 //newTransaction是否事务新事务 //suspendedResources需要挂起的事务 protected DefaultTransactionStatus newTransactionStatus( TransactionDefinition definition, Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, Object suspendedResources) { boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive(); return new DefaultTransactionStatus( transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources); }
当获取到一个新的事务后,需要设置事务的一些信息,比如隔离级别,timeout 等,这些功能不是由 Spring 来控制,而是由底层的数据库来控制的,数据库连接的设置是在 doBegin 方法中进行处理:
protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; // 数据库连接 Connection con = null; //如果当前事务不存在数据库连接,或者,当前连接的事务同步设置为 true,则需要获取新的数据库连接 if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { // 获取新连接 Connection newCon = obtainDataSource().getConnection(); // 事务绑定新连接 txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); // 获取和设置隔离级别 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); // 由 Spring 来控制提交方式 if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); con.setAutoCommit(false); } prepareTransactionalConnection(con, definition); // 设置当前线程存在事务的标志 txObject.getConnectionHolder().setTransactionActive(true); // 获取和设置超时时间 int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } //如果是新连接,则绑定到当前线程 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } //其他代码...... } // ====获取隔离级别 public static Integer prepareConnectionForTransaction(Connection con, TransactionDefinition definition){ // 设置只读标识 if (definition != null && definition.isReadOnly()) { con.setReadOnly(true); //.... } // 获取隔离级别 Integer previousIsolationLevel = null; if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { // 从数据库连接获取隔离级别 int currentIsolation = con.getTransactionIsolation(); if (currentIsolation != definition.getIsolationLevel()) { previousIsolationLevel = currentIsolation; con.setTransactionIsolation(definition.getIsolationLevel()); } } return previousIsolationLevel; }
当设置完事务的信息后,需要把事务信息记录在当前线程中:
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) { if (status.isNewSynchronization()) { TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel( definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null); TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly()); TransactionSynchronizationManager.setCurrentTransactionName(definition.getName()); TransactionSynchronizationManager.initSynchronization(); } }
现在来处理已经存在事务的情况,
if (isExistingTransaction(transaction)) { return handleExistingTransaction(definition, transaction, debugEnabled); }
判断是否存在事务,依据是事务中有连接,且 TransactionActive 为 true
protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive()); }
如果已经存在事务,则会根据事务的传播方式来进行处理,比如 requires_new, nested 等是如何处理:
private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction){ // 如果传播方式为 never, 则抛异常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException("..."); } // 如果传播方式为 not_supported, 则把当前存在的事务挂起 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { // 挂起当前事务 Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources); } // 如果传播方式为 requires_new, 则挂起当前事务,新建一个新事务 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { // 挂起当前事务 SuspendedResourcesHolder suspendedResources = suspend(transaction); // 如果还没有激活事务,则新建事务 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 设置数据库的隔离级别,timeout等 doBegin(transaction, definition); prepareSynchronization(status, definition); return status; //.... } // 如果传播方式为 nested,则新建事务,但是不会把存在的事务挂起,它是一个子事务 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // 如果不支持嵌套事务,抛异常 if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException(""); } // 如果支持保存点,则创建保存点 if (useSavepointForNestedTransaction()) { DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); // 创建保存点 status.createAndHoldSavepoint(); return status; } else { // 如果不支持保存点,则和 requires_new 是一样的 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } } // 如果传播方式为 supports和required boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }
挂起事务,就是把当前事务的状态记录下来,后续在对该事务进行恢复。
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException { if (TransactionSynchronizationManager.isSynchronizationActive()) { List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization(); Object suspendedResources = null; if (transaction != null) { suspendedResources = doSuspend(transaction); } String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName(null); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false); return new SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } //..... } // 挂起事务doSuspend protected Object doSuspend(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; // 把事务的连接置空 txObject.setConnectionHolder(null); // 从当前线程中移除 return TransactionSynchronizationManager.unbindResource(obtainDataSource()); }
当经过上面一系列操作获取到事务信息后,再根据事务信息来封装到 TransactionInfo 中:
protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm, TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) { // 封装事务信息 TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification); if (txAttr != null) { // 设置事务状态 txInfo.newTransactionStatus(status); } }
到这里,目标方法执行之前的事务准备工作都已做好了,之后,会调用 InvocationCallback.proceedWithInvocation 来执行目标方法,如果执行失败,则会进行事务的回滚操作:
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) { if (txInfo != null && txInfo.getTransactionStatus() != null) { // 判断异常是不是 RunntimeException 和 Error if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) { // 回滚事务 txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); // ......... } else { // 如果是其他类型的异常,则正常提交 txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); // ....... } } } //判断是否回滚的异常,当前可以通过rolbackFor属性来修改 public boolean rollbackOn(Throwable ex) { return (ex instanceof RuntimeException || ex instanceof Error); }
public final void rollback(TransactionStatus status){ // 如果事务已完成,则回滚会抛异常 if (status.isCompleted()) { throw new IllegalTransactionStateException("...."); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; processRollback(defStatus, false); } // 回滚事务 private void processRollback(DefaultTransactionStatus status, boolean unexpected) { try { boolean unexpectedRollback = unexpected; // 自定义触发器的调用,不知道干嘛用??? triggerBeforeCompletion(status); // 如果有保存点,则回滚到保存点 if (status.hasSavepoint()) { status.rollbackToHeldSavepoint(); } else if (status.isNewTransaction()) { // 如果当前事务为独立的事务,则回滚 doRollback(status); } else { // 如果一个事务中又有事务,如 required,该事务可以看作一个事务链, //那么当其中的一个事务需要回滚的时候,并不是立马进行回滚, //而是只是设置回滚状态,到最后再统一回滚 if (status.hasTransaction()) { if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { // 只是设置回滚状态 doSetRollbackOnly(status); } } //....... } //.......... }finally { // 清空记录并恢复被挂起的事务 cleanupAfterCompletion(status); } }
事务的回滚操作,如果是嵌套事务,且有保存点的话,直接回滚到保存点,嵌套事务的回滚不会影响到外部事务,也就是说,外部事务不会回滚。回滚到保存点是根据底层数据库来操作的:
public void rollbackToHeldSavepoint() throws TransactionException { Object savepoint = getSavepoint(); // 回滚到保存点 getSavepointManager().rollbackToSavepoint(savepoint); // 释放保存点 getSavepointManager().releaseSavepoint(savepoint); setSavepoint(null); } // 回滚到保存点 public void rollbackToSavepoint(Object savepoint) throws TransactionException { ConnectionHolder conHolder = getConnectionHolderForSavepoint(); conHolder.getConnection().rollback((Savepoint) savepoint); conHolder.resetRollbackOnly(); // ...... } // 释放保存点 public void releaseSavepoint(Object savepoint) throws TransactionException { ConnectionHolder conHolder = getConnectionHolderForSavepoint(); conHolder.getConnection().releaseSavepoint((Savepoint) savepoint); }
如果没有保存点,则直接回滚,也是使用数据库的API 来操作的:
protected void doRollback(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); con.rollback(); }
还有一种情况, 如果一个事务中又有事务,如 required, 该事务可以看作一个事务链,那么当其中的一个事务需要回滚的时候,并不是立马进行回滚,而是只是设置回滚状态,到最后再统一回滚。
事务回滚后需要对事务信息进行清除:
private void cleanupAfterCompletion(DefaultTransactionStatus status) { // 设置完成状态 status.setCompleted(); if (status.isNewSynchronization()) { TransactionSynchronizationManager.clear(); } if (status.isNewTransaction()) { // 清除事务信息 doCleanupAfterCompletion(status.getTransaction()); } if (status.getSuspendedResources() != null) { // 恢复被挂起的事务 Object transaction = (status.hasTransaction() ? status.getTransaction() : null); resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources()); } }
清除事务信息:
protected void doCleanupAfterCompletion(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; // 从当前线程中移除数据库连接 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.unbindResource(obtainDataSource()); } //重置数据库连接 Connection con = txObject.getConnectionHolder().getConnection(); if (txObject.isMustRestoreAutoCommit()) { con.setAutoCommit(true); } DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel()); // 如果是新连接,则释放连接 if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, this.dataSource); } txObject.getConnectionHolder().clear(); }
恢复被挂起的事务:
protected final void resume(Object transaction, SuspendedResourcesHolder resourcesHolder){ if (resourcesHolder != null) { Object suspendedResources = resourcesHolder.suspendedResources; if (suspendedResources != null) { doResume(transaction, suspendedResources); } List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations; if (suspendedSynchronizations != null) { TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel); TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly); TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name); doResumeSynchronization(suspendedSynchronizations); } } } // 恢复事务,把事务和当前线程绑定 protected void doResume(Object transaction, Object suspendedResources) { TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources); }
当目标方法执行成功,没有抛出异常,则事务可以正常提交了;但是再上面分析事务回滚的时候,还有一种情况没有分析,就是如果一个事务嵌套再一个事务里面,是一个事务链,如果其中的某个事务需要回滚,它并不会真正的立马进行回滚,而是设置一个回滚标识,由最外层的事务来统一进行回滚;所以再提交事务之前,还需要进行判断。
public final void commit(TransactionStatus status) throws TransactionException { // 如果事务已完成,则不能提交 if (status.isCompleted()) { throw new IllegalTransactionStateException("..."); } // 判断嵌套事务是否设置了回滚标识,如果嵌套事务设置了回滚标识,则整个事务链都不会提交 DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { processRollback(defStatus, false); return; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { processRollback(defStatus, true); return; } // 提交事务 processCommit(defStatus); }
提交事务:
private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { //..... // 如果由保存点则释放保存点 if (status.hasSavepoint()) { unexpectedRollback = status.isGlobalRollbackOnly(); status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) { unexpectedRollback = status.isGlobalRollbackOnly(); // 提交 doCommit(status); } } catch (RuntimeException | Error ex) { // 如果提交过程中出现异常,则还是会回滚 doRollbackOnCommitException(status, ex); throw ex; } // ......... } // 数据库连接进行回滚 protected void doCommit(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); con.commit(); }
到这里,Spring 事务的获取,提交,回滚去分析完毕了,流程还是比较清除的
可以关注本人公众号查看更多文章:Java技术大杂烩