为了解决在事务运行过程中大颗粒度资源锁定的问题,业界提出一种新的事务模型,它是基于业务层面的事务定义。锁粒度完全由业务自己控制。它本质是一种补偿的思路。它把事务运行过程分成 Try、Confirm / Cancel 两个阶段。在每个阶段的逻辑由业务代码控制。这样就事务的锁粒度可以完全自由控制。业务可以在牺牲隔离性的情况下,获取更高的性能。
整体流程如下图:
摘自: www.iocoder.cn/TCC-Transac…
在tcc-transaction中,一个tcc事务可以包含多个业务活动,tcc-transaction把每个事务活动都抽象成事务的参与者,每个事务可以包含多个参与者。
我们先看一看tcc-transaction基础架构设计。
事务管理器负责管理事务以及参与者,spring aop的方式在业务前后进行拦截,负责将参与者加入到事务中,并将事务注册进事务管理器,同时将事务存储在事务存储器进行持久化,事务补偿job通过定时检查事务存储器里的事务状态来对事务进行补偿。
显而易见,tcc-transaction入口就是事务拦截器,事务拦截器通过两个切面的方式将所有组件组织在一起。
我们先看看Transaction这个类的属性。
/* * 事务编号 */ private TransactionXid xid; /** * 事务状态 */ private TransactionStatus status; /** * 事务类型 */ private TransactionType transactionType; /** * 重试次数 */ private volatile int retriedCount = 0; /** * 创建时间 */ private Date createTime = new Date(); /** * 最后更新时间 */ private Date lastUpdateTime = new Date(); /** * 乐观锁控制 */ private long version = 1; /** * 多个参与者 */ private List<Participant> participants = new ArrayList<Participant>(); // 附带属性映射 用于隐式传参 private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>(); 复制代码
其中包含TransactionXid、TransactionStatus、TransactionType以及Participant。
TransactionXid
/** * xid 格式标识 */ private int formatId = 1; /** * 全局事务编号 */ private byte[] globalTransactionId; /** * 分支事务编号 */ private byte[] branchQualifier; 复制代码
TransactionStatus
TRYING(1), CONFIRMING(2), CANCELLING(3); 复制代码
TransactionType
/** * 根事务 */ ROOT(1), /** * 分支事务 */ BRANCH(2); 复制代码
/** * 事务id */ private TransactionXid xid; /** * confirm方法上下文 */ private InvocationContext confirmInvocationContext; /** * cancel方法上下文 */ private InvocationContext cancelInvocationContext; /** * 执行器 */ private Terminator terminator = new Terminator(); /** * 上下文编辑 */ Class<? extends TransactionContextEditor> transactionContextEditorClass; public void rollback() { terminator.invoke(new TransactionContext(xid, TransactionStatus.CANCELLING.getId()), cancelInvocationContext, transactionContextEditorClass); } public void commit() { terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass); } 复制代码
提供事务的获取、发起、提交、回滚,参与者的新增等等方法。
这些先看看,有个印象,下面会详细讲解其中的内容。
事务拦截器由两个spring aop切面实现,如下图。
我们先来看一看使用示例:
参与者需要声明 try / confirm / cancel 三个类型的方法,和 TCC 的操作一一对应。在程序里,通过 @Compensable 注解标记在 try 方法上,并填写对应的 confirm / cancel 方法。
// try 方法 @Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = DubboTransactionContextEditor.class) public String record(CapitalTradeOrderDto tradeOrderDto) { } // confirm方法 public void confirmRecord(CapitalTradeOrderDto tradeOrderDto) { } // cancel方法 public void cancelRecord(CapitalTradeOrderDto tradeOrderDto) { } 复制代码
很明显,肯定是有切面切入了@Compensable:注解的方法,我们先来找一下。 在源码的spring支持中有这么一段配置:
<bean id="transactionConfigurator" class="org.mengyun.tcctransaction.spring.support.:" init-method="init"/> <bean id="compensableTransactionAspect" class="org.mengyun.tcctransaction.spring.ConfigurableTransactionAspect" init-method="init"> <property name="transactionConfigurator" ref="transactionConfigurator"/> </bean> <bean id="resourceCoordinatorAspect" class="org.mengyun.tcctransaction.spring.ConfigurableCoordinatorAspect" init-method="init"> <property name="transactionConfigurator" ref="transactionConfigurator"/> </bean> 复制代码
SpringTransactionConfigurator是一个配置相关的类,我们先不管。
先看看类图:
我们先看看ConfigurableTransactionAspect类。
@Aspect public class ConfigurableTransactionAspect extends CompensableTransactionAspect implements Ordered { private TransactionConfigurator transactionConfigurator; public void init() { // 设置事务管理器 TransactionManager transactionManager = transactionConfigurator.getTransactionManager(); // 这个是具体的拦截实现类 CompensableTransactionInterceptor compensableTransactionInterceptor = new CompensableTransactionInterceptor(); compensableTransactionInterceptor.setTransactionManager(transactionManager); compensableTransactionInterceptor.setDelayCancelExceptions(transactionConfigurator.getRecoverConfig().getDelayCancelExceptions()); this.setCompensableTransactionInterceptor(compensableTransactionInterceptor); } @Override public int getOrder() { // order越小,则优先级越高。 return Ordered.HIGHEST_PRECEDENCE; } public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) { this.transactionConfigurator = transactionConfigurator; } } 复制代码
ConfigurableTransactionAspect继承了CompensableTransactionAspect同时实现了Orderd,且Order值为最小值,这就是我们要找的第一个切面。
这里子类就是给父类提供了初始化,切面具体逻辑在父类。我们看看父类的逻辑。
@Aspect public abstract class CompensableTransactionAspect { private CompensableTransactionInterceptor compensableTransactionInterceptor; public void setCompensableTransactionInterceptor(CompensableTransactionInterceptor compensableTransactionInterceptor) { this.compensableTransactionInterceptor = compensableTransactionInterceptor; } /** * 切点 */ @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)") public void compensableService() { } /** * 切面 * @param pjp * @return * @throws Throwable */ @Around("compensableService()") public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable { return compensableTransactionInterceptor.interceptCompensableMethod(pjp); } public abstract int getOrder(); } 复制代码
可以看到具体逻辑还是在compensableTransactionInterceptor中,我们继续跟。
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable { // 获得 具体的方法 注解 传播行为 以及事务上下文 CompensableMethodContext compensableMethodContext = new CompensableMethodContext(pjp); // 事务是否存在于当前线程中 // transactionManager为单例的,其中包含一个threadlocal , 里面是一个Deque(双端队列) // 判断该threadlocal里的Deque是否为空 boolean isTransactionActive = transactionManager.isTransactionActive(); // 判断事务上下文是否合法 // 传播级别为支持当前事务 而不支持新事务、同时当前线程中不存在事务,且事务上下文为空 // 则是非法 抛出异常 if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, compensableMethodContext)) { throw new SystemException("no active compensable transaction while propagation is mandatory for method " + compensableMethodContext.getMethod().getName()); } // 判断是根事务还是分支事务 // 如果传播级别为必须新开事务 或者 (允许新建事务且线程中不存在事务,且事务上下文为空) 则是根事务 // ---------------------------------------------------------------------------------------------------- // 如果不满足上述条件, 且(事务的传播级别为支持当前事务或者为支持当前事务,并不允许开启事务)并且当前没有活动事务,并且事务上下文不为空 // 则是分支事务 switch (compensableMethodContext.getMethodRole(isTransactionActive)) { case ROOT: return rootMethodProceed(compensableMethodContext); case PROVIDER: return providerMethodProceed(compensableMethodContext); default: // 如果事务存在于线程中 或者事务的传播行为为支持当前事务,没有事务则以非事务执行 则直接执行 return pjp.proceed(); } } 复制代码
首先获取注解对应的参数。
public CompensableMethodContext(ProceedingJoinPoint pjp) { this.pjp = pjp; // 获取到具体方法 this.method = getCompensableMethod(); // 获取到注解 this.compensable = method.getAnnotation(Compensable.class); // 注解上的传播行为 this.propagation = compensable.propagation(); // 获得事务上下文 this.transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()); } 复制代码
tcc-transaction支持四种传播行为。
/** * 支持当前事务,如果当前没有事务,就新建一个事务。 */ REQUIRED(0), /** * 支持当前事务,如果当前没有事务,就以非事务方式执行。 */ SUPPORTS(1), /** * 支持当前事务,如果当前没有事务,就抛出异常。 */ MANDATORY(2), /** *新建事务,如果当前存在事务,把当前事务挂起。 */ REQUIRES_NEW(3); 复制代码
FactoryBuilder是一个抽象的单例工厂,整句话就是从注解的transactionContextEditor.class中获得一个单例得transactionContextEditor,然后调用这个transactionContextEditor的get方法获得事务的上下文。
我们回到interceptCompensableMethod。isTransactionActive判断当前线程是否包含事务。
public boolean isTransactionActive() { // 从threadLocal里获取双向队列 Deque<Transaction> transactions = CURRENT.get(); return transactions != null && !transactions.isEmpty(); } 复制代码
就是看一下当前线程里的双向队列是否有值。
继续往下走。
// 判断事务上下文是否合法 // 传播级别为支持当前事务而且不支持新事务、同时当前线程中不存在事务,且事务上下文为空 // 则是非法 抛出异常 if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, compensableMethodContext)) { throw new SystemException("no active compensable transaction while propagation is mandatory for method " + compensableMethodContext.getMethod().getName()); } 复制代码
// 非法的必要条件 public static boolean isLegalTransactionContext(boolean isTransactionActive, CompensableMethodContext compensableMethodContext) { if (compensableMethodContext.getPropagation().equals(Propagation.MANDATORY) // 事务传播行为为支持当前事务,没有事务则抛出异常 && !isTransactionActive // 没有当前事务 && compensableMethodContext.getTransactionContext() == null //事务上下文为空 ) { return false; } return true; } 复制代码
接着走。
switch (compensableMethodContext.getMethodRole(isTransactionActive)) { // 判断是根事务还是分支事务 // 如果传播级别为必须新开事务 或者 (允许新建事务且线程中不存在事务,且事务上下文为空) 则是根事务 case ROOT: return rootMethodProceed(compensableMethodContext); // 如果不满足上述条件, 且(事务的传播级别为支持当前事务或者为支持当前事务,并不允许开启事务)并且当前没有活动事务,并且事务上下文不为空 case PROVIDER: return providerMethodProceed(compensableMethodContext); default: // 如果事务存在于线程中 或者事务的传播行为为支持当前事务,没有事务则以非事务执行 则直接执行 return pjp.proceed(); } } 复制代码
看一下根事务的处理方法。
private Object rootMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable { Object returnValue = null; Transaction transaction = null; // 是否同步confirm boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm(); // 是否同步cancel boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel(); // 加入延迟cancel相关的异常 Set<Class<? extends Exception>> allDelayCancelExceptions = new HashSet<Class<? extends Exception>>(); allDelayCancelExceptions.addAll(this.delayCancelExceptions); allDelayCancelExceptions.addAll(Arrays.asList(compensableMethodContext.getAnnotation().delayCancelExceptions())); try { // 创建根事务 // 在内存中创建一个事务,声明为根事务 // 将刚刚的事务持久化到redis、zookeeper或者mysql 再或者磁盘 // 将事务注册进transactionManager中的treadlocal中的deque中 transaction = transactionManager.begin(compensableMethodContext.getUniqueIdentity()); try { // 执行 returnValue = compensableMethodContext.proceed(); } catch (Throwable tryingException) { // 异常处理 // 如果不是需要延迟的异常类型 if (!isDelayCancelException(tryingException, allDelayCancelExceptions)) { logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException); // 立即回滚 // 获取当前线程的deque队头的事务, // 改变事务状态 并持久化改变的事务状态 // rollback所有参与者 // delete持久化的事务 transactionManager.rollback(asyncCancel); } // 抛出异常 throw tryingException; } // 提交 // 和上述回滚步骤一致 transactionManager.commit(asyncConfirm); } finally { // 将事务从当前线程事务队列移除 transactionManager.cleanAfterCompletion(transaction); } return returnValue; } 复制代码
看一下begin方法。
/** * 发起根事务 * @param uniqueIdentify * @return 事务 */ public Transaction begin(Object uniqueIdentify) { // 创建根事务 Transaction transaction = new Transaction(uniqueIdentify,TransactionType.ROOT); // 存储 事务 transactionRepository.create(transaction); // 注册事务 registerTransaction(transaction); return transaction; } 复制代码
private void registerTransaction(Transaction transaction) { if (CURRENT.get() == null) { CURRENT.set(new LinkedList<Transaction>()); } CURRENT.get().push(transaction); } 复制代码
分支事务的处理方法。
private Object providerMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable { Transaction transaction = null; boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm(); boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel(); try { // 事务上下问中的状态 switch (TransactionStatus.valueOf(compensableMethodContext.getTransactionContext().getStatus())) { // 如果是try阶段 case TRYING: // 传播发起分支 // 在内存中创建一个事务,声明为分支事务,事务id为事务上下文中的事务id // 持久化事务 // 注册进deque中 transaction = transactionManager.propagationNewBegin(compensableMethodContext.getTransactionContext()); return compensableMethodContext.proceed(); case CONFIRMING: try { // 传播获取分支 // 从持久化中取出事务 // 注册进deque中 transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext()); transactionManager.commit(asyncConfirm); } catch (NoExistedTransactionException excepton) { //the transaction has been commit,ignore it. } break; case CANCELLING: try { // 传播获取分支 transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext()); transactionManager.rollback(asyncCancel); } catch (NoExistedTransactionException exception) { //the transaction has been rollback,ignore it. } break; } } finally { transactionManager.cleanAfterCompletion(transaction); } Method method = compensableMethodContext.getMethod(); return ReflectionUtils.getNullValue(method.getReturnType()); } 复制代码
看一下propagationNewBegin方法。
/** * 传播发起分支事务 * @param transactionContext 事务上下文 * @return 分支事务 */ public Transaction propagationNewBegin(TransactionContext transactionContext) { // 创建分支事务 Transaction transaction = new Transaction(transactionContext); // 存储事务 transactionRepository.create(transaction); // 注册事务 registerTransaction(transaction); return transaction; } 复制代码
我们理一下逻辑。
理一理根事务和分支事务的不同。
两个问题:
还是先看看类图。
来看看ConfigurableCoordinatorAspect类。
@Aspect public class ConfigurableCoordinatorAspect extends ResourceCoordinatorAspect implements Ordered { private TransactionConfigurator transactionConfigurator; public void init() { ResourceCoordinatorInterceptor resourceCoordinatorInterceptor = new ResourceCoordinatorInterceptor(); resourceCoordinatorInterceptor.setTransactionManager(transactionConfigurator.getTransactionManager()); this.setResourceCoordinatorInterceptor(resourceCoordinatorInterceptor); } @Override public int getOrder() { return Ordered.HIGHEST_PRECEDENCE + 1; } public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) { this.transactionConfigurator = transactionConfigurator; } } 复制代码
和上面的ConfigurableTransactionAspect大抵类似,不过优先级要低一些。也就是当ConfigurableTransactionAspect执行到proceed时,就会进入这个切面。
看看父类。
@Aspect public abstract class ResourceCoordinatorAspect { private ResourceCoordinatorInterceptor resourceCoordinatorInterceptor; /** * 切点为打了Compensable注解的方法 */ @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)") public void transactionContextCall() { } /** * 环绕 * @param pjp * @return * @throws Throwable */ @Around("transactionContextCall()") public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable { return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp); } public void setResourceCoordinatorInterceptor(ResourceCoordinatorInterceptor resourceCoordinatorInterceptor) { this.resourceCoordinatorInterceptor = resourceCoordinatorInterceptor; } public abstract int getOrder(); } 复制代码
直接看interceptTransactionContextMethod。
public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable { Transaction transaction = transactionManager.getCurrentTransaction(); if (transaction != null) { switch (transaction.getStatus()) { case TRYING: enlistParticipant(pjp); break; case CONFIRMING: break; case CANCELLING: break; } } return pjp.proceed(pjp.getArgs()); } 复制代码
一起看看enlistParticipant方法干了什么。
private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException { // 获得 @Compensable 注解 Method method = CompensableMethodUtils.getCompensableMethod(pjp); if (method == null) { throw new RuntimeException(String.format("join point not found method, point is : %s", pjp.getSignature().getName())); } Compensable compensable = method.getAnnotation(Compensable.class); // 获得 确认执行业务方法 和 取消执行业务方法 String confirmMethodName = compensable.confirmMethod(); String cancelMethodName = compensable.cancelMethod(); // 获取 当前线程事务第一个(栈顶)元素 Transaction transaction = transactionManager.getCurrentTransaction(); // 创建 事务编号 TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId()); // 设置事务上下文 if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) { FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs()); } // 获得类 Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes()); // 创建 确认执行方法调用上下文 和 取消执行方法调用上下文 InvocationContext confirmInvocation = new InvocationContext(targetClass, confirmMethodName, method.getParameterTypes(), pjp.getArgs()); InvocationContext cancelInvocation = new InvocationContext(targetClass, cancelMethodName, method.getParameterTypes(), pjp.getArgs()); Participant participant = new Participant( xid, confirmInvocation, cancelInvocation, compensable.transactionContextEditor()); // 添加 事务参与者 到 事务 transactionManager.enlistParticipant(participant); } 复制代码
总的来说一句话,创建设置事务上下文并生成一个参与者加入到trasaction里,修改事务存储器里的值。
ConfigurableTransactionAspect作为第一个切面,负责事务的生成、处理以及存储。 ConfigurableCoordinatorAspect作为第二个切面,负责参与者的生成和事务上下文的创建。
1、分支事务是不知道自己应该提交还是回滚的。
2、我们看一下官方的demo。
package org.mengyun.tcctransaction.sample.dubbo.capital.api; import org.mengyun.tcctransaction.api.Compensable; import org.mengyun.tcctransaction.sample.dubbo.capital.api.dto.CapitalTradeOrderDto; /** * Created by changming.xie on 4/1/16. */ public interface CapitalTradeOrderService { @Compensable String record(CapitalTradeOrderDto tradeOrderDto); } 复制代码
这是一个dubbo RPC接口,也就是说,这个接口也打上了@Compensable注解(之前一直没想明白这个问题),也就是说,在调用远程RPC服务时,调用方还会再进入一次切面(走两次切面逻辑),然后把远程参与者加入到调用方的Transaction里。也就是说,当主事务回滚或提交的时候,会调用远程参与者的接口,并告知当前事务状态,远程参与者根据不同的状态调用不同的方法来做confirm或commit。
如下文:
/** * transaction提交TCC事务 */ public void commit() { for (Participant participant : participants) { participant.commit(); } } /** * participant提交事务 */ public void commit() { terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass); } 复制代码
当远程分支收到这个RPC调用时,会去判断当前事务正在什么阶段,然后调用对应的方法。
switch (TransactionStatus.valueOf(transactionContext.getStatus())) { case TRYING: // 传播发起分支事务 transaction = transactionManager.propagationNewBegin(transactionContext); return pjp.proceed(); case CONFIRMING: try { // 传播获取分支事务 transaction = transactionManager.propagationExistBegin(transactionContext); // 提交事务 transactionManager.commit(); } catch (NoExistedTransactionException excepton) { //the transaction has been commit,ignore it. } break; case CANCELLING: try { // 传播获取分支事务 transaction = transactionManager.propagationExistBegin(transactionContext); // 回滚事务 transactionManager.rollback(); } catch (NoExistedTransactionException exception) { //the transaction has been rollback,ignore it. } break; 复制代码
至此就走完所有的正常流程了。
todo 事务补偿
todo dubbo支持