seata 官方给出了一系列 demo 样例,不过我在用的过程中发现总有这个那个的问题,所以自己维护了一份基于 dubbo 的 demo 在 github 上,适配的 seata 版本是 0.8.0。
案例的设计直接参考官方 quick start给出的案例:
整个案例分为三个服务,分别是存储服务、订单服务和账户服务,这些服务通过 dubbo 进行发布和调用,内部调用逻辑如上面图所示。
整个 demo 的工程样例如下所示:
这个案例除了在数据库需要建立业务表以外,还要额外建立一张 undo_log 表,这个表的主要作用是记录事务的前置镜像和后置镜像。
全局事务进行到提交阶段,则删除该表对应的记录,全局事务如果需要回滚,则会利用这个表里记录的镜像数据,恢复数据。
undo_log 表里的数据实际上是“朝生夕死”的,数据不需要在表里存活太久。表结构如下所示:
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
每个服务都对应了一个 starter 类,这个类主要用来在 spring 环境下,将该服务启动,并通过 dubbo 发布出去,以账户服务为例:
/**
* The type Dubbo account service starter.
*/
public class DubboAccountServiceStarter {
/**
* 2. Account service is ready . A buyer register an account: U100001 on my e-commerce platform
*
* @param args the input arguments
*/
public static void main(String[] args) {
ClassPathXmlApplicationContext accountContext = new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-account-service.xml"});
accountContext.getBean("service");
JdbcTemplate accountJdbcTemplate = (JdbcTemplate) accountContext.getBean("jdbcTemplate");
accountJdbcTemplate.update("delete from account_tbl where user_id = 'U100001'");
accountJdbcTemplate.update("insert into account_tbl(user_id, money) values ('U100001', 999)");
new ApplicationKeeper(accountContext).keep();
}
}
首先通过
ClassPathXmlApplicationContext
读取 dubbo-account-service.xml 这个 spring 配置文件并启动 spring 容器环境,并通过 spring 的 jdbc template 对账户表的数据进行初始化。
dubbo-account-service.xml 配置文件中进行了各类 bean 的配置,包括 dubbo 与 spring 结合时的标准配置:
<bean id="accountDataSourceProxy" class="io.seata.rm.datasource.DataSourceProxy">
<constructor-arg ref="accountDataSource" />
</bean>
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="accountDataSourceProxy" />
</bean>
<dubbo:application name="dubbo-demo-account-service" />
<dubbo:registry address="zookeeper://localhost:2181" />
<dubbo:protocol name="dubbo" port="20881" />
<dubbo:service interface="io.seata.samples.dubbo.service.AccountService" ref="service" timeout="10000"/>
<bean id="service" class="io.seata.samples.dubbo.service.impl.AccountServiceImpl">
<property name="jdbcTemplate" ref="jdbcTemplate"/>
</bean>
<bean class="io.seata.spring.annotation.GlobalTransactionScanner">
<constructor-arg value="dubbo-demo-account-service"/>
<constructor-arg value="my_test_tx_group"/>
</bean>
这份配置里主要有两个需要引起注意的关键点
jdbcTemplate 这个 bean 所依赖的数据源 bean,是一个类名为 io.seata.rm.datasource.DataSourceProxy 的数据源类,通过它的名字可以很明显地看出这是一个代理模式的应用,因为 seata 为完成全局事务的逻辑,需要在普通的 sql 操作前后添加一些逻辑,比如说 sql 执行前对 sql 进行语法解析,生成前置镜像,sql 执行后生成后置镜像,通过代理的方式,可以方便地对 connection,statement 等进行代理包装,在调用的时候进行拦截,加入自己的逻辑。
配置文件中还有一个 io.seata.spring.annotation.GlobalTransactionScanner 类型的 bean,这个 bean 是支撑 seata 能在 spring 环境中通过注解的方式来划定事务边界的基础。在 spring 容器启动时,会扫描
@GlobalTransactional
注解是否存在,这个注解标识了全局事务的开始和结束,也就是我们常说的“事务的边界”
业务逻辑的具体详情在
BusinessServiceImpl
类中可以看到:
@Override
@GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
public void purchase(String userId, String commodityCode, int orderCount) {
LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
storageService.deduct(commodityCode, orderCount);
orderService.create(userId, commodityCode, orderCount);
// throw new RuntimeException("xxx");
}
先调用存储服务,减少库存,然后调用订单服务,新建订单。这两个动作属于一个整体的事务,任何一个动作失败,都需要撤销所有的操作。
这个方法也有两个需要注意的点:
该方法上声明了 @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx") 这样的注解,用于让上文提到的 GlobalTransactionScanner 扫描的时候发现这是一个全局事务。
方法的最后有一行代码抛出了 RuntimeException,这主要是为了模仿全局事务的失败,并让 seata 走全局事务回滚逻辑。
上文提到的 GlobalTransactionScanner 类,会在 spring 容器启动的时候,也被初始化。
在它的 afterPropertiesSet 方法被调用时,会触发 seata client 的初始化
@Override
public void afterPropertiesSet() {
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
return;
}
initClient();
}
关于 seata client 的初始化的细节,可以看看我写的另外一篇文章 《阿里开源分布式事务组件 seata : seata client 通信层解析》 。
初始化客户端做的事情主要是建立与 seata server 的连接,并注册 TM 和 RM。接下来,在 wrapIfNecessary 方法里,实现对注解的扫描,并对添加了注解的方法添加 interceptor。
这篇文章里我们暂时不讨论 TCC 模式,只讨论 AT 模式,也暂不讨论全局事务锁 GlobalLock 的实现,先忽略这些有关的逻辑,只关注事务处理逻辑。
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
if (!existsAnnotation(new Class[] {serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
if (interceptor == null) {
interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
}
在这里,interceptor 的实现是 GlobalTransactionalInterceptor,也就是说,以上文的案例为例子,当 BusinessServiceImpl 的 purchase 方法被调用的时候,实际上这个方法会被拦截器拦截,执行拦截器里的逻辑:
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
if (globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation);
} else {
return methodInvocation.proceed();
}
}
private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final GlobalTransactional globalTrxAnno) throws Throwable {
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
public String name() {
String name = globalTrxAnno.name();
if (!StringUtils.isNullOrEmpty(name)) {
return name;
}
return formatMethod(methodInvocation.getMethod());
}
@Override
public TransactionInfo getTransactionInfo() {
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
transactionInfo.setName(name());
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.rollbackForClassName()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
} catch (TransactionalExecutor.ExecutionException e) {
TransactionalExecutor.Code code = e.getCode();
switch (code) {
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
throw e.getCause();
default:
throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);
}
}
}
在执行 handleGlobalTransaction 方法时,实际上采用模板模式,委托给了 TransactionalTemplate 类去执行标准的事务处理流程。如下所示:
/**
* Execute object.
*
* @param business the business
* @return the object
* @throws TransactionalExecutor.ExecutionException the execution exception
*/
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 1.1 get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
try {
// 2. begin transaction
beginTransaction(txInfo, tx);
Object rs = null;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3.the needed business exception to rollback.
completeTransactionAfterThrowing(txInfo,tx,ex);
throw ex;
}
// 4. everything is fine, commit.
commitTransaction(tx);
return rs;
} finally {
//5. clear
triggerAfterCompletion();
cleanUp();
}
}
事务处理逻辑实际上是一种模板,将事务相关的处理逻辑放在 try 块里,发现异常后执行回滚,正常执行则执行提交。
在这里有个需要注意的地方是,seata 不把提交这个动作放在 try 块里,因为在 seata 里,全局事务的提交实际上是可以异步执行的。
因为全局事务如果进行到提交这一阶段,那么意味着各个分支事务已经执行过本地提交,全局事务的提交阶段仅仅是删除 undo_log 里的记录,这个记录删除或者不删除,实际上不会改变全局事务已经正常完成的事实。所以它可以用程序异步去做,或者以人工介入的方式去做,所以 seata 认为,全局事务提交失败,不需要执行回滚流程。