【51CTO.com快译】
在本文中,我们将讨论如何使用 Spring WebFlux 来构建响应式 REST API 。在正式讨论之前,让我们首先来看看系统的开发,传统 REST 在实现中遇到的问题,以及当前 API 的普遍需求。
下图简要地罗列了传统应用和现代应用系统的主要特点。如今的系统讲求的是:分布式应用、云原生、高可用性和可扩展性。因此,有效地利用系统现有的资源是至关重要的。
应用程序 API 需求的演变
那么传统的 REST API 请求处理又是如何工作的呢?
传统 REST API 模型
如上图所示,传统 REST API 会带来如下问题:
下面,让我们来看看响应式 API 的优势,以及如何使用响应式编程,来解决上述问题。
那么,响应式编程的具体流程是怎样的呢?如下图所示,一旦应用程序调用了从某个数据源获取数据的操作,那么就会立即返回一个线程,并且让来自该数据源的数据作为数据 / 事件流出现。在此,应用程序是订阅者( subscriber ),数据源是发布者( publisher )。一旦数据流完成后, onComplete 事件就会被触发。
数据流工作流程
如下图所示,如果发生了任何异常情况,发布者将会触发 onError 事件。
数据流工作流程
在某些情况下,例如:从数据库中删除一个条目,发布者只会立即触发 onComplete/onError 事件,而不会调用 onNext 事件,毕竟没有任何数据可以返回。
数据流工作流程
下面,我们进一步讨论:什么是背压,以及如何将背压应用于响应流。例如,我们有一个客户端应用正在向另一个服务请求数据。该服务能够以 1000 TPS (吞吐量)的速率发布事件,而客户端应用只能以 200 TPS 的速率处理事件。
那么在这种情况下,客户端应用程序需要通过缓冲数据来进行处理。而在随后的调用中,客户端应用程序可能会缓冲更多的数据,以致最终耗尽内存。显然,这对于那些依赖该客户端应用的其他程序,会造成级联效应。为了避免此类情况,客户端应用可以要求服务在事件的末尾进行缓冲,并以客户端应用的速率去推送各种事件。这就是所谓的背压,具体流程请见下图。
背压示例
下面,我们将介绍响应流的规范(请参见 -- https://www.reactive-streams.org/ ),以及一个实现案例 --Project Reactor (请参见 -- https://projectreactor.io/ )。通常,响应流的规范定义了如下接口类型:
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }
public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
public interface Subscription { public void request(long n); public void cancel(); }
下面是响应流规范的类图:
响应流规范
其实,响应流规范具有许多种实现方式,上述 Project Reactor 只是其中的一种。 Reactor 可以完全实现无阻塞、且有效的请求管理。它能够提供两个响应式和可组合的 API ,即: Flux [N] (请参见 -- https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html )和 Mono [0|1] (请参见 -- https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html )。它们广泛地实现了响应式扩展( Reactive Extensions )。 Reactor 为 HTTP (包括 Websocket )提供了非阻塞的背压式网络引擎、 TCP 和 UDP 。它也非常适合于微服务的架构。
图片来源: https://projectreactor.io
图片来源: https://projectreactor.io
由于 Reactor 的实施往往涉及到 Spring 5.x ,因此,我们可以使用带有 Spring servlet 栈的命令式编程,来构建 REST API 。下图展示了 Spring 如何支持响应式和 servlet 栈的实现。
图片来源: spring.io
下面是一个公布了响应式 REST API 的应用。在该应用中,我们使用到了:
下图是该应用的整体架构:
下面是 build.gradle 文件的 Groovy 代码,它包含了与 Spring WebFlux 协同使用的各种依赖项。
plugins { id 'org.springframework.boot' version '2.2.6.RELEASE' id 'io.spring.dependency-management' version '1.0.9.RELEASE' id 'java' } group = 'org.smarttechie' version = '0.0.1-SNAPSHOT' sourceCompatibility = '1.8' repositories { mavenCentral() } dependencies { implementation 'org.springframework.boot:spring-boot-starter-data-cassandra-reactive' implementation 'org.springframework.boot:spring-boot-starter-webflux' testImplementation('org.springframework.boot:spring-boot-starter-test') { exclude group: 'org.junit.vintage', module: 'junit-vintage-engine' } testImplementation 'io.projectreactor:reactor-test' } test { useJUnitPlatform() }
在此应用程序中,我公布了如下 API 。您可以通过 GitHub 的相关链接 -- https://github.com/2013techsmarts/Spring-Reactive-Examples ,下载源代码。
在构建响应式 API 时,我们可以使用功能性样式编程模型来构建 API ,而无需使用 RestController 。当然,您需要具有如下的 router 和 handler 组件:
Router :
package org.smarttechie.router; import org.smarttechie.handler.ProductHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.RouterFunctions; import org.springframework.web.reactive.function.server.ServerResponse; import static org.springframework.web.reactive.function.server.RequestPredicates.*; @Configuration public class ProductRouter { /** * The router configuration for the product handler. * @param productHandler * @return */ @Bean public RouterFunction<ServerResponse> productsRoute(ProductHandler productHandler){ return RouterFunctions.route(GET("/products").and(accept(MediaType.APPLICATION_JSON)) ,productHandler::getAllProducts).andRoute(POST("/product").and(accept(MediaType.APPLICATION_JSON)),productHandler::createProduct).andRoute(DELETE("/product/{id}").and(accept(MediaType.APPLICATION_JSON)) ,productHandler::deleteProduct).andRoute(PUT("/product/{id}").and(accept(MediaType.APPLICATION_JSON)),productHandler::updateProduct); } }
Handler :
package org.smarttechie.handler; import org.smarttechie.model.Product; import org.smarttechie.service.ProductService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerResponse; import reactor.core.publisher.Mono; import static org.springframework.web.reactive.function.BodyInserters.fromObject; @Component public class ProductHandler { @Autowired private ProductService productService; static Mono<ServerResponse> notFound = ServerResponse.notFound().build(); /** * The handler to get all the available products. * @param serverRequest * @return - all the products info as part of ServerResponse */ public Mono<ServerResponse> getAllProducts(ServerRequest serverRequest) { return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(productService.getAllProducts(), Product.class); } /** * The handler to create a product * @param serverRequest * @return - return the created product as part of ServerResponse */ public Mono<ServerResponse> createProduct(ServerRequest serverRequest) { Mono<Product> productToSave = serverRequest.bodyToMono(Product.class); return productToSave.flatMap(product -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(productService.save(product), Product.class)); } /** * The handler to delete a product based on the product id. * @param serverRequest * @return - return the deleted product as part of ServerResponse */ public Mono<ServerResponse> deleteProduct(ServerRequest serverRequest) { String id = serverRequest.pathVariable("id"); Mono<Void> deleteItem = productService.deleteProduct(Integer.parseInt(id)); return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(deleteItem, Void.class); } /** * The handler to update a product. * @param serverRequest * @return - The updated product as part of ServerResponse */ public Mono<ServerResponse> updateProduct(ServerRequest serverRequest) { return productService.update(serverRequest.bodyToMono(Product.class)).flatMap(product ->ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(fromObject(product))).switchIfEmpty(notFound); } }
至止,我们已经对如何公布响应式 REST API 有所了解。针对上述实现,我们使用了 Gatling (译者注:是一款功能强大的负载测试工具),在响应式 API 和非响应式 API (使用 Spring RestController 构建非响应式 API )上,进行了简单的基准化测试。其结果比较如下图所示。具体的 Gatling 负载测试脚本,请参考 GitHub 上的链接: https://github.com/2013techsmarts/Spring-Reactive-Examples 。
负载测试结果比较
原标题: Build Reactive REST APIs With Spring WebFlux, 作者 :Siva Prasad Rao Janapati
【51CTO译稿,合作站点转载请注明原文译者和出处为51CTO.com】
【责任编辑:庞桂玉 TEL:(010)68476606】