Seata 的动态降级需要结合配置中心的动态配置订阅功能。动态配置订阅,即通过配置中心监听订阅,根据需要读取已更新的缓存值,ZK、Apollo、Nacos 等第三方配置中心都有现成的监听器可实现动态刷新配置;动态降级,即通过动态更新指定配置参数值,使得 Seata 能够在运行过程中动态控制全局事务失效(目前只有 AT 模式有这个功能)。
那么 Seata 的默认配置中心类型 file 是怎么实现这个功能的呢?下面给大家介绍 Seata 结合动态刷新配置实现动态降级的功能。
Seata 配置中心有一个监听器基准接口,它主要有一个抽象方法和 default 方法,如下:
io.seata.config.ConfigurationChangeListener
该监听器基准接口主要有两个实现类型:
实现注册配置订阅事件监听器:用于实现各种功能的动态配置订阅,比如 GlobalTransactionalInterceptor 实现了 ConfigurationChangeListener,根据动态配置订阅实现的动态降级功能;
实现配置中心动态订阅功能:对于没有订阅功能的配置中心来说,可实现 ConfigurationChangeListener 接口来提供动态订阅功能,比如默认的 file,以及 consul、etcd 等。有些配置中心已经有订阅功能,就不要实现该接口了,比如 zk、apollo、nacos 等。
这里就用默认的 file 配置中心,以它的实现类 FileListener 举例子,它的实现逻辑如下:
如上,
dataId:为订阅的配置属性;
listener:配置订阅事件监听器,用于将外部传入的 listener 作为一个 wrapper,执行真正的变更逻辑,这里特别需要注意的是, 该监听器与 FileListener 同样实现了 ConfigurationChangeListener 接口,只不过 FileListener 是用于给 file 提供动态配置订阅功能,而 listener 用于实现配置订阅事件 ;
executor:用于处理配置变更逻辑的线程池,在 ConfigurationChangeListener#onProcessEvent 方法中用到。
FileListener#onChangeEvent 方法的实现让 file 具备了动态配置订阅的功能,它的逻辑如下:
无限循环获取订阅的配置属性当前的值,从缓存中获取旧的值,判断是否有变更,如果有变更就执行外部传入 listener 的逻辑。
ConfigurationChangeEvent 用于保存配置变更的事件类,它的成员属性如下:
这里的 getConfig 方法是如何感知 file 配置的变更呢?我们点进去,发现它最终的逻辑如下:
发现它是创建一个 future 类,然后包装成一个 Runnable 放入线程池中异步执行,最后调用 get 方法阻塞获取值,那么我们继续往下看:
allowDynamicRefresh:动态刷新配置开关;
targetFileLastModified:file 最后更改的时间缓存。
以上逻辑:
获取 file 最后更新的时间值 tempLastModified,然后对比对比缓存值 targetFileLastModified,如果 tempLastModified > targetFileLastModified,说明期间配置有更改过,这时就重新加载 file 实例,替换掉旧的 fileConfig,使得后面的操作能够获取到最新的配置值。
添加一个配置属性监听器的逻辑如下:
configListenersMap 为 FileConfiguration 的配置监听器缓存,它的数据结构如下:
ConcurrentMap<String/*dataId*/, Set<ConfigurationChangeListener>> configListenersMap
从数据结构上可看出,每个配置属性可关联多个事件监听器。
最终执行 onProcessEvent 方法,这个是监听器基准接口里面的 default 方法,它会调用 onChangeEvent 方法,即最终会调用 FileListener 中的实现。
有了以上的动态配置订阅功能,我们只需要实现 ConfigurationChangeListener 监听器,就可以做各种功能,目前 Seata 只有动态降级有用到动态配置订阅的功能。
在「 Seata AT 模式启动源码分析 」这篇文章中讲到,Spring 集成 Seata 的项目中,在 AT 模式启动时,会用 用GlobalTransactionalInterceptor 代替了被 GlobalTransactional 和 GlobalLock 注解的方法,GlobalTransactionalInterceptor 实现了 MethodInterceptor,最终会执行 invoker 方法,那么想要实现动态降级,就可以在这里做手脚。
在 GlobalTransactionalInterceptor 中加入一个成员变量:
private volatile boolean disable;
在构造函数中进行初始化赋值:
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION(service.disableGlobalTransaction)这个参数目前有两个功能:
在启动时决定是否开启全局事务;
在开启全局事务后,决定是否降级。
实现 ConfigurationChangeListener:
这里的逻辑简单,就是判断监听事件是否属于 ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION 配置属性,如果是,直接更新 disable 值。
接下来在 GlobalTransactionalInterceptor#invoke 中做点手脚
如上,disable = true 时,不执行全局事务与全局锁。
配置中心订阅降级监听器
io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary
在 Spring AOP 进行 wrap 逻辑过程中,当前配置中心将订阅降级事件监听器。
Seata 配置中心实现原理
Seata AT 模式启动源码分析
分布式事务中间件 Seata 的设计原理
我对支付平台架构设计的一些思考
聊聊 Tomcat 的架构设计
关于 Kafka 的一些面试题目
基于Jenkins Pipeline自动化部署
图解:Kafka 水印备份机制
记一次 Kafka 集群线上扩容
Kafka重平衡机制
RocketMQ消息发送的高可用设计
深度解析RocketMQ Topic的创建机制
mybatis-plus 源码分析之sql注入器
Mybatis源码分析之Mapper注册与绑定
从源码的角度解析线程池运行原理
关于线程池你不得不知道的一些设置
你都理解创建线程池的参数吗?
Java并发之AQS源码分析(二)
Java并发之AQS源码分析(一)