响应式(反应式)编程的好处是背压Backpressure,可以平衡请求或响应率,这点与异步机制区别所在,也就是说,当响应堵塞时,会同时堵塞请求,因此reactive响应式=异步+同步(背压)。本文解释了Spring Web-Flux中的背压机制,假设我们编写一个Spring Web-Flux的控制器代码如下:
@RestController
public class FirstController
{
@GetMapping("/first")
public Mono<String> getAllTweets()
{
return Mono.just("I am First Mono")
}
}
这段代码背后的背压工作机制是什么?
为了理解Backpressure在WebFlux框架中如何工作,我们必须回顾一下当前默认使用的传输层。
我们可能还记得,浏览器和服务器之间的正常通信(服务器到服务器通信通常也是一样)是通过TCP连接完成的。WebFlux还是使用该传输进行客户端和服务器之间的通信。然后,为了获得背压控制,我们必须从Reactive Streams规范的角度来概述背压的含义。
规范的基本语义定义了如何通过背压来调节流元素的传输。
因此,从该声明中,我们可以得出结论,在Reactive Streams中,背压是一种通过传输(通知)接收者可以消费多少元素来调节生产的机制(消费决定生产); 在这里,我们有一个棘手的问题。TCP具有字节抽象而不是逻辑元素抽象。我们通常所说的背压控制是控制向网络发送/接收的逻辑元件的数量。即使TCP有自己的流控制,这个流控制仍然是字节而不是逻辑元素。
在WebFlux模块的当前实现中,背压由传输流控制来调节,但它不会暴露接收方的实际需求。为了最终看到交互流程,请参见下图:
为简单起见,上图显示了两个微服务之间的通信,其中左侧发送生产数据流,右侧消费该流。以下编号列表提供了该图表的简要说明:
1. 这是WebFlux框架,它正确地将逻辑元素转换为字节并返回并将它们传输到TCP /从TCP(网络)接收。
2. 这是数据长时间运行处理的开始,一旦作业完成,该数据就会请求下一个数据。
3. 在这里,虽然没有来自业务逻辑的需求,但WebFlux将来自网络的字节排队,而没有他们的确认(业务逻辑没有要求)。
4. 由于TCP流控制的性质,服务A仍然可以向网络发送数据。
正如我们可能从上图中注意到的那样,接收方的请求与发送方的请求不同(逻辑元素中的请求)。这意味着两者的请求是独立的,因此真正背压仅仅仅适用于WebFlux < - >业务逻辑(服务)交互,服务A < - >服务B交互不是完整的背压。
所有这些意味着背压控制在WebFlux中并不像我们预期的那样公平。
但我仍然想知道如何控制背压
如果我们仍然希望对WebFlux中的背压进行公平的控制,我们可以在Project Reactor支持下实现limitRate()。以下示例显示了我们如何使用:
@PostMapping("/tweets")
public Mono<Void> postAllTweets(Flux<Tweet> tweetsFlux) {
return tweetService.process(tweetsFlux.limitRate(10))
.then();
}
正如我们从示例中看到的那样,limitRate()运算符允许一次定义要预取的数据数。这意味着即使最终订户请求Long.MAX_VALUE元素,limitRate运行者也会将该请求拆分为块,并且不允许一次消费更多。我们可以用数据元素的发送过程来做同样的事情:
@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {
return tweetService.retreiveAll()
.limitRate(10);
}
上面的示例显示,即使WebFlux一次请求超过10个元素,也会limitRate()限制对预取大小的需求,并防止一次消费超过指定数量的元素。
另一种选择是实现自己的Subscriber或扩展BaseSubscriber来自Project Reactor。例如,以下是我们如何做到这一点的简单例子:
class MyCustomBackpressureSubscriber<T> extends BaseSubscriber<T> {
int consumed;
final int limit = 5;
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(limit);
}
@Override
protected void hookOnNext(T value) {
// do business logic there
consumed++;
if (consumed == limit) {
consumed = 0;
request(limit);
}
}
}
使用RSocket协议的公平背压
为了通过网络边界实现逻辑元素背压,我们需要一个适当的协议。幸运的是,有一种称为RScoket协议。RSocket是一种应用程序级协议,允许通过网络边界传输实际需求。该协议有一个RSocket-Java实现,允许设置RSocket服务器。在服务器到服务器通信的情况下,相同的RSocket-Java库也提供客户端实现。
对于浏览器 - 服务器通信,有一个RSocket-JS实现,能通过WebSocket连接浏览器和服务器之间的流通信。