1.什么是atomikos
Atomikos是一个轻量级的分布式事务管理器,实现了Java Transaction API (JTA)规范,可以很方便地和Spring Boot集成,支持微服务场景下跨节点的全局事务。Atomikos公司官方网址为:https://www.atomikos.com/。其旗下最著名的产品就是事务管理器。产品分两个版本:
- TransactionEssentials:开源的免费产品
- ExtremeTransactions:商业版,需要收费。
2.环境搭建
第一个mysql数据库
docker run --name docker-mysql -e MYSQL_ROOT_PASSWORD=123456 -p 3333:3306 -d mysql
第二个mysql数据库
docker run --name docker-mysql-2 -e MYSQL_ROOT_PASSWORD=123456 -p 3334:3306 -d mysql
初始化数据(以下脚本需要在两个mysql数据库中分别执行)
-- Creates the demo schema and the user_info table that both mappers insert into.
create database demo;
-- Switch to the new schema before creating the table; a session with no default
-- schema rejects CREATE TABLE (MySQL error 1046, "No database selected").
use demo;
create table user_info
(
    user_id     varchar(64)          not null primary key,
    username    varchar(100)         null,
    age         int(3)               null,
    gender      tinyint(1)           null,
    remark      varchar(255)         null,
    create_time datetime             null,
    create_id   varchar(64)          null,
    update_time datetime             null,
    update_id   varchar(64)          null,
    enabled     tinyint(1) default 1 null
);
说明
mysql账号root
mysql密码123456
3.项目代码
实验目的:实现2个mysql数据库的分布式事务管理,要么全部成功,只要有一个失败就回滚。
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven module "atomikos": Spring Boot web app using MyBatis, MySQL Connector/J and the Atomikos JTA starter. -->
<!-- Dependencies without a version element rely on the springboot-demo parent for their versions. -->
<parent>
<artifactId>springboot-demo</artifactId>
<groupId>com.et</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>atomikos</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.mybatis.spring.boot/mybatis-spring-boot-starter -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.0</version><!-- NOTE(review): the original note claimed releases above 1.3.0 lack @MapperScan and @Select; verify that claim before upgrading -->
</dependency>
<!-- Atomikos + JTA distributed transaction management -->
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-jta-atomikos -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<!-- Per the original note: Spring Boot 2.1 defaults to the MySQL 8 driver, Spring Boot 2.0 to the MySQL 5 driver. -->
<version>8.0.13</version>
<!--<version>5.1.46</version>-->
<!--<scope>runtime</scope>-->
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
<version>1.18.2</version>
</dependency>
</dependencies>
</project>
mapper
创建2个mapper连接不同的数据库
package com.et.atomikos.mapper1;
import org.apache.catalina.User;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
/**
 * Annotation-driven MyBatis mapper for the {@code user_info} table in package
 * {@code com.et.atomikos.mapper1}.
 *
 * <p>NOTE(review): {@code User} resolves to {@code org.apache.catalina.User} through this
 * file's imports, which is a Tomcat type rather than a row class for {@code user_info};
 * confirm the intended result type of {@link #findByName(String)}.
 */
public interface UserInfoMapper1 {

    /**
     * Selects from {@code user_info} the row whose {@code username} column equals the argument.
     *
     * @param username value bound to the {@code #{username}} placeholder
     * @return the mapped result of the query
     */
    @Select("SELECT * FROM user_info WHERE username = #{username}")
    User findByName(@Param("username") String username);

    /**
     * Inserts one row into {@code user_info}, filling only {@code user_id}, {@code username}
     * and {@code age}.
     *
     * @param userId   value bound to {@code #{userId}}
     * @param username value bound to {@code #{username}}
     * @param age      value bound to {@code #{age}}
     * @return the row count reported by MyBatis for the insert
     */
    @Insert("INSERT INTO user_info(user_id,username, age) VALUES(#{userId},#{username}, #{age})")
    int insert(
            @Param("userId") String userId,
            @Param("username") String username,
            @Param("age") Integer age);
}
package com.et.atomikos.mapper2;
import org.apache.catalina.User;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
/**
 * Annotation-driven MyBatis mapper for the {@code user_info} table in package
 * {@code com.et.atomikos.mapper2}.
 *
 * <p>NOTE(review): {@code User} resolves to {@code org.apache.catalina.User} through this
 * file's imports, which is a Tomcat type rather than a row class for {@code user_info};
 * confirm the intended result type of {@link #findByName(String)}.
 */
public interface UserInfoMapper2 {

    /**
     * Selects from {@code user_info} the row whose {@code username} column equals the argument.
     *
     * @param username value bound to the {@code #{username}} placeholder
     * @return the mapped result of the query
     */
    @Select("SELECT * FROM user_info WHERE username = #{username}")
    User findByName(@Param("username") String username);

    /**
     * Inserts one row into {@code user_info}, filling only {@code user_id}, {@code username}
     * and {@code age}.
     *
     * @param userId   value bound to {@code #{userId}}
     * @param username value bound to {@code #{username}}
     * @param age      value bound to {@code #{age}}
     * @return the row count reported by MyBatis for the insert
     */
    @Insert("INSERT INTO user_info(user_id,username, age) VALUES(#{userId},#{username}, #{age})")
    int insert(
            @Param("userId") String userId,
            @Param("username") String username,
            @Param("age") Integer age);
}
service
创建2个service,分别用不同mapper
package com.et.atomikos.mapper1;
import com.et.atomikos.mapper1.UserInfoMapper1;
import com.et.atomikos.mapper2.UserInfoMapper2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
 * Demo service that inserts user rows through one or both mappers inside
 * {@code @Transactional} methods and then fails on purpose when {@code age} is zero,
 * so that the transaction rollback can be observed.
 */
@Service
public class ManyService1 {

    @Autowired
    private UserInfoMapper1 userInfoMapper1;

    @Autowired
    private UserInfoMapper2 userInfoMapper2;

    /**
     * Inserts a row through {@code UserInfoMapper1} only.
     *
     * @param userId   primary key of the new row
     * @param username user name of the new row
     * @param age      age of the new row; zero triggers the deliberate failure
     * @return the value returned by the mapper insert
     * @throws ArithmeticException when {@code age} is zero
     */
    @Transactional
    public int insert(String userId, String username, Integer age) {
        final int affectedRows = userInfoMapper1.insert(userId, username, age);
        failWhenAgeIsZero(age);
        return affectedRows;
    }

    /**
     * Inserts the same row through {@code UserInfoMapper1} and then {@code UserInfoMapper2}.
     *
     * @param userId   primary key of the new rows
     * @param username user name of the new rows
     * @param age      age of the new rows; zero triggers the deliberate failure
     * @return the sum of the values returned by the two mapper inserts
     * @throws ArithmeticException when {@code age} is zero
     */
    @Transactional
    public int insertDb1AndDb2(String userId, String username, Integer age) {
        final int firstRows = userInfoMapper1.insert(userId, username, age);
        final int secondRows = userInfoMapper2.insert(userId, username, age);
        failWhenAgeIsZero(age);
        return firstRows + secondRows;
    }

    /**
     * Divides by {@code age} purely for its side effect: an age of zero raises
     * {@link ArithmeticException} after the inserts have already been issued.
     */
    private static void failWhenAgeIsZero(Integer age) {
        int ignored = 1 / age;
    }
}
package com.et.atomikos.mapper2;
import com.et.atomikos.mapper2.UserInfoMapper2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
 * Demo service for the second data source. Its single method inserts a row through
 * {@code UserInfoMapper2} and then always fails, so a caller can observe the
 * {@code @Transactional} rollback.
 */
@Service
public class ManyService2 {

    @Autowired
    private UserInfoMapper2 userInfoMapper2;

    /**
     * Inserts a row through {@code UserInfoMapper2}, logs the insert result and then
     * unconditionally raises an {@link ArithmeticException}.
     *
     * @param userId   primary key of the new row
     * @param username user name of the new row
     * @param age      age of the new row
     * @return never returns normally; the {@code int} return type is kept for callers
     * @throws ArithmeticException always, after the insert has been issued
     */
    @Transactional
    public int insert(String userId, String username, Integer age) {
        int affectedRows = userInfoMapper2.insert(userId, username, age);
        // Log the actual insert result; the previous version concatenated the literal null.
        System.out.println("userInfoMapper2.insert end :" + affectedRows);
        // Deliberate failure, same exception type and message as the former "1 / 0".
        throw new ArithmeticException("/ by zero");
    }
}
config
初始化数据源1和数据源2
package com.et.atomikos.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
 * Connection and pool settings for the first data source, bound from configuration
 * properties under the prefix {@code spring.datasource.test1}. Getters and setters are
 * generated by Lombok's {@code @Data}.
 */
@Data
@ConfigurationProperties(prefix = "spring.datasource.test1")
public class DBConfig1 {

    /** JDBC URL of the database; the property key is {@code jdbcurl}, not {@code url}. */
    private String jdbcurl;

    /** Database user name. */
    private String username;

    /** Database password. */
    private String password;

    // The pool settings below are read by MyBatisConfig1 and passed unchanged to the
    // AtomikosDataSourceBean setter of the same name.

    private int minPoolSize;

    private int maxPoolSize;

    private int maxLifetime;

    private int borrowConnectionTimeout;

    private int loginTimeout;

    private int maintenanceInterval;

    private int maxIdleTime;

    /** Test query handed to the Atomikos pool; stays {@code null} when the property is not set. */
    private String testQuery;
}
package com.et.atomikos.config;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.sql.SQLException;
/**
 * MyBatis wiring for the first data source: an Atomikos-managed XA data source plus the
 * {@code SqlSessionFactory} and {@code SqlSessionTemplate} used by the mappers scanned
 * from {@code com.et.atomikos.mapper1}.
 *
 * @author liuhaihua
 * @version 1.0
 */
@Configuration
@MapperScan(basePackages = "com.et.atomikos.mapper1", sqlSessionTemplateRef = "test1SqlSessionTemplate")
public class MyBatisConfig1 {

    /**
     * Builds the XA-capable data source for database 1 and wraps it in an Atomikos pool.
     *
     * @param testConfig connection and pool settings for the first data source
     * @return the {@code AtomikosDataSourceBean} registered as {@code test1DataSource}
     * @throws SQLException declared by the driver and pool setters used here
     */
    @Bean(name = "test1DataSource")
    public DataSource testDataSource(DBConfig1 testConfig) throws SQLException {
        MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
        mysqlXaDataSource.setUrl(testConfig.getJdbcurl());
        mysqlXaDataSource.setUser(testConfig.getUsername());
        mysqlXaDataSource.setPassword(testConfig.getPassword());
        // Set once; the previous version issued this identical call twice.
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);

        // Hand the MySQL XA data source to Atomikos so it can take part in the global transaction.
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXaDataSource);
        xaDataSource.setUniqueResourceName("test1DataSource");
        xaDataSource.setMinPoolSize(testConfig.getMinPoolSize());
        xaDataSource.setMaxPoolSize(testConfig.getMaxPoolSize());
        xaDataSource.setMaxLifetime(testConfig.getMaxLifetime());
        xaDataSource.setBorrowConnectionTimeout(testConfig.getBorrowConnectionTimeout());
        xaDataSource.setLoginTimeout(testConfig.getLoginTimeout());
        xaDataSource.setMaintenanceInterval(testConfig.getMaintenanceInterval());
        xaDataSource.setMaxIdleTime(testConfig.getMaxIdleTime());
        xaDataSource.setTestQuery(testConfig.getTestQuery());
        return xaDataSource;
    }

    /**
     * Creates the MyBatis session factory backed by {@code test1DataSource}.
     *
     * @param dataSource the data source bean named {@code test1DataSource}
     * @return the session factory produced by {@code SqlSessionFactoryBean}
     * @throws Exception if the factory bean fails to build the session factory
     */
    @Bean(name = "test1SqlSessionFactory")
    public SqlSessionFactory testSqlSessionFactory(@Qualifier("test1DataSource") DataSource dataSource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        return bean.getObject();
    }

    /**
     * Creates the session template that {@code @MapperScan} references by name.
     *
     * @param sqlSessionFactory the factory bean named {@code test1SqlSessionFactory}
     * @return a {@code SqlSessionTemplate} wrapping that factory
     * @throws Exception kept from the original signature
     */
    @Bean(name = "test1SqlSessionTemplate")
    public SqlSessionTemplate testSqlSessionTemplate(
            @Qualifier("test1SqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
数据源2也是类似配置
controller
package com.et.atomikos.controller;
import com.et.atomikos.mapper1.ManyService1;
import com.et.atomikos.mapper2.ManyService2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * REST endpoints that drive the demo: each one forwards its request parameters to one
 * of the two services and returns the service result.
 */
@RestController
public class HelloWorldController {

    @Autowired
    private ManyService1 manyService1;

    @Resource
    private ManyService2 manyService2;

    /**
     * Delegates to {@code ManyService1.insert}.
     *
     * <p>Example: {@code http://localhost:8088/datasource1?userId=9&username=datasource1&age=2}
     */
    @RequestMapping(value = "datasource1")
    public int datasource1(String userId, String username, Integer age) {
        final int result = manyService1.insert(userId, username, age);
        return result;
    }

    /**
     * Delegates to {@code ManyService2.insert}.
     *
     * <p>Example: {@code http://localhost:8088/datasource2?userId=9&username=datasource2&age=2}
     */
    @RequestMapping(value = "datasource2")
    public int datasource2(String userId, String username, Integer age) {
        final int result = manyService2.insert(userId, username, age);
        return result;
    }

    /**
     * Delegates to {@code ManyService1.insertDb1AndDb2}, which writes through both mappers.
     *
     * <p>Examples:
     * {@code http://localhost:8088/insertDb1AndDb2?userId=1&username=tom5&age=2} and
     * {@code http://localhost:8088/insertDb1AndDb2?userId=2&username=tom5&age=0}
     * (the second one triggers the deliberate error).
     */
    @RequestMapping(value = "insertDb1AndDb2")
    public int insertDb1AndDb2(String userId, String username, Integer age) {
        final int result = manyService1.insertDb1AndDb2(userId, username, age);
        return result;
    }
}
application.yaml
# Application settings for the two-data-source demo. Nesting is restored here: with every
# key at column 0 the structure is lost (e.g. "port" would become a root key).
server:
  port: 8088
spring:
  application:
    name: manyDatasource
  datasource:
    # Keys under test1 are bound to DBConfig1 through the prefix spring.datasource.test1.
    test1:
      jdbcurl: jdbc:mysql://localhost:3333/demo?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8
      username: root
      password: 123456
      # NOTE(review): initial-size, min-idle, max-active, test-on-borrow, driver-class-name
      # and type have no matching field in DBConfig1; they look like leftovers from a
      # Druid setup - confirm whether they can be dropped.
      initial-size: 1
      min-idle: 1
      max-active: 20
      test-on-borrow: true
      driver-class-name: com.mysql.cj.jdbc.Driver
      type: com.alibaba.druid.pool.DruidDataSource
      minPoolSize: 3
      maxPoolSize: 25
      maxLifetime: 20000
      borrowConnectionTimeout: 30
      loginTimeout: 30
      maintenanceInterval: 60
      maxIdleTime: 60
    # Settings for the second database (port 3334).
    test2:
      jdbcurl: jdbc:mysql://localhost:3334/demo?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8
      username: root
      password: 123456
      driver-class-name: com.mysql.cj.jdbc.Driver
      type: com.alibaba.druid.pool.DruidDataSource
      minPoolSize: 3
      maxPoolSize: 25
      maxLifetime: 20000
      borrowConnectionTimeout: 30
      loginTimeout: 30
      maintenanceInterval: 60
      maxIdleTime: 60
mybatis:
  mapper-locations: classpath:mapper/*.xml
spring.resources.static-locations: classpath:static/,file:static/
logging:
  level:
    czs: debug
    org.springframework: WARN
    org.spring.springboot.dao: debug
4.测试
启动Spring Boot 应用
插入第一个数据库测试
http://localhost:8088/datasource1?userId=9&username=datasource1&age=2
插入第二个数据库(注意:ManyService2.insert 在插入后固定抛出异常,因此该请求会报错,事务回滚)
http://localhost:8088/datasource2?userId=9&username=datasource2&age=2
同时插入2个数据库
http://localhost:8088/insertDb1AndDb2?userId=1&username=tom5&age=2
异常回滚测试
http://localhost:8088/insertDb1AndDb2?userId=2&username=tom5&age=0 //touch a error
5.参考