本文目录:
后端工具和环境
在开发我的开源项目 prex 时,加入工作流,解决工作流用户与当前系统用户同步问题时,涉及到远程调用操作两个数据库所产生的事务问题,比如系统用户在增加用户同步工作流用户时,系统用户添加成功,工作流用户没有添加成功,则造成数据不一致问题,本地事务无法回滚,那么则使用分布式事务解决方案。
开源项目: gitee.com/kaiyuantuan…
指一次大的操作由不同的小操作组成的,这些小的操作分布在不同的服务器上,分布式事务需要保证这些小操作要么全部成功,要么全部失败。从本质上来说,分布式事务就是为了保证不同数据库的数据一致性。
通俗一点说就是单体应用被拆分成微服务应用,原来的一个模块被拆分成三个独立的应用,分别使用独立的数据源,业务操作需要调用三个服务来完成。
分布式事务作为微服务应用中的大难题,在现有的解决方案中,个人认为 Seata
是目前最轻量的解决方案
Seata
是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。 Seata
将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
两阶段提交协议的演变:
以一个示例来说明:
两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。
tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。
tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。
如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。
此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。
因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。
在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) 。
如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。
SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。
出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。
以一个示例来说明整个 AT 分支的工作过程。
业务表:product
Field | Type | Key |
---|---|---|
id | bigint(20) | PRI |
name | varchar(100) | |
since | varchar(100) |
AT 分支事务的业务逻辑:
update product set name = 'GTS' where name = 'TXC';
过程:
select id, name, since from product where name = 'TXC'; 复制代码
得到前镜像:
id name since 1 TXC 2014
select id, name, since from product where id = 1`; 复制代码
得到后镜像:
id name since 1 GTS 2014
{ "branchId": 641789253, "undoItems": [{ "afterImage": { "rows": [{ "fields": [{ "name": "id", "type": 4, "value": 1 }, { "name": "name", "type": 12, "value": "GTS" }, { "name": "since", "type": 12, "value": "2014" }] }], "tableName": "product" }, "beforeImage": { "rows": [{ "fields": [{ "name": "id", "type": 4, "value": 1 }, { "name": "name", "type": 12, "value": "TXC" }, { "name": "since", "type": 12, "value": "2014" }] }], "tableName": "product" }, "sqlType": "UPDATE" }], "xid": "xid:xxx" } 复制代码
update product set name = 'TXC' where id = 1; 复制代码
UNDO_LOG Table:不同数据库在类型上会略有差别。
以 MySQL 为例:
Field | Type |
---|---|
branch_id | bigint PK |
xid | varchar(100) |
context | varchar(128) |
rollback_info | longblob |
log_status | tinyint |
log_created | datetime |
log_modified | datetime |
-- 注意此处0.7.0+ 增加字段 context CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; 复制代码
回顾总览中的描述:一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具备自己的:
根据两阶段行为模式的不同,我们将分支事务划分为 Automatic (Branch) Transaction Mode 和 Manual (Branch) Transaction Mode.
AT 模式(参考链接 TBD)基于 支持本地 ACID 事务 的 关系型数据库:
相应的,TCC 模式,不依赖于底层数据资源的事务支持:
Saga 模式是 SEATA 提供的长事务解决方案,在 Saga 模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。
理论基础:Hector & Kenneth 发表论⽂ Sagas (1987)
Nacos
作为注册中心,Nacos 的安装及使用可以参考 seata-server
,这里下载的是 seata-server-0.9.0.zip,下载地址: github.com/seata/seata…
seata安装包
快速获取百度云下载链接 解压完成后我们得到了几个文件夹
seata server
所有的配置都在 conf 文件夹内,该文件夹内有两个文件我们必须要详细介绍下。
seata server
默认使用 file(文件方式)进行存储事务日志、事务运行信息,我们可以通过-m db 脚本参数的形式来指定,目前仅支持 file、db 这两种方式。
修改 conf 目录下的 file.conf 配置文件,主要修改自定义事务组名称,事务日志存储模式及数据库连接信息
transport { ...省略 } service { #vgroup->rgroup vgroup_mapping.prex_tx_group = "default" #修改事务组名称为:prex_tx_group,和客户端自定义的名称对应 #only support single node default.grouplist = "127.0.0.1:8091" #degrade current not support enableDegrade = false #disable disable = false #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent max.commit.retry.timeout = "-1" max.rollback.retry.timeout = "-1" } ## transaction log store store { ## store mode: file、db mode = "db" #修改此处将事务信息存储到db数据库中 ## database store db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc. datasource = "druid" ## mysql/oracle/h2/oceanbase etc. db-type = "mysql" driver-class-name = "com.mysql.jdbc.Driver" url = "jdbc:mysql://localhost:3306/seat" #修改数据库连接地址 user = "root" #修改数据库用户名 password = "root" #修改数据库密码 min-conn = 1 max-conn = 3 global.table = "global_table" branch.table = "branch_table" lock-table = "lock_table" query-limit = 100 } } 复制代码
说明:
registry.conf
配置文件,指明注册中心为 nacos,及修改 nacos 连接信息即可; registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "nacos" nacos { serverAddr = "localhost:8848" namespace = "" cluster = "default" } ... 省略 } } 复制代码
启动 seata server 的脚本位于 bin 文件内, Linux/Mac
环境使用 seata-server.sh 脚本启动,Windows 环境使用 seata-server.bat 脚本启动。
Linux/Mac
启动方式示例如下所示:
nohup sh seata-server.sh -p 8091 -h 127.0.0.1 -m db &> seata.log & 复制代码
通过 nohup 命令让 seata server 在系统后台运行。
脚本参数:
当我们看到-Server started 时并未发现其他错误信息,我们的 seata server 已经启动成功
让我们从一个微服务示例开始 用户购买商品的业务逻辑。整个业务逻辑由 3 个微服务提供支持:
创建业务数据库
db-order:存储订单的数据库
db-storage:存储库存的数据库
db-account:存储账户信息的数据库
order 订单表:
DROP TABLE IF EXISTS `order`; CREATE TABLE `order` ( `id` int(20) NOT NULL AUTO_INCREMENT COMMENT '主键Id', `user_id` int(20) DEFAULT NULL COMMENT '用户Id', `pay_money` decimal(11,0) DEFAULT NULL COMMENT '付款金额', `product_id` int(20) DEFAULT NULL COMMENT '商品Id', `status` int(11) DEFAULT NULL COMMENT '状态', `count` int(11) DEFAULT NULL COMMENT '商品数量', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=latin1 ROW_FORMAT=DYNAMIC COMMENT='订单表'; SET FOREIGN_KEY_CHECKS = 1; 复制代码
product 商品表:
DROP TABLE IF EXISTS `product`; CREATE TABLE `product` ( `id` int(20) NOT NULL COMMENT '主键', `product_id` int(11) DEFAULT NULL COMMENT '商品Id', `price` decimal(11,0) DEFAULT NULL COMMENT '价格', `count` int(11) DEFAULT NULL COMMENT '库存数量', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=latin1 ROW_FORMAT=DYNAMIC COMMENT='仓储服务'; -- ---------------------------- -- Records of product -- ---------------------------- BEGIN; INSERT INTO `product` VALUES (1, 1, 50, 100); COMMIT; SET FOREIGN_KEY_CHECKS = 1; 复制代码
account 账户表:
DROP TABLE IF EXISTS `account`; CREATE TABLE `account` ( `id` int(20) NOT NULL AUTO_INCREMENT COMMENT '主键Id', `user_id` int(20) DEFAULT NULL COMMENT '用户Id', `balance` decimal(11,0) DEFAULT NULL COMMENT '余额', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=latin1 ROW_FORMAT=DYNAMIC; -- ---------------------------- -- Records of account -- ---------------------------- BEGIN; INSERT INTO `account` VALUES (1, 1, 100); COMMIT; SET FOREIGN_KEY_CHECKS = 1; 复制代码
需要在每个数据库中创建日志回滚表,建表 sql 在 seata-server 的/conf/db_undo_log.sql 中。
三个服务,一个订单服务,一个仓储服务,一个账户服务。当用户下单时,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减下单商品的库存,再通过远程调用账户服务来扣减用户账户里面的余额,最后在订单服务中修改订单状态为已完成。该操作跨越三个数据库,有两次远程调用,很明显会有分布式事务问题 复制代码
nacos-seata-account-server 账户服务
nacos-seata-order-server 订单服务
nacos-seata-storage-server 仓储服务
对 nacos-seata-account-server、nacos-seata-order-server 和 nacos-seata-storage-server 三个 seata 的客户端进行配置,它们配置大致相同,我们下面以 nacos-seata-account-server 的配置为例;
修改 application.yml 文件,自定义事务组的名称
spring: cloud: alibaba: seata: tx-service-group: prex_tx_group #自定义事务组名称需要与seata-server中的对应 复制代码
service { #vgroup->rgroup vgroup_mapping.prex_tx_group = "default" #修改自定义事务组名称 #only support single node default.grouplist = "127.0.0.1:8091" #degrade current not support enableDegrade = false #disable disable = false #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent max.commit.retry.timeout = "-1" max.rollback.retry.timeout = "-1" disableGlobalTransaction = false } 复制代码
添加并修改 registry.conf 配置文件,主要是将注册中心改为 nacos
registry { # file 、nacos 、eureka、redis、zk type = "nacos" #修改为nacos nacos { serverAddr = "localhost:8848" #修改为nacos的连接地址 namespace = "" cluster = "default" } } 复制代码
代码只展示核心代码 具体代码文章尾部链接
@EnableDiscoveryClient @SpringBootApplication(exclude = DataSourceAutoConfiguration.class) @MapperScan("com.xd.example.seata.mapper") public class NacosSeataAccountServerApplication { public static void main(String[] args) { SpringApplication.run(NacosSeataAccountServerApplication.class, args); } } 复制代码
MyBatisPlusConfig:
/** * @Classname MyBatisPlusConfig * @Description 配置MybatisPlus使用Seata对数据源进行代理 * @Author Created by Lihaodong (alias:小东啊) im.lihaodong@gmail.com * @Date 2019-11-25 11:21 * @Version 1.0 */ @Configuration public class MyBatisPlusConfig { @Value("${mybatis-plus.mapper-locations}") private String mapperLocations; /** * @param sqlSessionFactory SqlSessionFactory * @return SqlSessionTemplate */ @Bean public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) { return new SqlSessionTemplate(sqlSessionFactory); } /** * 从配置文件获取属性构造datasource,注意前缀,这里用的是druid,根据自己情况配置, * 原生datasource前缀取"spring.datasource" * * @return */ @Bean @ConfigurationProperties(prefix = "spring.datasource.hikari") public DataSource hikariDataSource() { return new HikariDataSource(); } /** * 构造datasource代理对象,替换原来的datasource * * @param hikariDataSource * @return */ @Primary @Bean("dataSource") public DataSourceProxy dataSourceProxy(DataSource hikariDataSource) { return new DataSourceProxy(hikariDataSource); } @Bean(name = "sqlSessionFactory") public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception { MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean(); bean.setDataSource(dataSourceProxy); ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); bean.setMapperLocations(resolver.getResources(mapperLocations)); SqlSessionFactory factory = null; try { factory = bean.getObject(); } catch (Exception e) { throw new RuntimeException(e); } return factory; } /** * MP 自带分页插件 * * @return */ @Bean public PaginationInterceptor paginationInterceptor() { PaginationInterceptor page = new PaginationInterceptor(); page.setDialectType("mysql"); return page; } } 复制代码
package com.xd.example.seata.service.impl; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.xd.example.seata.domain.Order; import com.xd.example.seata.mapper.OrderMapper; import com.xd.example.seata.service.IOrderService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.xd.example.seata.service.RemoteAccountService; import com.xd.example.seata.service.RemoteStorageService; import io.seata.core.context.RootContext; import io.seata.spring.annotation.GlobalTransactional; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * <p> * 订单表 服务实现类 * </p> * * @author lihaodong * @since 2019-11-25 */ @Slf4j @Service public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService { @Autowired private RemoteStorageService remoteStorageService; @Autowired private RemoteAccountService remoteAccountService; @GlobalTransactional(rollbackFor = Exception.class) @Override public void createOrder(Order order) { log.info("下单开始,用户:{},商品:{},数量:{},金额:{}", order.getUserId(), order.getProductId(), order.getCount(), order.getPayMoney()); //创建订单 order.setStatus(0); boolean save = save(order); log.info("保存订单{}", save ? "成功" : "失败"); log.info("当前 XID: {}", RootContext.getXID()); //远程调用库存服务扣减库存 log.info("扣减库存开始"); remoteStorageService.decrease(order.getProductId(), order.getCount()); log.info("扣减库存结束"); //远程调用账户服务扣减余额 log.info("扣减余额开始"); remoteAccountService.decrease(order.getUserId(), order.getPayMoney()); log.info("扣减余额结束"); //修改订单状态为已完成 log.info("修改订单状态开始"); update(Wrappers.<Order>lambdaUpdate().set(Order::getStatus, 1).eq(Order::getUserId, order.getUserId())); log.info("修改订单状态结束"); log.info("下单结束"); } } 复制代码
分别运行 nacos-seata-order-server、nacos-seata-storage-server 和 nacos-seata-account-server 三个服务
可以看到 seata 注册成功
查询数据库初始数据信息
打开浏览器/Postman 调用接口进行下单操作: http://localhost:8081/order/create?userId=1&productId=1&count=1&payMoney=50
结果:
查看控制台打印: 订单服务:仓储服务:
账户服务:
package com.xd.example.seata.service.impl; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.xd.example.seata.domain.Account; import com.xd.example.seata.mapper.AccountMapper; import com.xd.example.seata.service.IAccountService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import io.seata.core.context.RootContext; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.math.BigDecimal; import java.util.Optional; /** * <p> * 服务实现类 * </p> * * @author lihaodong * @since 2019-11-25 */ @Slf4j @Service public class AccountServiceImpl extends ServiceImpl<AccountMapper, Account> implements IAccountService { @Override public boolean reduceBalance(Integer userId, BigDecimal balance) throws Exception { log.info("当前 XID: {}", RootContext.getXID()); checkBalance(userId, balance); log.info("开始扣减用户 {} 余额", userId); //模拟超时异常 try { Thread.sleep(10 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } Integer record = baseMapper.reduceBalance(userId, balance); log.info("结束扣减用户 {} 余额结果:{}", userId, record > 0 ? "操作成功" : "扣减余额失败"); return record > 0; } private void checkBalance(Integer userId, BigDecimal price) throws Exception { log.info("检查用户 {} 余额", userId); Optional<Account> account = Optional.ofNullable(baseMapper.selectOne(Wrappers.<Account>lambdaQuery().eq(Account::getUserId, userId))); if (account.isPresent()) { BigDecimal balance = account.get().getBalance(); if (balance.compareTo(price) == -1) { log.warn("用户 {} 余额不足,当前余额:{}", userId, balance); throw new Exception("余额不足"); } } } } 复制代码
修改完会重启账户服务,再次发送请求
订单服务控制台:
可以看到订单正常,扣除库存正常,账户服务读取超时异常
发现下单后数据库数据并没有任何改变
我们在 seata-order-service 中注释掉@GlobalTransactional 来看看会发生什么
// @GlobalTransactional(name = "prex-create-order",rollbackFor = Exception.class) @Override public void createOrder(Order order) { log.info("当前 XID: {}", RootContext.getXID()); log.info("下单开始,用户:{},商品:{},数量:{},金额:{}", order.getUserId(), order.getProductId(), order.getCount(), order.getPayMoney()); //创建订单 order.setStatus(0); boolean save = save(order); log.info("保存订单{}", save ? "成功" : "失败"); ... 省略代码 } 复制代码
保存重启订单服务,再次请求接口 由于 nacos-seata-account-server 的超时会导致当库存和账户金额扣减后订单状态并没有设置为已经完成