在 spring源码系列9:事务代理的创建 一节, 事务通过定义
在 spring源码系列10:AOP代理对象的执行 一节。 总结出,不管是AOP-JDK代理还是CGLB动态代理,都会执行Advice完成增强功能。
也就是说:事务的核心功能就在这个 TransactionInterceptor
@Override public Object invoke(final MethodInvocation invocation) throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() { @Override public Object proceedWithInvocation() throws Throwable { return invocation.proceed(); } }); } 复制代码
交给父类 TransactionAspectSupport.invokeWithinTransaction()
去执行
invokeWithinTransaction方法比较长,我们看前半部分。
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. 1. final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass); 2. final PlatformTransactionManager tm = determineTransactionManager(txAttr); 3. final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. 4. TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. 5. retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception 6. completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { 7. cleanupTransactionInfo(txInfo); } 8. commitTransactionAfterReturning(txInfo); return retVal; } ... } 复制代码
我们使用注解@Transactional时,注解元信息会被包装成TransactionAttribute ,此处拿到的就是@Transactional的元数据
找到一个合适的事务管理器
protected PlatformTransactionManager determineTransactionManager(TransactionAttribute txAttr) { // Do not attempt to lookup tx manager if no tx attributes are set //未设置事务属性,也没有设置beanFactory ,直接返回 if (txAttr == null || this.beanFactory == null) { return getTransactionManager(); } //如果@Transactional 指定了具体事务管理器,则根据beanname去容器中找到他 String qualifier = txAttr.getQualifier(); if (StringUtils.hasText(qualifier)) { return determineQualifiedTransactionManager(qualifier); } //如果是TransactionAspectSupport.transactionManagerBeanName指定了具体事务管理器, //beanname去容器中找 else if (StringUtils.hasText(this.transactionManagerBeanName)) { return determineQualifiedTransactionManager(this.transactionManagerBeanName); } //没有beanName指定具体的事务管理器Bean。 /** 1.查看transactionManager属性是否设置事务管理器对象 2.查看事务管理器缓存中有没有 3.去容器中寻找,找PlatformTransactionManager接口的实现类。getBean(PlatformTransactionManager), 找到放到事务管理器缓存中。 **/ else { PlatformTransactionManager defaultTransactionManager = getTransactionManager(); if (defaultTransactionManager == null) { defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY); if (defaultTransactionManager == null) { defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class); this.transactionManagerCache.putIfAbsent( DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager); } } return defaultTransactionManager; } } 复制代码
以集成了DataSourceTransactionManager为例。determineTransactionManager返回的就是DataSourceTransactionManager对象。
获取目标方法全名
protected TransactionInfo createTransactionIfNecessary( PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) { // If no name specified, apply method identification as transaction name. 如果事务名称没有指定,则使用方法名作为事务名 if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() {return joinpointIdentification;} }; } ... TransactionStatus status = tm.getTransaction(txAttr); ... return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); } 复制代码
重点在
getTransaction ()是一个模板方法,PlatformTransactionManager定义了getTransaction 方法。抽象类AbstractPlatformTransactionManager实现了getTransaction 方法。因为是模板方法。方法内很多方法都是在具体的PlatformTransactionManager实现的。(本文以DataSourceTransactionManager为例)
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { 1.尝试获取事务对象,并封装当前线程中以当前datasource为key的Connection。 Object transaction = doGetTransaction(); 2.没有配置事务属性,则创建一个默认的事务属性 if (definition == null) { // Use defaults if no transaction definition given. definition = new DefaultTransactionDefinition(); } 3.判断是否有已经存在是否:判断条件是当前线程有Connection.并且Connection是活跃的 if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. 已存在事务,则处理传播属性,然后返回。(处理两个事务之间的传播属性关系) return handleExistingTransaction(definition, transaction, debugEnabled); } ===========当前没有事务 // Check definition settings for new transaction. 4.超时时间校验 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } 5.开始处理当前事务 // No existing transaction found -> check propagation behavior to find out how to proceed. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException ex) { resume(null, suspendedResources); throw ex; } catch (Error err) { resume(null, suspendedResources); throw err; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } } 复制代码
下面拆解此方法
4.1.1:doGetTransaction获取事务对象 doGetTransaction的实现在DataSourceTransactionManager中,doGetTransactiond创建一个DataSourceTransactionObject用于表示事务。并尝试获取一个与当前线程关联的Connection,这一部分工作交给事务同步管理器TransactionSynchronizationManager来完成。核心在doGetResource方法上。
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<Map<Object, Object>>("Transactional resources"); private static Object doGetResource(Object actualKey) { Map<Object, Object> map = resources.get(); if (map == null) { return null; } Object value = map.get(actualKey); // Transparently remove ResourceHolder that was marked as void... if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { map.remove(actualKey); // Remove entire ThreadLocal if empty... if (map.isEmpty()) { resources.remove(); } value = null; } return value; } 复制代码
以当前Datasoure对象为key ,从ThreadLocal对象resources中获取Connection
4.1.2:没有事务属性创建一个事务属性
4.1.3:判断当前是否有事务存在( 重点 ) 默认是false,子类可以重写isExistingTransaction方法。DataSourceTransactionManager中重写了此方法
@Override protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive()); } 复制代码
查看事务对象获取的Connectin是否为空,Connection是否活跃。 是否存在事务指:当前线程中是否存在以当前数据源为key的获取连接Connection. 4.1.3.1: handleExistingTransaction( 处理当前有事务的情况 ) 事务的传播属性处理
suspend(transaction)
挂起在代码里的表现是:(1)将当前是我的ConnectionHolder设置为null(2)从ThreadLocal对象resources中移除当前线程的Connection(3)将事务的挂起状态封装到SuspendedResourcesHolder中设置到TransactionStatus中,返回。 suspend(transaction)
挂起, doBegin(transaction, definition)
创建新事务; doBegin(transaction, definition)
创建新事务 4.1.4: 校验超时
4.1.5: 处理当前没有事务的情况
doBegin(transaction, definition)
事务创建doBegin
至此:getTransaction 返回一个封装了事务的TransactionStatus对象。 总结下:getTransaction 方法 重要点 :
prepareTransactionInfo方法主要是封装:事务管理器 PlatformTransactionManager
对象, TransactionAttribute
事务属性对象,方法名 joinpointIdentification
, TransactionStatus
对象 成一 TransactionInfo
对象。并把 TransactionInfo
对象通过 bindToThread()
方法绑定到 ThreadLocal<TransactionInfo> transactionInfoHolder
。
至此: createTransactionIfNecessary完成,得到一个TransactionInfo对象。
执行下一个增强。在没其他增强的情况下,这通常会导致调用目标对象。
在目标方法执行错误的情况下,catch异常,执行回滚。 核心在DataSourceTransactionManager.doRollback方法
Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Rolling back JDBC transaction on Connection [" + con + "]"); } try { con.rollback(); } 复制代码
finally 一定执行,清除事务信息。 其实就是把ThreadLocal transactionInfoHolder里的新事务信息清除掉。设置为原事务信息
提交事务 核心在DataSourceTransactionManager.doCommit方法
Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Committing JDBC transaction on Connection [" + con + "]"); } try { con.commit(); } 复制代码
至此整个事务执行原理完成。里面涉及到很多细节,由于篇幅的原因不能一一列出。建议多看代码。