TCC分布式事务解决方案在开源界的主要实现为Byte-TCC、TCC-Transaction等。其中笔者了解较多并且业界使用率较高的为TCC-Transaction这一实现。
本文,我将带领读者对TCC-Transaction这一分布式事务框架进行一次源码解析,提高自己的阅读源码的能力,也希望能够对读者深入了解TCC-Transaction有所帮助。
源码地址为 https://github.com/changmingxie/tcc-transaction ,我们关注最新版本1.2.x。
源码下载后导入IDEA中,项目目录结构如下图:
模块及其对应职责说明如下:
tcc-transaction |-transaction-tcc-api 框架API定义,公共类/核心实体定义/枚举/工具类等 |-transaction-tcc-core 框架核心逻辑 |-transaction-tcc-dubbo 框架整合Dubbo实现 |-transaction-tcc-spring 框架Spring整合,包含获取数据库连接/切面获取等 |-transaction-tcc-server 后台管理页面,对事务进行手工重试等 |-transaction-tcc-unit-test 单元测试 |-transaction-tcc-tutorial-sample 样例工程 |-tcc-transaction-dubbo-sample |-tcc-transaction-http-sample |-tcc-transaction-sample-domain |-tcc-transaction-server-sample
项目核心模块为 tcc-transaction-core ,它实现了TCC核心业务逻辑,也是本次源码解析的重点对象。
我们从Dubbo使用样例入手进行分析,关于如何使用TCC-Transaction的更多说明,请参照官方文档: 使用指南1.2.x
我们从一个调用案例入手开始进行分析,样例路径为org.mengyun.tcctransaction.sample.dubbo.order.service.PaymentServiceImpl。
@Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = false, delayCancelExceptions = {SocketTimeoutException.class, com.alibaba.dubbo.remoting.TimeoutException.class}) public void makePayment(@UniqueIdentity String orderNo, Order order, BigDecimal redPacketPayAmount, BigDecimal capitalPayAmount) { System.out.println("order try make payment called.time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss")); //check if the order status is DRAFT, if no, means that another call makePayment for the same order happened, ignore this call makePayment. if (order.getStatus().equals("DRAFT")) { order.pay(redPacketPayAmount, capitalPayAmount); try { orderRepository.updateOrder(order); } catch (OptimisticLockingFailureException e) { //ignore the concurrently update order exception, ensure idempotency. } } String result = capitalTradeOrderService.record(buildCapitalTradeOrderDto(order)); String result2 = redPacketTradeOrderService.record(buildRedPacketTradeOrderDto(order)); } ...省略confirmMakePayment实现... ...省略cancelMakePayment实现...
这段代码为模拟支付扣款操作,可以看到在方法上添加了@Compensable注解,它是TCC-Transaction框架的核心注解,作用为: 开启tcc事务支持 ,注解可以设置一下参数
参数名 | 描述 |
---|---|
propagation | 事务传播属性,REQUIRED(必须存在事务,不存在则进行创建),SUPPORTS(如果有事务则在事务内运行),MANDATORY(必须存在事务),REQUIRES_NEW(不管是否存在是否都创建新的事务) |
confirmMethod | confirm阶段方法实现 |
cancelMethod | cancel阶段方法实现 |
transactionContextEditor | 设置transactionContextEditor |
asyncConfirm | 是否使用异步confirm |
asyncCancel | 是否使用异步cancel |
看到了@Compensable注解以及对应的confirm、cancle方法,处于技术敏感,我们可以猜测在框架中一定存在切面逻辑对@Compensable进行拦截并处理;在切面逻辑中一定有对confirm、cancel方法的调用。从这个猜想出发,我们通过阅读相关代码去验证自己的猜想。
我们进入tcc-transaction-core模块的代码目录,目录结构如下:
org.mengyun.tcctransaction |-common |-context |-interceptor TCC事务拦截器 |-recover TCC事务补偿 |-repository 事务存储 |-serializer |-support |-utils
我们主要关注interceptor目录,该目录下的interceptor实现了对注解@Compensable的解析以及对事务的代理逻辑。
CompensableTransactionAspect切面主要实现了对@Compensable的解析以及对事务的代理。
@Aspect public abstract class CompensableTransactionAspect { private CompensableTransactionInterceptor compensableTransactionInterceptor; public void setCompensableTransactionInterceptor( CompensableTransactionInterceptor compensableTransactionInterceptor) { this.compensableTransactionInterceptor = compensableTransactionInterceptor; } @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)") public void compensableService() { } @Around("compensableService()") public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable { return compensableTransactionInterceptor.interceptCompensableMethod(pjp); } public abstract int getOrder(); }
CompensableTransactionAspect的实现类为ConfigurableTransactionAspect.java, 加载顺序order= Ordered.HIGHEST_PRECEDENCE(-2147483648)。
该切面对标注了@Compensable的方法进行拦截,通过@Around为业务方法添加环绕增强。可以看到具体的增强方法实现为CompensableTransactionInterceptor.interceptCompensableMethod(pjp);
接着上述的分析,我们看一下CompensableTransactionInterceptor.interceptCompensableMethod(pjp)的逻辑。
[CompensableTransactionInterceptor.java] public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable { // 初始化一个TCC方法执行上下文 CompensableMethodContext compensableMethodContext = new CompensableMethodContext(pjp); // 校验事务支持是否开启 boolean isTransactionActive = transactionManager.isTransactionActive(); // 校验事务隔离级别 if (!TransactionUtils.isLegalTransactionContext( isTransactionActive, compensableMethodContext)) { throw new SystemException ("no active compensable transaction while propagation is mandatory for method " + compensableMethodContext.getMethod().getName()); } // 根据事务方法类型判断执行哪个逻辑 switch (compensableMethodContext.getMethodRole(isTransactionActive)) { case ROOT: return rootMethodProceed(compensableMethodContext); case PROVIDER: return providerMethodProceed(compensableMethodContext); default: return pjp.proceed(); } }
我们主要关注switch代码段
switch (compensableMethodContext.getMethodRole(isTransactionActive)) { case ROOT: //处理主事务切面,即:本次事务的入口方法 return rootMethodProceed(compensableMethodContext); case PROVIDER: //处理提供者事务切面 return providerMethodProceed(compensableMethodContext); default: //消费者事务直接执行,会对应执行远端提供者事务切面 return pjp.proceed(); }
当事务方法为ROOT方法(即分布式事务的主方法)时,执行rootMethodProceed(compensableMethodContext);方法为PROVIDER(提供者)方法时,执行providerMethodProceed(compensableMethodContext)。默认为消费者事务,则直接执行。
我们以此看一下这几种事务切面的执行逻辑。
对于事务的Root方法,执行rootMethodProceed逻辑,代码逻辑:
private Object rootMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable { Object returnValue = null; Transaction transaction = null; boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm(); boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel(); Set<Class<? extends Exception>> allDelayCancelExceptions = new HashSet<Class<? extends Exception>>(); allDelayCancelExceptions.addAll(this.delayCancelExceptions); allDelayCancelExceptions.addAll(Arrays.asList(compensableMethodContext.getAnnotation().delayCancelExceptions())); try { // 创建事务, 将主事务的信息写入db或者zk或者redis中去,事务信息写入具体方式可配置 transaction = transactionManager.begin(compensableMethodContext.getUniqueIdentity()); try { // 执行完成之后会马上进到另外一个切面中去 returnValue = compensableMethodContext.proceed(); } catch (Throwable tryingException) { // 如果try失败,则进行回滚 if (!isDelayCancelException(tryingException, allDelayCancelExceptions)) { logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException); // 回滚事务 transactionManager.rollback(asyncCancel); } throw tryingException; } // 提交事务 transactionManager.commit(asyncConfirm); } finally { // 最终如果执行成功,则删除之前的事务记录;如果执行失败则不作任何处理,等待job进行补偿操作 transactionManager.cleanAfterCompletion(transaction); } return returnValue; }
注意关注这段代码
// 执行完成之后会马上进到另外一个切面中去 returnValue = compensableMethodContext.proceed();
当所有的切面都执行完成之后才会执行后续的逻辑,也就是真正执行业务方法。
该方法为一个典型的模板方法,对事务通过begin、commit、rollback进行了抽象。
我们进入三个方法详细的分析。
首先进入begin方法
[TransactionManager.java] public Transaction begin(Object uniqueIdentify) { // 0 Transaction transaction = new Transaction(uniqueIdentify,TransactionType.ROOT); // 1 transactionRepository.create(transaction); // 2 registerTransaction(transaction); return transaction; }
0.首先声明并初始化一个分布式事务对象Transaction,标记为ROOT事务,事务初始状态为TRYING。这里采用了经典的状态机策略
public Transaction(TransactionType transactionType) { this.xid = new TransactionXid(); // 事务初始状态设置成TRYING this.status = TransactionStatus.TRYING; this.transactionType = transactionType; }
1.将事务信息存储到数据源中,数据源可以是数据库、redis、zk等,可配置;TransactionRepository是具体的持久化策略的抽象
2.注册事务,在TransactionManager中,通过双向队列(Deque
private static final ThreadLocal
接着看一下commit()方法
[TransactionManager.java] public void commit(boolean asyncCommit) { // 从ThreadLocal中获取当前事务 final Transaction transaction = getCurrentTransaction(); // 设置事务状态为CONFIRMING transaction.changeStatus(TransactionStatus.CONFIRMING); // 更新存储中的事务信息 transactionRepository.update(transaction); // 如果异步commit属性为true if (asyncCommit) { try { Long statTime = System.currentTimeMillis(); // 通过本地线程池异步进行事务提交 executorService.submit(new Runnable() { @Override public void run() { commitTransaction(transaction); } }); logger.debug("async submit cost time:" + (System.currentTimeMillis() - statTime)); } catch (Throwable commitException) { logger.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", commitException); throw new ConfirmingException(commitException); } } else { // 否则同步进行事务提交 commitTransaction(transaction); } }
commit(boolean asyncCommit)方法执行事务的提交过程,具体提交逻辑在commitTransaction(transaction)中完成。
[TransactionManager.java] private void commitTransaction(Transaction transaction) { try { // 提交事务 transaction.commit(); // 删除本次提交的本地事务记录,如果commit异常,不会把数据库内事务记录删除, // 通过job重试进行补偿 transactionRepository.delete(transaction); } catch (Throwable commitException) { logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException); throw new ConfirmingException(commitException); } } [Transaction.java] public void commit() { // 对每一个分支执行提交操作 for (Participant participant : participants) { participant.commit(); } }
可以看到,在事务提交完成之后,对本地持久化的事务记录进行了物理删除,具体删除方式取决于持久化机制。感兴趣的同学可以自行查看 org.mengyun.tcctransaction.repository 目录下的实现。
我们看一下方法rollback()是如何实现事务回滚逻辑的
[TransactionManager.java] public void rollback(boolean asyncRollback) { // 从ThreadLocal中获取当前事务 final Transaction transaction = getCurrentTransaction(); transaction.changeStatus(TransactionStatus.CANCELLING); // 更新事务状态为CANCELLING transactionRepository.update(transaction); // 如果异步rollback属性为true if (asyncRollback) { try { executorService.submit(new Runnable() { @Override public void run() { // 通过线程池执行回滚逻辑 rollbackTransaction(transaction); } }); } catch (Throwable rollbackException) { logger.warn("compensable transaction async rollback failed, recovery job will try to rollback later.", rollbackException); throw new CancellingException(rollbackException); } } else { // 异步rollback设置为false,同步执行回滚 rollbackTransaction(transaction); } }
和commit方法类似,在rollback(boolean asyncRollback)执行事务的回滚操作,具体的操作在rollbackTransaction(transaction)中执行:
private void rollbackTransaction(Transaction transaction) { try { // 事务回滚 transaction.rollback(); // 删除本次回滚的本地事务记录,如果rollback异常,不会把数据库内事务记录删除, // 通过job重试进行补偿 transactionRepository.delete(transaction); } catch (Throwable rollbackException) { logger.warn("compensable transaction rollback failed, recovery job will try to rollback later.", rollbackException); throw new CancellingException(rollbackException); } }
无论是否提交/回滚,最终都会执行cleanAfterCompletion(transaction)方法进行现场清理操作。
public void cleanAfterCompletion(Transaction transaction) { if (isTransactionActive() && transaction != null) { // 从ThreadLocal中获取当前事务 Transaction currentTransaction = getCurrentTransaction();‘ // 弹出当前事务 if (currentTransaction == transaction) { CURRENT.get().pop(); if (CURRENT.get().size() == 0) { CURRENT.remove(); } } else { throw new SystemException("Illegal transaction when clean after completion"); } } }
事务执行结束,从栈中弹出当前结束的事务。
看完rootMethodProceed根事务切面逻辑,再来看提供者切面事务逻辑就好理解多了,方法逻辑如下:
private Object providerMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable { // 获取异步回滚、异步提交标识 Transaction transaction = null; boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm(); boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel(); try { // 判断当前事务状态 switch (TransactionStatus.valueOf(compensableMethodContext.getTransactionContext().getStatus())) { // 如果事务状态为TRYING case TRYING: // 通过使用transactionContext创建分支事务 transaction = transactionManager.propagationNewBegin(compensableMethodContext.getTransactionContext()); // 执行被切方法逻辑 return compensableMethodContext.proceed(); // 如果事务状态为CONFIRMING case CONFIRMING: try { // 对事务状态进行更新 transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext()); // 提交事务,不执行切面方法 transactionManager.commit(asyncConfirm); } catch (NoExistedTransactionException excepton) { //the transaction has been commit,ignore it. } break; // 如果事务状态为CANCELLING case CANCELLING: try { // 更新事务状态 transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext()); // 执行事务回滚,不执行切面方法 transactionManager.rollback(asyncCancel); } catch (NoExistedTransactionException exception) { //the transaction has been rollback,ignore it. } break; } } finally { // 对现场进行清理 transactionManager.cleanAfterCompletion(transaction); } Method method = compensableMethodContext.getMethod(); // 处理原始类型返回值,返回原始类型的默认值,因为不能返回null return ReflectionUtils.getNullValue(method.getReturnType()); } public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException { // 根据事务id从事务持久化组件中查询到本事务 Transaction transaction = transactionRepository.findByXid(transactionContext.getXid()); // 不为空 if (transaction != null) { // 对事务状态进行更新,根据传参不同,执行TRYING->CONFIRMING或者TRYING->CANCELING等操作 transaction.changeStatus(TransactionStatus.valueOf(transactionContext.getStatus())); // 对事务栈进行操作,执行嵌套事务入栈 registerTransaction(transaction); return transaction; } else { throw new NoExistedTransactionException(); } }
这里进行小结,可以看到在provider类型的方法切面,对于远程的Participant,如果transaction的status为trying,则通过transactionManager.propagationNewBegin创建分支事务并执行被切方法逻辑;
如果是status为confirming或canceling,则会调用对应的confirm或cancel配置的方法,跳过被切方法
对于普通类型方法直接调用,normal类型的方法是封装了对远程dubbo接口方法调用逻辑的本地proxy方法,所以直接执行即可
ResourceCoordinatorAspect切面主要是为了执行资源协调,它的实现为ConfigurableCoordinatorAspect
[ResourceCoordinatorAspect.java] @Aspect public abstract class ResourceCoordinatorAspect { private ResourceCoordinatorInterceptor resourceCoordinatorInterceptor; @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)") public void transactionContextCall() { } @Around("transactionContextCall()") public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable { return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp); } public void setResourceCoordinatorInterceptor(ResourceCoordinatorInterceptor resourceCoordinatorInterceptor) { this.resourceCoordinatorInterceptor = resourceCoordinatorInterceptor; } public abstract int getOrder(); } [ConfigurableCoordinatorAspect.java] @Aspect public class ConfigurableCoordinatorAspect extends ResourceCoordinatorAspect implements Ordered { private TransactionConfigurator transactionConfigurator; public void init() { ResourceCoordinatorInterceptor resourceCoordinatorInterceptor = new ResourceCoordinatorInterceptor(); resourceCoordinatorInterceptor.setTransactionManager(transactionConfigurator.getTransactionManager()); this.setResourceCoordinatorInterceptor(resourceCoordinatorInterceptor); } @Override public int getOrder() { return Ordered.HIGHEST_PRECEDENCE + 1; } public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) { this.transactionConfigurator = transactionConfigurator; } }
ConfigurableCoordinatorAspect的职责为设置事务的参与者;在一个事务内,每个被@Compensable注解的方法都是事务参与者。
可以看到该切面的优先级为 Ordered.HIGHEST_PRECEDENCE + 1,order的数值大于CompensableTransactionAspect。由于 @Order中的值越小,优先级越高 ,因此切面ResourceCoordinatorAspect的优先级小于CompensableTransactionAspect。
从代码可以看出,设置事务参与者逻辑是通过ResourceCoordinatorInterceptor.interceptTransactionContextMethod方法执行的。
[ResourceCoordinatorInterceptor.java] public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable { // 从当前ThreadLocal中获取事务 Transaction transaction = transactionManager.getCurrentTransaction(); if (transaction != null) { switch (transaction.getStatus()) { case TRYING: // 只需要在TRYING阶段将参与者的信息提取出来设置到transaction中 enlistParticipant(pjp); break; case CONFIRMING: break; case CANCELLING: break; } } // 执行目标方法 return pjp.proceed(pjp.getArgs()); }
我们可以得知,在trying阶段,框架会把所有事务参与者加入到当前事务中去。
对于Root方法,先创建主事务,事务参与者包括Root方法对应的本地参与者及Normal方法对应的远程参与者;
对于Provider方法,首先通过主事务上下文创建分支事务,事务参与者包括Provider方法对应的本地参与者以及它所包含的Normal方法对应的远程参与者。而远程参与者又可以开启新的分支事务。
我们可以合理的猜想,如果事务嵌套的层级很多,一定会存在性能问题。
我们详细看一下enlistParticipant(pjp)是如何生成的事务参与者对象。
private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException { // 首先获取@Compensable信息 Method method = CompensableMethodUtils.getCompensableMethod(pjp); if (method == null) { // @Compensable标注的方法为空则抛出异常 throw new RuntimeException(String.format("join point not found method, point is : %s", pjp.getSignature().getName())); } Compensable compensable = method.getAnnotation(Compensable.class); // 回去confirm和cancle方法名 String confirmMethodName = compensable.confirmMethod(); String cancelMethodName = compensable.cancelMethod(); // 获取当前事务以及全局事务id Transaction transaction = transactionManager.getCurrentTransaction(); TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId()); // 设置事务上下文到Editor中 // Editor用来统一提取事务上下文,如果是dubbo则对应设置dubbo的rpc上下文 // 此处的上下文设置之后就会调用try逻辑 if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) { FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs()); } // 通过目标类名,方法名,参数类型获取目标类 Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes()); // confirm逻辑调用上下文 InvocationContext confirmInvocation = new InvocationContext(targetClass, confirmMethodName, method.getParameterTypes(), pjp.getArgs()); //cancel逻辑调用上下文 InvocationContext cancelInvocation = new InvocationContext(targetClass, cancelMethodName, method.getParameterTypes(), pjp.getArgs()); // 此处较为关键,confirm和cancle具有相同地位,都被抽象成InvocationContext Participant participant = new Participant( xid, confirmInvocation, cancelInvocation, compensable.transactionContextEditor()); // 将participant设置到transaction中,并同步到持久化存储中 transactionManager.enlistParticipant(participant); } [TransactionManager.java] public void enlistParticipant(Participant participant) { Transaction transaction = this.getCurrentTransaction(); transaction.enlistParticipant(participant); transactionRepository.update(transaction); } [Transaction.java] public void enlistParticipant(Participant participant) { participants.add(participant); }
从上述的代码逻辑中,我们可以得到结论,CompensableTransactionAspect开启事务,ResourceCoordinatorAspect对注解@Compensable进行解析,将confirm与cancel的具体逻辑设置到事务管理器中。
当上述两个切面都执行完成之后,开始执行try中的方法。如果try成功则执行commit否则执行rollback。
每个分支事务最终被封装到Transaction的participants中,每个分布式事务都有一个自己的 ThreadLocal
我们再次回顾commit的逻辑,查看Transaction.commit()方法
[Transaction.java] public void commit() { // 对每一个分支执行提交操作 for (Participant participant : participants) { participant.commit(); } }
participant就是切面ResourceCoordinatorAspect 添加的。我们再看一下participant.commit()的逻辑:
[Transaction.java] public void commit() { terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass); }
可以看到最终事务提交是通过invoke反射实现的,我们进入invoke逻辑
public Object invoke(TransactionContext transactionContext, InvocationContext invocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) { // 如果事务执行上下文方法名不为空 if (StringUtils.isNotEmpty(invocationContext.getMethodName())) { try { Object target = FactoryBuilder.factoryOf(invocationContext.getTargetClass()).getInstance(); Method method = null; method = target.getClass().getMethod(invocationContext.getMethodName(), invocationContext.getParameterTypes()); // 实例化原事务执行者的代理对象 FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocationContext.getArgs()); // 反射执行 return method.invoke(target, invocationContext.getArgs()); } catch (Exception e) { throw new SystemException(e); } } return null; }
最终通过method.invoke(target, invocationContext.getArgs())方法完成了真实的事务提交操作。
到此我们对TCC-TRANSACTION的事务提交主流程进行了完整的分析。
通过分析我们可以知道TCC-TRANSACTION的核心逻辑是通过两个切面CompensableTransactionAspect、ResourceCoordinatorAspect 实现的。通过对事务进行包装与代理,实现了类二阶段的分布式事务解决方案。
实际上,TCC-TRANSACTION还有一个重要的补偿逻辑我们还没有分析,它是基于定时调度实现的。
限于本文的篇幅,就不再继续展开。我将单独用一篇文章来对TCC-TRANSACTION的补偿过程进行分析,我们下文再会。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。