本文探讨如何使用RDBC2或MongoDB来使用Spring Reactive的事务支持。
在还没有加入响应式/反应式事务集成之间,Spring认为没有必须进行Reactive事务管理,因此,Spring Framework不支持Reactive @Transaction。
随着时间的推移,MongoDB开始支持MongoDB Server 4.0的多文档事务,R2DBC(反应式SQL数据库驱动程序的规范)开始出现,最终在Template API中提供inTransaction(…) 方法作为执行原生本级事务的工作单元。
虽然将inTransaction(…)方法用于较小的工作块很方便,但它并不反映Spring支持事务的方式。在使用命令式编程模型时,Spring Framework允许两种事务管理安排:@Transactional和TransactionTemplate(声明性的各自的程序化事务管理)。
这两种事务管理方法都建立在PlatformTransactionManager管理事务资源事务的基础之上。PlatformTransactionManager可以是Spring提供的事务管理器实现,也可以是基于JTA的Java EE实现。
两种方法的共同之处在于它们将事务状态绑定到ThreadLocal存储,这允许事务状态管理而不传递TransactionStatus对象。事务管理应该在后台以非侵入方式进行。因为我们没有让线程继续在事务中继续有作用工作的设想,因此ThreadLocal只在命令式编程中工作。
命令式编程事务管理工作机制
事务管理需要将其事务状态与执行相关联。在命令式编程中,这通常是ThreadLocal存储 - 事务状态被绑定到一个线程,假设前提是事务代码在容器调用它的同一个线程上执行。
反应式编程模型消除了命令式(同步/阻塞)编程模型的这一基本假设。仔细研究反应式执行情况,你会发现代码在不同的线程上执行。使用进程间通信时,这会更加明显。我们再也不能安全地假设我们的代码在同一个线程上完全执行了。
这种变化使的依赖ThreadLocal的事务管理实现无效。
我们需要一种不同的安排来反映事务状态,而不是一直传递一个TransactionStatus对象。
关联带外数据并不是反应空间中的新要求。我们在其他领域遇到过这种要求,例如SecurityContextSpring Security for reactive方法安全性(仅举一例)。Project Reactor是Spring自身构建其响应支持的反应库,自3.1版本开始就为订阅者的上下文提供了支持。
Reactor Context是替代ThreadLocal命令式编程的反应式编程, 上下文允许将上下文数据绑定到特定的执行。对于反应式编程,这是一个Subscription。
Reactor Context允许Spring将事务状态以及所有资源和同步绑定到特定的Subscription状态 。使用Project Reactor的所有反应代码现在都可以参与响应式事务。
反应性事务管理
从Spring Framework 5.2 M2开始,Spring通过ReactiveTransactionManagerSPI 支持响应式/反应式事务管理。
ReactiveTransactionManager是使用事务资源的反应式和非阻塞集成的事务管理抽象。它是一个会返回Publisher的反应式@Transactional方法元注解,使用TransactionalOperator实现可编程的事务管理。
两个反应式事务管理器实现是:
让我们来看看反应式式事务的样子:
<b>class</b> TransactionalService { <b>final</b> DatabaseClient db TransactionalService(DatabaseClient db) { <b>this</b>.db = db; } @Transactional Mono<Void> insertRows() { <b>return</b> db.execute() .sql(<font>"INSERT INTO person (name, age) VALUES('Joe', 34)"</font><font>) .fetch().rowsUpdated() .then(db.execute().sql(</font><font>"INSERT INTO contacts (name) VALUES('Joe')"</font><font>) .then(); } } </font>
反应事务看起来非常类似于注释驱动中的命令事务,主要的区别在于我们使用DatabaseClient,这是一个反应性资源抽象。所有事务管理都在幕后进行,利用Spring的事务拦截器和ReactiveTransactionManager。
Spring基于方法返回类型分辨要应用的事务管理类型:
这种区别很重要,因为您仍然可以使用命令式组件,例如JPA或JDBC查询,如果将这些查询结果包装成一个Publisher类型,Spring将应用反应而不是命令式事务管理,反应式事务管理就不会打开ThreadLocal中绑定的JPA或JDBC所需的事务。
TransactionalOperator
下一步,让我们看一下编程化事务管理TransactionalOperator:
ConnectionFactory factory = … ReactiveTransactionManager tm = <b>new</b> R2dbcTransactionManager(factory); DatabaseClient db = DatabaseClient.create(factory); TransactionalOperator rxtx = TransactionalOperator.create(tm); Mono<Void> atomicOperation = db.execute() .sql(<font>"INSERT INTO person (name, age) VALUES('joe', 'Joe')"</font><font>) .fetch().rowsUpdated() .then(db.execute() .sql(</font><font>"INSERT INTO contacts (name) VALUES('Joe')"</font><font>) .then()) .as(rxtx::transactional); </font>
上面的代码包含一些值得注意的组件:
订阅后会懒惰地反应式事务,operator启动事务,设置适当的隔离级别并将数据库连接与其订户上下文相关联。所有参与(上游)Publisher实例都使用一个上下文绑定事务连接。
Reactive-functional operator 链可以是线性的(通过使用单个Publisher)或非线性的(通过合并多个流)。Publisher使用operator风格样式时,反应式事务将会影响所有上游。要将事务范围限制为特定的Publishers 集合,请应用回调样式,如下所示:
TransactionalOperator rxtx = TransactionalOperator.create(tm); Mono<Void> outsideTransaction = db.execute() .sql(<font>"INSERT INTO person (name, age) VALUES('Jack', 31)"</font><font>) .then(); Mono<Void> insideTransaction = rxtx.execute(txStatus -> { <b>return</b> db.execute() .sql(</font><font>"INSERT INTO person (name, age) VALUES('Joe', 34)"</font><font>) .fetch().rowsUpdated() .then(db.execute() .sql(</font><font>"INSERT INTO contacts (name) VALUES('Joe Black')"</font><font>) .then()); }).then(); Mono<Void> completion = outsideTransaction.then(insideTransaction); </font>
在上面的示例中,事务管理仅限于在execute(…)里面订阅的Publisher实例。换句话说,事务是作用域的。execute(…)中Publisher实例参与事务,并且命名outsideTransaction的Publisher在事务之外执行其工作。
Spring Data MongoDB
R2DBC是Spring与反应式的集成之一。另一个事务集成是通过Spring Data MongoDB访问MongoDB,您可以使用反应式编程来参与多文档事务。
Spring Data MongoDB是使用ReactiveMongoTransactionManager,这是一个ReactiveTransactionManager实现。它创建会话并管理事务,以便在托管事务中执行的代码参与多文档事务。
以下示例显示了MongoDB的编程事务管理:
ReactiveTransactionManager tm = <b>new</b> ReactiveMongoTransactionManager(databaseFactory); ReactiveMongoTemplate template = … template.setSessionSynchronization(ALWAYS); TransactionalOperator rxtx = TransactionalOperator.create(tm); Mono<Void> atomic = template.update(Step.<b>class</b>) .apply(Update.set(<font>"state"</font><font>, …)) .then(template.insert(EventLog.<b>class</b>).one(<b>new</b> EventLog(…)) .as(rxtx::transactional) .then(); </font>
上面的代码设置一个ReactiveTransactionManager并用于TransactionalOperator在单个事务中执行多个写操作。ReactiveMongoTemplate被配置为参与反应式交易。