Flow Control(流量控制的方法):背压(Backpressure)、节流(Throttling)、打包处理、调用栈阻塞(Callstack Blocking)
1.Backpressure:也称为ReactivePull,就是下游需要多少(具体是通过下游的request请求指定需要多少),上游就发送多少。这有点类似于TCP里的流量控制,接收方根据自己的接收窗口的情况来控制接收速率,并通过反向的ACK包来控制发送方的发送速率。
2.Throttling:说白了就是丢弃。消费不过来,就处理其中一部分,剩下的丢弃。还是举音视频直播的例子,在下游处理不过来的时候,就需要丢弃数据包。具体策略:sample(也叫throttleLast)、throttleFirst、debounce (也叫throttleWithTimeout)
Sample(throttleLast):sample,采样。类比一下音频采样,8kHz的音频就是每125微秒采一个值。sample可以配置成,比如每100毫秒采样一个值,但100毫秒内上游可能过来很多值,选哪个值呢,就是选最后那个值。所以它也叫throttleLast。
ThrottleFirst:跟sample类似,比如还是每100毫秒采样一个值,但选这100毫秒内的第一个值。在Android开发中有时候可以把throttleFirst用作点击事件的防抖动处理,就是因为它可以在指定的一段时间内处理第一个点击事件(即采样第一个值),但丢弃后面的点击事件。
Debounce:也叫throttleWithTimeout,名字里就包含一个例子。比如,一个网络程序维护一个TCP连接,不停地收发数据,但中间没数据可以收发的时候,就有间歇。这段间歇的时间,可以称为idle time。当idle time超过一个预设值的时候,就算超时了(timeout),这个时候可能就需要把连接断开了。实际上一些做server端的网络程序就是这么工作的。每收发一个数据包之后,启动一个计时器,等待一个idle time。如果计时器到时之前,又有收发数据包的行为,那么计时器重置,等待一个新的idle time;而如果计时器时间到了,就超时了(time out),这个连接就可以关闭了。debounce的行为,跟这个非常类似,可以用它来找到那些连续的收发事件之后的idle time超时事件。换句话说,debounce可以把连续发生的事件之间的较大的间歇找出来。
3.打包处理:打包就是把上游来的小包裹打成大包裹,分发到下游。这样下游需要处理的包裹的个数就减少了。RxJava中提供了两类这样的机制:buffer和window。buffer和window的功能基本一样,只是输出格式不太一样:buffer打包后的包裹用一个List表示,而window打包后的包裹又是一个Observable。
4.调用栈阻塞(CallstackBlocking):这种方式只适用于整个调用链都在一个线程上同步执行的情况,这要求中间的各个operator都不能启动新的线程。在平常使用中这种应该是比较少见的,因为我们经常使用subscribeOn或observeOn来切换执行线程,而且有些复杂的operator本身也会在内部启动新的线程来处理。
在RxJava2.x中,Observable不再支持Backpressure,而是改用Flowable来专门支持Backpressure。上面提到的四种operator的前三种分别对应Flowable的三种Backpressure策略:
BackpressureStrategy.BUFFER,
BackpressureStrategy.DROP,
BackpressureStrategy.LATEST
BackpressureStrategy.BUFFER是不丢弃数据的处理方式。把上游收到的全部缓存下来,等下游来请求再发给下游。相当于一个水库。但上游太快,水库(buffer)就会溢出。
BackpressureStrategy.DROP和BackpressureStrategy.LATEST比较类似,都会丢弃数据。这两种策略相当于一种令牌机制(或者配额机制),下游通过request请求产生令牌(配额)给上游,上游接到多少令牌,就给下游发送多少数据。当令牌数消耗到0的时候,上游开始丢弃数据。但这两种策略在令牌数为0的时候有一点微妙的区别:onBackpressureDrop直接丢弃数据,不缓存任何数据;而onBackpressureLatest则缓存最新的一条数据,这样当上游接到新令牌的时候,它就先把缓存的上一条“最新”数据发送给下游。