最近在重构项目中,需要兼容多数据源,故此实现下多数据源事务。
这次重构项目中,为了支持后续庞大的数据量接入,更迭了数据库,但是为了要兼容老版本,也不能直接拿掉老的数据库。所以就有了兼容多数据源的需求,尤其是要保证事务。
其实这个需求就是要实现分布式事务,但是我们的这个场景是在一个服务内,所以可以利用 AOP 来轻量的实现这个需求,若是多个服务的话,就需要实现一个管理器。
用过 spring
的都知道,我们一般都是使用 @Transactional
注解,但是这个注解在多数据源下,只能支持制定的数据源(不指定就是默认的)。
所以我们新建个自定义注解:
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface MultiTransactional { String[] values() default ""; } 复制代码
再定义一个切面:
/** * 多数据源事务 * * @author 7le */ @Slf4j @Aspect @Order(-7) @Component public class MultiTransactionalAspect { @Autowired private ApplicationContext applicationContext; @Around(value = "@annotation(multiTransactional)") public Object around(ProceedingJoinPoint pjp, MultiTransactional multiTransactional) throws Throwable { Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack = new Stack<>(); Stack<TransactionStatus> transactionStatusStack = new Stack<>(); try { if (!openTransaction(dataSourceTransactionManagerStack, transactionStatusStack, multiTransactional)) { return null; } Object ret = pjp.proceed(); commit(dataSourceTransactionManagerStack, transactionStatusStack); return ret; } catch (Throwable e) { rollback(dataSourceTransactionManagerStack, transactionStatusStack); log.error(String.format( "MultiTransactionalAspect catch exception class: %s method: %s detail:", pjp.getTarget().getClass().getSimpleName(), pjp.getSignature().getName()), e); throw e; } } private boolean openTransaction(Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack, Stack<TransactionStatus> transactionStatusStack, MultiTransactional multiTransactional) { String[] transactionMangerNames = multiTransactional.values(); if (ArrayUtils.isEmpty(multiTransactional.values())) { return false; } for (String beanName : transactionMangerNames) { DataSourceTransactionManager dataSourceTransactionManager = (DataSourceTransactionManager) applicationContext .getBean(beanName); TransactionStatus transactionStatus = dataSourceTransactionManager .getTransaction(new DefaultTransactionDefinition()); transactionStatusStack.push(transactionStatus); dataSourceTransactionManagerStack.push(dataSourceTransactionManager); } return true; } private void commit(Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack, Stack<TransactionStatus> transactionStatusStack) { while (!dataSourceTransactionManagerStack.isEmpty()) { dataSourceTransactionManagerStack.pop().commit(transactionStatusStack.pop()); } } private void rollback(Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack, Stack<TransactionStatus> transactionStatusStack) { while (!dataSourceTransactionManagerStack.isEmpty()) { dataSourceTransactionManagerStack.pop().rollback(transactionStatusStack.pop()); } } } 复制代码
这样就大功告成,只需要在需要的方法上,加上 @MultiTransactional({"xxxx","xxxxx"})
实现的代码在 springcloud-gateway
在使用的时候,需要注意一些细节,要加上 @EnableTransactionManagement
。
以及 @MultiTransactional({"xxxx","xxxxx"})
是 AOP
的方式,那就意味着是动态代理的,那下面的方式就会失效:
上面也提到过这个方案比较轻量,也是针对一些对数据一致性要求不高的场景,因为会存在数据不一致的可能。
我们用伪代码来描述下,假设2个数据源
begin1 begin2 sql1 sql2 commit1 commit2 复制代码
这种方案是可以实现在sql1 sql2之间的异常回滚。如果出现commit1提交成功,commit2提交失败(或者超时)这种情况,就会造成数据不一致,虽然这种情况概率很低,但也是一个隐患。
这个实现很类似于 3PC ,都会有在一个参与者执行任务提交后,另一个参与者出现异常而导致数据不一致的问题。
其实一般情况下,系统的需求只是要达到 最终一致性 ,那就可以考虑使用 TCC ,对每个事物进行Try,如果执行没有问题,再执行Confirm,如果执行过程中出了问题,则执行操作的逆操Cancel(自动化补偿手段)。
但是 TCC 对每个事务都需要Try,再执行Confirm,略微显得臃肿,根据不同的业务场景可以有更好的方案(比如补偿模式,定期校对模式之类),具体的可以看 分布式服务化系统一致性的“最佳实干”
抽了点时间,自己实现了一个分布式事务组件,在一些对数据一致性要求高的场景可以使用。 shine-mq 相应的设计思路在 分布式事务:基于可靠消息服务