在看到Jurgen Hoeller引入 新的Spring 5功能后, 我终于开始尝试在尚未发布的Spring Boot 2.0.0 Snapshot中尝试新的Spring WebFlux项目。开始吧:
Maven WebFlux项目生成
演示(反应端点)
在刚刚生成的Spring WebFlux项目中,让我们构建一个REST端点,以 Reactive方式 获取通用存储Item 。首先让我们开始:
@RestController <b>public</b> <b>class</b> ItemsReactiveController { @Autowired <b>private</b> IItemsService iItemsService; <b>public</b> Flux<Item> findById(String id) { <b>try</b> { System.out.println(<font>"Getting the data from DB..."</font><font>); Thread.sleep(500); } <b>catch</b> (InterruptedException e) { e.printStackTrace(); } <b>return</b> Flux.just(iItemsService.findById(id)); } @GetMapping(value = </font><font>"/store/{id}"</font><font>, produces = MediaType.TEXT_EVENT_STREAM_VALUE) <b>public</b> <b>void</b> getItemById(@PathVariable String id) { System.out.println(</font><font>"Controller start...."</font><font>); findById(id).subscribe(<b>new</b> Subscriber<Item>() { @Override <b>public</b> <b>void</b> onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override <b>public</b> <b>void</b> onNext(Item t) { System.out.println(</font><font>"Retrieved: "</font><font>+t.getName()); } @Override <b>public</b> <b>void</b> onError(Throwable t) { } @Override <b>public</b> <b>void</b> onComplete() { } }); System.out.println(</font><font>"End of method."</font><font>); } } </font>
代码详细
方法findById返回Flux <Item>类型。 Flux 是一种以反应方式返回0..N项的数据类型。另外一种可能性是使用返回0或1项的 Mono <Item>类型。
Jurgen Hoeller清楚地提到如果端点返回上述数据类型之一,那么实际上没有返回结果,但是Spring给调用者一个管道,其中结果将最终落地。正如您从NodeJS所知,但是在Spring方式中,引擎盖下的机制非常接近EventLoop。要从Reactive端点获取任何数据,您需要订阅返回的管道。如果您想获得结果的回调,那么管道上的订阅阶段或错误就会被使用来自反应流的 订阅者 ,这是 Project reactor 的底层实现。
第一次测试:
项目源码: https://bitbucket.org/tomask79/spring-reactive-rest.git
让我们用Oracle Store中现有Item的{id}调用先前创建的端点(我不会厌烦使用的JPA配置,它不是演示的主题)。点击浏览器: http://localhost:8081/store/{id }
系统输出:
Controller start.... Getting the data from DB... <p>[EL Fine]: sql: 2017-03-20 14:19:56.321--ServerSession(26882836)--Connection(29914401)--SELECT ID, ITEM_NAME FROM ITEMS WHERE (ID = ?) bind => [1 parameter bound] Retrieved: <Name of the Item> End of method.
如您所见,代码输出每个步骤:
默认情况下,从主线程获取订阅数据,当然因为我使用的数据存储不提供反应式驱动程序(Oracle),因此调用存储是阻塞的。目前,支持反应式编程(解锁通话)的存储是:
当然,激活检查新项目 Spring Data“Kay” ,才能在上面提到的Spring Data项目中启用反应范例。
要实际启用异步发布我们的项目,我们需要将控制器更改为:
<b>package</b> com.example.controller; <b>import</b> com.example.domain.Item; <b>import</b> com.example.service.IItemsService; <b>import</b> org.reactivestreams.Subscriber; <b>import</b> org.reactivestreams.Subscription; <b>import</b> org.springframework.beans.factory.annotation.Autowired; <b>import</b> org.springframework.http.MediaType; <b>import</b> org.springframework.web.bind.annotation.GetMapping; <b>import</b> org.springframework.web.bind.annotation.PathVariable; <b>import</b> org.springframework.web.bind.annotation.RequestParam; <b>import</b> org.springframework.web.bind.annotation.RestController; <b>import</b> reactor.core.publisher.Flux; <b>import</b> reactor.core.scheduler.Scheduler; <b>import</b> reactor.core.scheduler.Schedulers; <font><i>/** * Created by Tomas.Kloucek on 17.3.2017. */</i></font><font> @RestController <b>public</b> <b>class</b> ItemsReactiveController { @Autowired <b>private</b> IItemsService iItemsService; <b>public</b> Flux<Item> findById(String id) { <b>try</b> { System.out.println(</font><font>"Getting the data from DB..."</font><font>); Thread.sleep(500); } <b>catch</b> (InterruptedException e) { e.printStackTrace(); } <b>return</b> Flux.just(iItemsService.findById(id)).publishOn(Schedulers.parallel()); } @GetMapping(value = </font><font>"/store/{id}"</font><font>, produces = MediaType.TEXT_EVENT_STREAM_VALUE) <b>public</b> <b>void</b> getItemById(@PathVariable String id) { System.out.println(</font><font>"Controller start...."</font><font>); findById(id).subscribe(v -> { System.out.println(</font><font>"Consumed: "</font><font>+v.getId()); }); System.out.println(</font><font>"End of method."</font><font>); } } </font>
现在,如果我们从浏览器再次点击端点,输出将是:
Controller start.... Getting the data from DB... <p>[EL Fine]: sql: 2017-03-20 14:16:49.69--ServerSession(18245293)--Connection(9048111)--SELECT ID, ITEM_NAME FROM ITEMS WHERE (ID = ?) bind => [1 parameter bound] End of method. Consumed: <ID of your Item entity>
正如您所看到的,Spring在给出订阅请求数据之前就已到达方法的最后(End of method)。
如何创建客户端以调用Reactive Endpoint
让我们用代码创建另一个Spring Boot WebReactive应用程序:
@SpringBootApplication <b>public</b> <b>class</b> DemoApplication { @Bean <b>public</b> WebClient webClient() { <b>return</b> WebClient.create(<font>"http://<reactiveAppHost>:<reactiveAppPort>"</font><font>); } @Bean CommandLineRunner launch(WebClient webClient) { <b>return</b> args -> { webClient.get().Yuri(</font><font>"/store/{id}"</font><font>) .accept(MediaType.TEXT_EVENT_STREAM) .exchange() .flatMap(cr -> cr.bodyToFlux(Item.<b>class</b>)) .subscribe(v -> { System.out.println(</font><font>"Received from MS: "</font><font>+v.getName()); }); }; } <b>public</b> <b>static</b> <b>void</b> main(String args[]) { <b>new</b> SpringApplicationBuilder(DemoApplication.<b>class</b>).properties (Collections.singletonMap(</font><font>"server.port"</font><font>, </font><font>"8082"</font><font>)) .run(args); } } </font>
代码详细:
要调用Reactive端点,您需要首先获取 WebClient 实例。在演示案例中放入create方法 http://localhost:8081 .。由WebClient.exchange()方法调用执行的自调用方法,
但要实际在管道上放置订阅以获取结果,您需要调用 ClientRequest .bodyToFlux(<ResultClassMapping> .class),这种订阅才是可能的。如果我们运行这个应用程序,那么结果应该是:
Started DemoApplication in 4.631 seconds (JVM running <b>for</b> 4.954) Received from MS: <Item text>
这部分客户端代码:
git clone https://tomask79@bitbucket.org/tomask79/spring-reactive-rest-client.git
用于异步调用REST端点的API是否必要?
我对这种订阅和异步调用端点的新反应趋势的观点是一种悲观。如果我需要异步调用端点,那么JMS或AMPQ是第一个让我进入大脑的想法,特别是在MicroServices中。但我们会看到这将如何发展。Spring Framework 5中的其他计划更改很有希望:
本站文章点击标题看原文!