不久前,JDBC驱动程序的反应变体称为R2DBC发布了,它允许数据异步流式传输到已订阅它的任何端点,结合使用像R2DBC这样的反应式驱动程序和Spring WebFlux,可以编写一个完整的响应式应用程序来异步进行数据的接收和发送。在这篇文章中,我们将重点关注数据库端:从连接到数据库,然后最终保存和检索数据。
我们使用Spring Data实现数据库端的反应式应用,与所有Spring Data模块一样,它为我们提供了开箱即用的配置,可以减少我们需要编写的样板代码量以获得我们的应用程序设置。最重要的是,它在数据库驱动程序上提供了一个层,更容易编制任务变得更容易。
对于这篇文章的内容,我使用的是Postgres数据库,当然,H2和Microsoft SQL Server都有自己的R2DBC驱动程序实现。
依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-r2dbc</artifactId> <version>1.0.0.M1</version> </dependency> <dependency> <groupId>io.r2dbc</groupId> <artifactId>r2dbc-postgresql</artifactId> <version>1.0.0.M6</version> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> </dependencies> <repositories> <repository> <id>repository.spring.milestone</id> <name>Spring Milestone Repository</name> <url>http:<font><i>//repo.spring.io/milestone</url></i></font><font> </repository> </repositories> </font>
使用Spring Boot的次数越多,就越习惯于使用spring-boot-starter导入单个依赖项。我希望会有spring-boot-starter-r2dbc依赖,但不幸的是,没有这样一个依赖。在编写本文时,它没有自己的Spring Boot模块,我相信未来会让R2DBC驱动程序更容易设置。目前,我们需要手动填写一些额外的依赖项。此外,R2DBC库只有Milestone版本(更多证明它们是新的)所以我们需要确保引入Spring Milestone库。
连接到数据库
感谢Spring Data为我们做了很多工作,需要手动创建的唯一Bean ConnectionFactory包含数据库的连接细节:
@Configuration @EnableR2dbcRepositories <b>class</b> DatabaseConfiguration( @Value(<font>"/${spring.data.postgres.host}"</font><font>) <b>private</b> val host: String, @Value(</font><font>"/${spring.data.postgres.port}"</font><font>) <b>private</b> val port: Int, @Value(</font><font>"/${spring.data.postgres.database}"</font><font>) <b>private</b> val database: String, @Value(</font><font>"/${spring.data.postgres.username}"</font><font>) <b>private</b> val username: String, @Value(</font><font>"/${spring.data.postgres.password}"</font><font>) <b>private</b> val password: String ) : AbstractR2dbcConfiguration() { override fun connectionFactory(): ConnectionFactory { <b>return</b> PostgresqlConnectionFactory( PostgresqlConnectionConfiguration.builder() .host(host) .port(port) .database(database) .username(username) .password(password).build() ) } } </font>
这里要注意的第一件事是扩展AbstractR2dbcConfiguration。该类包含一堆我们不再需要手动创建的Bean。实现connectionFactory是类的唯一要求,因为创建DatabaseClientBean 需要它。
这种结构是Spring Data模块的典型结构,因此在尝试不同的模块时会感觉非常熟悉。此外,我希望一旦自动配置可用,就可以删除这个手动配置,并且只能通过自动配置application.properties驱动。
Spring可以连接到正在运行的Postgres实例的配置:
Postgres的port属性默认值5432 ;host,database,username和password是PostgresqlConnectionFactory需要的定义,缺少一个会抛出异常。
这个例子的最后一条值得注意的信息是使用@EnableR2dbcRepositories。此注释指示Spring查找扩展Spring Repository接口的任何存储库接口。这用作检测Spring Data存储库的基础接口。我们将在下一节中进一步了解这一点。要从中获取的主要信息是您需要使用@EnableR2dbcRepositories注释来充分利用Spring Data的功能。
创建Spring Data Repository
如上所述,在本节中,我们将介绍添加Spring Data Repository。这些存储库是Spring Data的一个很好的特性,这意味着您不需要编写大量额外代码来编写查询。不幸的是,至少就目前而言,Spring R2DBC不能像其他Spring Data模块那样进行推断查询(我相信这会在某些时候添加)。这意味着您需要使用@Query注释并手动编写SQL。让我们来看看:
@Repository <b>interface</b> PersonRepository : R2dbcRepository<Person, Int> { @Query(<font>"SELECT * FROM people WHERE name = $1"</font><font>) fun findAllByName(name: String): Flux<Person> @Query(</font><font>"SELECT * FROM people WHERE age = $1"</font><font>) fun findAllByAge(age: Int): Flux<Person> } </font>
此接口扩展R2dbcRepository。又扩展了ReactiveCrudRepository,ReactiveCrudRepository提供标准的CRUD功能,据我所知,R2dbcRepository它不提供任何额外的功能,而是为更好的上下文命名而创建的接口。
R2dbcRepository接受两个通用参数,一个是作为输入并作为输出生成的实体类。第二个是主键的类型。因此,在这种情况下,Person类由PersonRepository(有意义)管理,内部的主键字段Person是Int。
在这个类中函数的返回类型是ReactiveCrudRepository提供的Flux和Mono,这些是Spring使用的Project Reactor类型,作为默认的Reactive Stream类型。Flux表示多个元素的流,而 Mono表示单个结果。
最后,正如我之前在示例中提到的,每个函数都使用注释@Query。语法非常简单,SQL是注释中的一个字符串。$1($2,$3等...更多输入)表示输入到函数的值。完成此操作后,Spring将处理其余内容并将输入传递到各自的输入参数中,收集结果并将其映射到存储库的指定实体类。
快速查询实体
这里不多说,只是简单地展示了Person使用的类PersonRepository。
@Table(<font>"people"</font><font>) data <b>class</b> Person( @Id val id: Int? = <b>null</b>, val name: String, val age: Int ) </font>
id已被设为可为空并提供null默认值以允许Postgres自己生成下一个合适的值。如果主键不是可空null的并且id提供了值,则Spring实际上会尝试在保存时运行更新而不是插入。
该实体将映射到people下面定义的表:
CREATE TABLE people ( id SERIAL PRIMARY KEY, name VARCHAR NOT NULL, age INTEGER NOT NULL );
看看发生什么?
现在让我们来看看它实际上在做什么。下面是一些代码,它们插入一些记录并以几种不同的方式检索它们:
@SpringBootApplication <b>class</b> Application : CommandLineRunner { @Autowired <b>private</b> lateinit <b>var</b> personRepository: PersonRepository override fun run(vararg args: String?) { personRepository.saveAll( listOf( Person(name = <font>"Dan Newton"</font><font>, age = 25), Person(name = </font><font>"Laura So"</font><font>, age = 23) ) ).log().subscribe() personRepository.findAll().subscribe { log.info(</font><font>"findAll - $it"</font><font>) } personRepository.findAllById(Mono.just(1)).subscribe { log.info(</font><font>"findAllById - $it"</font><font>) } personRepository.findAllByName(</font><font>"Laura So"</font><font>).subscribe { log.info(</font><font>"findAllByName - $it"</font><font>) } personRepository.findAllByAge(25).subscribe { log.info(</font><font>"findAllByAge - $it"</font><font>) } } } </font>
这段代码实际上有可能没有实际插入或读取某些记录,反应式应用程序意味着异步执行操作,因此该应用程序已开始在不同的线程中处理函数调用,不阻塞主线程,这些异步进程可能永远不会完全执行。出于这个原因,在这段代码中应该调用Thread.sleep,但我从示例中删除它们以保持一切都很整洁。
运行上面代码的输出如下所示:
2019-02-11 09:04:52.294 INFO 13226 --- [ main] reactor.Flux.ConcatMap.1 : onSubscribe(FluxConcatMap.ConcatMapImmediate) 2019-02-11 09:04:52.295 INFO 13226 --- [ main] reactor.Flux.ConcatMap.1 : request(unbounded) 2019-02-11 09:04:52.572 INFO 13226 --- [actor-tcp-nio-1] reactor.Flux.ConcatMap.1 : onNext(Person(id=35, name=Dan Newton, age=25)) 2019-02-11 09:04:52.591 INFO 13226 --- [actor-tcp-nio-1] reactor.Flux.ConcatMap.1 : onNext(Person(id=36, name=Laura So, age=23)) 2019-02-11 09:04:52.591 INFO 13226 --- [actor-tcp-nio-1] reactor.Flux.ConcatMap.1 : onComplete() 2019-02-11 09:04:54.472 INFO 13226 --- [actor-tcp-nio-2] com.lankydanblog.tutorial.Application : findAll - Person(id=35, name=Dan Newton, age=25) 2019-02-11 09:04:54.473 INFO 13226 --- [actor-tcp-nio-2] com.lankydanblog.tutorial.Application : findAll - Person(id=36, name=Laura So, age=23) 2019-02-11 09:04:54.512 INFO 13226 --- [actor-tcp-nio-4] com.lankydanblog.tutorial.Application : findAllByName - Person(id=36, name=Laura So, age=23) 2019-02-11 09:04:54.524 INFO 13226 --- [actor-tcp-nio-5] com.lankydanblog.tutorial.Application : findAllByAge - Person(id=35, name=Dan Newton, age=25)
说明:
这并不是真实地表示如何在实际应用程序中使用Reactive Streams,而是希望演示如何使用它们并对它们的执行方式有一些了解。
结论
总而言之,Reactive Streams已经出现在一些RDBMS数据库中,这要归功于R2DBC驱动程序和Spring Data,它们在顶层构建了一层,使一切变得更加整洁。通过使用Spring Data R2DBC,我们可以创建与数据库的连接并开始查询它,而无需太多代码。尽管Spring已经为我们做了很多事情,但它可能会做得更多。目前,它没有Spring Boot自动配置支持。这有点烦人。但是,我确信有人会尽快做到这一点,并使一切都比现在更好。
这篇文章中使用的代码可以在我的 GitHub上找到 。