上篇的续集。
工具:
Idea201902/JDK11/Gradle5.6.2/Mysql8.0.11/Lombok0.27/Postman7.5.0/SpringBoot2.1.9/Nacos1.1.3/Seata0.8.1/SeataServer0.8.1/Dubbo2.7.3
难度 : 新手--战士--老兵--大师
目标 :
1.使用Seata实现storage模块的TCC模式的本地模式
2.使用Seata实现多级TCC模式
步骤:
为了更好的遇到各种问题,同时保持时效性,我尽量使用最新的软件版本。本项目代码地址:其中的day18,https://github.com/xiexiaobiao/dubbo-project.git
1.先说下TCC(Try-Confirm-Cancel)模式:
TCC模式即将每个服务业务操作分为两个阶段,第一个阶段检查并预留相关资源,可视为一种临时操作,第二阶段根据所有服务业务的Try状态来操作,如果都成功,则进行Confirm操作,如果任意一个Try发生错误,则全部Cancel,特征在于它不依赖 RM 对分布式事务的支持,而是通过对业务逻辑的分解来实现分布式事务,不同于AT的是就是需要自行定义各个阶段的逻辑,对业务有 侵入 。TCC使用要求就是业务接口都必须实现三段逻辑:
准备操作 Try:完成所有业务检查,预留必须的业务资源。
确认操作 Confirm:真正执行的业务逻辑,不做任何业务检查,只使用 Try 阶段预留的业务资源。因此,只要 Try 操作成功,Confirm 必须能成功。另外,Confirm 操作需满足幂等性,保证一笔分布式事务能且只能成功一次。
取消操作 Cancel:释放 Try 阶段预留的业务资源。同样的,Cancel 操作也需要满足幂等性。
借用一张图来解释一下(此图来源见文末参考文章):
try-先扣款30元 --> confirm-空操作 --> cancel-退回30元。
如果再优化一下,将更接近TCC的定义:
try-冻结30元,预留资源 --> confirm-扣款30元 --> cancel-退回30元。
2.项目延续自前篇文章,不变,仅修改了storage模块,整体为多模块Dubbo微服务架构,目标一,即实现图一中TCC模式。
3.先从 com.biao.mall.storage.conf.SeataAutoConfig
开始:此类为配置类,完成Seata框架依赖元素的注入,
先自动注入DataSourceProperties,获取JDBC信息;
再注入一个GlobalTransactionScanner,用于扫描TCC事务;
@Configuration public class SeataAutoConfig { private DataSourceProperties dataSourceProperties; @Autowired public SeataAutoConfig(DataSourceProperties dataSourceProperties){ this.dataSourceProperties = dataSourceProperties; } /** * init durid datasource * @Return: druidDataSource datasource instance */ @Bean @Primary public DruidDataSource druidDataSource(){ DruidDataSource druidDataSource = new DruidDataSource(); druidDataSource.setUrl(dataSourceProperties.getUrl()); druidDataSource.setUsername(dataSourceProperties.getUsername()); druidDataSource.setPassword(dataSourceProperties.getPassword()); druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName()); druidDataSource.setInitialSize(0); druidDataSource.setMaxActive(180); druidDataSource.setMaxWait(60000); druidDataSource.setMinIdle(0); druidDataSource.setValidationQuery("Select 1 from DUAL"); druidDataSource.setTestOnBorrow(false); druidDataSource.setTestOnReturn(false); druidDataSource.setTestWhileIdle(true); druidDataSource.setTimeBetweenEvictionRunsMillis(60000); druidDataSource.setMinEvictableIdleTimeMillis(25200000); druidDataSource.setRemoveAbandoned(true); druidDataSource.setRemoveAbandonedTimeout(1800); druidDataSource.setLogAbandoned(true); return druidDataSource; } /** * init datasource proxy * @Param: druidDataSource datasource bean instance * @Return: DataSourceProxy datasource proxy */ @Bean public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource){ return new DataSourceProxy(druidDataSource); } /** * init mybatis sqlSessionFactory * @Param: dataSourceProxy datasource proxy * @Return: DataSourceProxy datasource proxy */ @Bean public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception { SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean(); factoryBean.setDataSource(dataSourceProxy); factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver() .getResources("classpath:/mapper/*Mapper.xml")); factoryBean.setTransactionFactory(new JdbcTransactionFactory()); return factoryBean.getObject(); } /** * init global transaction scanner * @Return: GlobalTransactionScanner */ @Bean public GlobalTransactionScanner globalTransactionScanner(){ return new GlobalTransactionScanner("${spring.application.name}", "my_test_tx_group"); } }
4.核心之一 com.biao.mall.storage.service.ProductService
接口:
@LocalTCC,注解标识此TCC为本地模式:即该事务是本地调用,非RPC调用;
@TwoPhaseBusinessAction,注解标识为TCC模式,其中定义了commitMethod 和rollbackMethod
@LocalTCC public interface ProductService extends IService<ProductEntity> { /** * 扣减库存 */ // ObjectResponse decreaseStorage(CommodityDTO commodityDTO); /** TCC 模式 */ @TwoPhaseBusinessAction(name = "StorageAction",commitMethod = "commit",rollbackMethod = "rollback") boolean prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "commodityDTO") CommodityDTO commodityDTO); boolean commit(BusinessActionContext actionContext); boolean rollback(BusinessActionContext actionContext); }
5. com.biao.mall.storage.impl.ProductServiceImpl
,上面接口的实现类:try阶段执行业务逻辑,commit阶段空操作,rollback执行回退,看官当然可以实现一个优化的方案:try阶段执行数量冻结逻辑,commit阶段真实提交操作,rollback执行回退;
@Service public class ProductServiceImpl extends ServiceImpl<ProductDao, ProductEntity> implements ProductService { @Override public boolean prepare(BusinessActionContext actionContext, CommodityDTO commodityDTO) { System.out.println("actionContext获取Xid prepare>>> "+actionContext.getXid()); System.out.println("actionContext获取TCC参数 prepare>>> "+actionContext.getActionContext("commodityDTO")); int storage = baseMapper.decreaseStorage(commodityDTO.getCommodityCode(), commodityDTO.getCount()); //测试rollback时打开 /*int a = 1/0; System.out.println(a);*/ if (storage > 0){ return true; } return false; } @Override public boolean commit(BusinessActionContext actionContext) { System.out.println("actionContext获取Xid commit>>> "+actionContext.getXid()); return true; } @Override public boolean rollback(BusinessActionContext actionContext) { System.out.println("actionContext获取Xid rollback>>> "+actionContext.getXid()); //必须注意actionContext.getActionContext返回的是Object,且不可使用以下语句直接强转! //CommodityDTO commodityDTO = (CommodityDTO) actionContext.getActionContext("commodityDTO"); CommodityDTO commodityDTO = JSONObject.toJavaObject((JSONObject)actionContext.getActionContext("commodityDTO"),CommodityDTO.class); int storage = baseMapper.increaseStorage(commodityDTO.getCommodityCode(), commodityDTO.getCount()); if (storage > 0){ return true; } return false; } }
对于以上代码,rollback方法中,必须注意actionContext.getActionContext返回的是Object,且不可使用以下语句直接强转,运行会报错!
CommodityDTO commodityDTO = (CommodityDTO)actionContext.getActionContext("commodityDTO");
6. com.biao.mall.storage.controller.ProductController
,写个简单的API入口:
@RestController @RequestMapping("/product") public class ProductController { @Autowired private ProductService productService; private static final Logger LOGGER = LoggerFactory.getLogger(ProductController.class); @PostMapping("/dec") @GlobalTransactional public String handleBusiness(@RequestBody CommodityDTO commodityDTO) { LOGGER.info("请求参数:{}", commodityDTO.toString()); LOGGER.info("全局XID:{}", RootContext.getXID()); ObjectResponse<Object> response = new ObjectResponse<>(); if (productService.prepare(null,commodityDTO)){ return "success"; }; return null; } }
7.成功Commit测试:依次启动:Nacos-->SeataServer-->storage
Postman提交数据:
后台输出,注意数字标识:
获得请求参数对象DTO(1)
Seata开启全局事务(9)
Try阶段处理(2,3)
标识事务模式为TCC(4)
Commit阶段处理(5,6)
第二阶段成功(7),全局事务成功(8)
8.rollback测试:
打开 com.biao.mall.storage.impl.ProductServiceImpl
中prepare方法中的异常测试代码,再次运行:
Seata开启全局事务(1)
Rollback阶段处理(2)
第二阶段回退成功(3),全局事务成功(4)
目标一达成!
9.目标二,目前暂时没办法在该项目框架下实现,因基于 @Reference
注解获取的Dubbo服务,在Seata框架下,分支事务中 BusinessActionContext
实例一直是Null,折腾一整天,发现这是Seata的一个未关闭的Issue,作罢,以后再写!替代方案是基于XML方式,注册和获取Dubbo服务,再使用Spring模式的ApplicationContext获取服务Bean,是可以正常使用的,但我觉得这套方案不是未来的方向,过于原始,故放弃了,看官可以尝试!
复盘记:
1.SeataServer,即TC,其安装目录下文件 /seata/bin/sessionStore/root.data
会持久化事务执行状态,经测试,如果提交阶段失败,即将 storage/src/main/java/com/biao/mall/storage/impl/ProductServiceImpl.java
中commit空提交改为返回false,发生错误重启TC或应用,会自动继续尝试commit提交,再重启应用,RM注册失败,重启SeataServer,也会自动继续commit,见下图,说明有进行文件形式的持久化机制!为啥?因为对于二阶段式提交,只要try成功,commit是必须要成功的(或者try成功后,rollback一定要成功),如不这样,数据从理论上就是处于半状态,这系列动作本来就是一个事务,故由TC来保证此原则的执行。要强行恢复正常,手动删掉root.data即可!
2.后台的红色告警:
使用依赖树分析:
更新依赖即可消除,神奇的是org.apache.dubbo:dubbo:2.7.3中,我测试3.24,3.25,3.26都会报警告,只有3.23.1-GA刚刚好,这口味真独特,过老过嫩都不要!
compile group: 'org.javassist', name: 'javassist', version: '3.23.1-GA'
3.事实上com.biao.mall.storage.conf.SeataAutoConfig中:可以将DataSourceProxy不进行DI,执行效果如下图,可以看到sql的过程了,因为此时是由Spring来操作的,DataSourceProxy配置是AT模式必须的,因为要由Seata来代理完成数据操作,这也是TCC和AT模式的一大区别!
4.参考文章:https://juejin.im/post/5cbfd9a26fb9a03212505785