上文中,我们对TCC-Transaction的事务提交阶段主流程进行了详细的解析。上文遗留了一个问题:
如果在事务执行过程中出现了down机、停电、网络异常等情况,事务一致性就无法得到保证,此时应该怎么做?
这个问题在TCC-Transaction框架中是通过定时任务+状态机方式实现的,这种方式也是我们日常开发中经常使用的一种策略。本文,我们对事务恢复主逻辑进行分析,使TCC-Transaction源码解析形成一个闭环。
事务补偿定时任务的核心逻辑由tcc-transaction-spring模块下的RecoverScheduledJob.java完成,对失败的confirm、cancel操作进行失败补偿操作。代码逻辑如下:
public class RecoverScheduledJob { private TransactionRecovery transactionRecovery; private TransactionConfigurator transactionConfigurator; private Scheduler scheduler; // 通过init方法启动quartz定时任务 public void init() { try { MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean(); jobDetail.setTargetObject(transactionRecovery); jobDetail.setTargetMethod("startRecover"); jobDetail.setName("transactionRecoveryJob"); jobDetail.setConcurrent(false); jobDetail.afterPropertiesSet(); CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean(); cronTrigger.setBeanName("transactionRecoveryCronTrigger"); cronTrigger.setCronExpression(transactionConfigurator.getRecoverConfig().getCronExpression()); cronTrigger.setJobDetail(jobDetail.getObject()); cronTrigger.afterPropertiesSet(); scheduler.scheduleJob(jobDetail.getObject(), cronTrigger.getObject()); scheduler.start(); } catch (Exception e) { throw new SystemException(e); } } ...省略getter setter... }
通过quartz进行任务调度,通过RecoverConfig中的配置初始化定时任务,通过MethodInvokingJobDetailFactoryBean的targetObject与targetMethod指定了定时任务具体执行类及具体方法。
我们注意以下代码,它将任务的核心逻辑设置到jobDetail中
jobDetail.setTargetObject(transactionRecovery); jobDetail.setTargetMethod("startRecover");
最终通过quartz的Scheduler对任务发起调度,这里通过cron表达式触发器进行调度。
TransactionRecovery是TCC-Transaction框架中事务补偿的核心实现
public class TransactionRecovery { ...... private TransactionConfigurator transactionConfigurator;
通过startRecover开启事务补偿重试任务。
public void startRecover() { // 获取待补偿的任务列表 List<Transaction> transactions = loadErrorTransactions(); // 对待补偿的任务列表执行补偿操作 recoverErrorTransactions(transactions); }
首先通过loadErrorTransactions()获取待补偿的任务列表:
private List<Transaction> loadErrorTransactions() { long currentTimeInMillis = Calendar.getInstance().getTimeInMillis(); // 获取配置的具体事务持久化策略,如:基于数据库、zk、redis等 TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository(); // 获取重试策略,包含:cron表达式,最大重试次数等 RecoverConfig recoverConfig = transactionConfigurator.getRecoverConfig(); // 获取在RecoverDuration间隔之前未完成的transaction列表,查询方式依具体的持久化策略而定 return transactionRepository .findAllUnmodifiedSince( new Date(currentTimeInMillis - recoverConfig.getRecoverDuration() * 1000)); }
接着看一下recoverErrorTransactions方法逻辑,对待补偿的任务列表进行补偿操作。
private void recoverErrorTransactions(List<Transaction> transactions) { // 对需要进行重试的事务列表进行迭代 for (Transaction transaction : transactions) { // 如果重试次数超过配置的最大重试次数,则打印异常日志;跳过不再重试 if (transaction.getRetriedCount() > transactionConfigurator.getRecoverConfig().getMaxRetryCount()) { ...省略异常日志... continue; } // 如果是分支事务,并且超过最长超时时间则忽略不再重试 if (transaction.getTransactionType().equals(TransactionType.BRANCH) && (transaction.getCreateTime().getTime() transactionConfigurator.getRecoverConfig().getMaxRetryCount() *transactionConfigurator.getRecoverConfig().getRecoverDuration() * 1000 > System.currentTimeMillis())) { continue; } try { // 增加重试次数 transaction.addRetriedCount(); // 如果当前事务状态为CONFIRMING if (transaction.getStatus().equals(TransactionStatus.CONFIRMING)) { // 设置事务状态为CONFIRMING transaction.changeStatus(TransactionStatus.CONFIRMING); // 修改当前事务状态 transactionConfigurator.getTransactionRepository().update(transaction); // 提交事务 transaction.commit(); // 删除事务记录(删除失败则不作处理) transactionConfigurator.getTransactionRepository().delete(transaction); // 如果事务状态为CANCELLING或者事务为根事务(根事务没提交) } else if (transaction.getStatus().equals(TransactionStatus.CANCELLING) || transaction.getTransactionType().equals(TransactionType.ROOT)) { // 设置事务状态为CANCELLING transaction.changeStatus(TransactionStatus.CANCELLING); // 修改当前事务状态 transactionConfigurator.getTransactionRepository().update(transaction); // 回滚事务 transaction.rollback(); // 删除事务记录(删除失败则不作处理) transactionConfigurator.getTransactionRepository().delete(transaction); } } catch (Throwable throwable) { ...省略异常日志... } } } public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) { this.transactionConfigurator = transactionConfigurator; } }
上述注释已经很明确的指出了事务补偿job的核心逻辑,就不再赘述。我们总结一下:
对于trying阶段的异常事务,不会进行重试;而是会触发canceling操作;
对于confirming、canceling阶段的异常事务,定时进行重试补偿,尽最大努力去尝试提交事务,如果达到了最大重试次数还是处理失败则不再处理。这种极端的情况需要人工进行介入。
TCC-Transaction提供的后台server模块允许我们对事务进行手工补偿操作,这极大的提高了框架的可靠性以及易用性。
本文我们主要对TCC-Transaction的事务补偿阶段逻辑进行了分析。TCC-Transaction框架通过定时补偿,对异常事务进行处理,保证了分布式事务的最终一致性。
通过这一过程,我们能够看出,TCC-Transaction框架本质上也是柔性事务的一种解决方案,如果仔细分析,它也是满足BASE原则的。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。