本系列有写过在spring boot中,普通数据库事务的处理方式,主要是通过@Transactional的注解,但是却不能满足于分布式事务的需求。例如:跨多个多种数据库的一致性事务,跨系统RPC调用的事务,等等。
在分布式领域基于CAP理论以及BASE理论,有人就提出了 柔性事务 的概念。 CAP (一致性、可用性、分区容忍性)理论大家都理解很多次了,这里不再叙述。说一下 BASE 理论,它是在CAP理论的基础之上的延伸。包括 基本可用(Basically Available)、柔性状态(Soft State)、最终一致性(Eventual Consistency)。
针对柔性事务的解决方案,业界内有下面几种:
本文专门讲解 2PC两阶段提交 的这种解决方案,前面会讲解如果在spring boot中配置多数据源,后续会通过引入Atomikos来实践2PC的分布式事务。
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource ##a数据源 spring.datasource.druid.a.url= spring.datasource.druid.a.username= spring.datasource.druid.a.password= spring.datasource.druid.a.driver-class-name=oracle.jdbc.driver.OracleDriver ## b数据源 spring.datasource.druid.b.url= spring.datasource.druid.b.username= spring.datasource.druid.b.password= spring.datasource.druid.b.driver-class-name=oracle.jdbc.driver.OracleDriver
ADataSourceConfig.java
/* ** @MapperScan:A 数据源dao层路径 ** @Primary:多数据源时,表示默认数据源的配置 */ @Configuration @MapperScan(basePackages = "pers.demo.transaction.transaction2pc.mapper.a", sqlSessionFactoryRef = "aSqlSessionFactory") public class ADataSourceConfig { //注册数据源 @Primary @Bean(name = "aDataSource") @ConfigurationProperties(prefix = "spring.datasource.druid.a") public DataSource aDataSource() { return DruidDataSourceBuilder.create().build(); } //注册事务管理器(很重要!!!) @Bean(name = "aTransactionManager") @Primary public DataSourceTransactionManager aTransactionManager() { return new DataSourceTransactionManager(aDataSource()); } @Bean(name = "aSqlSessionFactory") @Primary public SqlSessionFactory aSqlSessionFactory(@Qualifier("aDataSource") DataSource dataSource) throws Exception { final SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean(); sessionFactoryBean.setDataSource(dataSource); // sessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/a/*.xml")); return sessionFactoryBean.getObject(); } }
BDataSourceConfig.java
/* ** B 数据源的配置,注意都没有 @Primary 了 */ @Configuration @MapperScan(basePackages = "pers.demo.transaction.transaction2pc.mapper.b", sqlSessionFactoryRef = "bSqlSessionFactory") public class BDataSourceConfig { @Bean(name = "bDataSource") @ConfigurationProperties(prefix = "spring.datasource.druid.b") public DataSource bDataSource() { return DruidDataSourceBuilder.create().build(); } @Bean(name = "bTransactionManager") public DataSourceTransactionManager bTransactionManager() { return new DataSourceTransactionManager(bDataSource()); } @Bean(name = "bSqlSessionFactory") public SqlSessionFactory bSqlSessionFactory(@Qualifier("bDataSource") DataSource dataSource) throws Exception { final SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean(); sessionFactoryBean.setDataSource(dataSource); // sessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/b/*.xml")); return sessionFactoryBean.getObject(); } }
多数据源配置,核心的代码只有上面这些。先是在配置文件中定义A、B两个数据源的连接信息,然后分别构建不同数据源的配置类,并且指向对应的dao层路径。由此:
Dao层数据源: pers.demo.transaction.transaction2pc.mapper.a.xxx.java ,dao层执行的方法,都是基于A数据源的; pers.demo.transaction.transaction2pc.mapper.b.xxx.java ,dao层执行的方法,都是基于B数据源的。
Service层事务:还记得 @Transactional 事务吗?Service层中如果没有指定事务管理器,默认会取值@Primary,即A数据源的事务管理器。如果想要使用B数据源的事务管理器,需要手动声明。
@Transactional(transactionManager = "bTransactionManager")
如果你勤于思考的话,这时就会有疑惑,当前的事务管理器都是基于单个数据源定义的,那么分布式事务该如何定义事务管理器呢?
大家对 XA 有印象吗?我实在是印象深刻。实习时第一天,就是通过ADF在本地电脑上运行WebLogic服务器,然后就是配置数据源。Oracle数据源的驱动有很多,就包括 oracle.jdbc.xa.client.OracleXADataSource ,我当时还是对这个XA疑惑很久。
XA协议 由Tuxedo首先提出的,并交给X/Open组织,作为资源管理器(数据库)与事务管理器的接口标准。XA协议采用两阶段提交方式来管理分布式事务。
XA规范定义了:
简单来说,基于XA协议的数据库,都可以采用两阶段提交方式来管理分布式事务。所幸常见的关系型数据库oracle、mysql、sql server都支持,但是一些不支持事务的nosql数据库是不行的。另外,jms、rocketmq等也是支持XA协议的,同样可以通过2PC来管理分布式事务。
JTA(Java Transaction Manager) : 是Java规范,是XA在Java上的实现.
JTA是如何实现多数据源的事务管理呢?
主要的原理是两阶段提交,以上面的请求业务为例,当整个业务完成了之后只是第一阶段提交,在第二阶段提交之前会检查其他所有事务是否已经提交,如果前面出现了错误或是没有提交,那么第二阶段就不会提交,而是直接rollback操作,这样所有的事务都会做Rollback操作.
JTA的有点就是能够支持多数据库事务同时事务管理,满足分布式系统中的数据的一致性.但是也有对应的弊端:
spring boot支持JTA的框架有很多,我们这次使用Atomikos。我们还是基于之前配置多数据源的代码。
pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency>
ADataSourceConfig.java
@Configuration @MapperScan(basePackages = "pers.demo.transaction.transaction2pc.mapper.a", sqlSessionFactoryRef = "aSqlSessionFactory") @ConfigurationProperties(prefix = "spring.datasource.druid.a") @Data public class ADataSourceConfig { private String url; private String username; private String password; @Primary @Bean(name = "aDataSource") public DataSource aDataSource() { Properties properties = new Properties(); properties.setProperty("URL", url); properties.setProperty("user", username); properties.setProperty("password", password); AtomikosDataSourceBean ds = new AtomikosDataSourceBean(); ds.setXaProperties(properties); ds.setUniqueResourceName("AOracleXADataSource"); ds.setXaDataSourceClassName("oracle.jdbc.xa.client.OracleXADataSource"); return ds; } @Bean(name = "aTransactionManager") @Primary public DataSourceTransactionManager aTransactionManager() { return new DataSourceTransactionManager(aDataSource()); } @Bean(name = "aSqlSessionFactory") @Primary public SqlSessionFactory aSqlSessionFactory(@Qualifier("aDataSource") DataSource dataSource) throws Exception { final SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean(); sessionFactoryBean.setDataSource(dataSource); return sessionFactoryBean.getObject(); } }
BDataSourceConfig.java
@Configuration @MapperScan(basePackages = "pers.demo.transaction.transaction2pc.mapper.b", sqlSessionFactoryRef = "bSqlSessionFactory") @ConfigurationProperties(prefix = "spring.datasource.druid.b") @Data public class BDataSourceConfig { private String url; private String username; private String password; @Bean(name = "bDataSource") public DataSource bDataSource() { Properties properties = new Properties(); properties.setProperty("URL", url); properties.setProperty("user", username); properties.setProperty("password", password); AtomikosDataSourceBean ds = new AtomikosDataSourceBean(); ds.setXaProperties(properties); ds.setUniqueResourceName("BOracleXADataSource"); ds.setXaDataSourceClassName("oracle.jdbc.xa.client.OracleXADataSource"); return ds; } @Bean(name = "bTransactionManager") public DataSourceTransactionManager bTransactionManager() { return new DataSourceTransactionManager(bDataSource()); } @Bean(name = "bSqlSessionFactory") public SqlSessionFactory bSqlSessionFactory(@Qualifier("bDataSource") DataSource dataSource) throws Exception { final SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean(); sessionFactoryBean.setDataSource(dataSource); return sessionFactoryBean.getObject(); } }
在配置类中注册JTA的TransactionManager。
@Bean(name = "jtaTransactionManager") @Primary public JtaTransactionManager jtaTransactionManager () { UserTransactionManager userTransactionManager = new UserTransactionManager(); UserTransaction userTransaction = new UserTransactionImp(); return new JtaTransactionManager(userTransaction, userTransactionManager); }
DemoService.java
/** * 同时往 A和B 两个数据库中insert数据 * @param jpaUserDO */ @Transactional(transactionManager = "jtaTransactionManager") public void addJTAUser(JpaUserDO jpaUserDO){ aUserMapper.addUsername(jpaUserDO.getUsername()); bUserMapper.addUsername(jpaUserDO.getUsername()); //int a=1/0; }
通过以上验证,2PC的分布式事务试验成功!