欢迎来到深入理解 RxJava2 系列第三篇。在上一篇中,我们详细地介绍了 Scheduler 与 Worker 的概念,并分析了 ComputationScheduler
与 IoScheduler
的实现,以帮助大家加深理解。本篇文章将基于 Scheduler ,来和大家分享 RxJava2 非常重要的概念:线程操作符。顺带提一下,本系列文章所有内容如不特别说明,均是指 Flowable
相关的概念,因为这是 RxJava2 遵循 RS 的实现。
RxJava 有很多基于 Scheduler 的操作符,如 timer
、 interval
、 debounce
等,但是笔者认为这些操作符与 subscribeOn
、 unsubscribeOn
、 observeOn
有本质上的区别。
其他的操作符,把 Scheduler 当做了计时工具,而 Scheduler 的调度导致线程切换是其附带属性,其核心是操作符本身的特性,如:
因此笔者定义狭义上的线程操作符,其目的是为了改变上下游的某些操作所在的线程。更严格的说法是,其目的是将上下游的某些操作由目标 Scheduler 调度执行,因为某些 Scheduler 的调度并不一定会切换线程,如 Schedulers.trampoline()
。虽然如此,但是我们还是称之为线程操作符,因为通常我们的本意是为了切换线程。
以下是所有的线程操作符:
Flowable
的 subscribe
方法,可能会调度上游 Subscription
的 request
方法 Subscription
的 cancel
方法 Subscriber
的 onNext / onError / onComplete
方法 通常 subscribeOn
与 observeOn
更受大家关注一些,因为 unsubscribeOn
使用的场景很少。因此本文就不会再花费过多笔墨在 unsubscribeOn
上,而且这个操作符本身的实现就非常简单,诸位一览便知。
subscribeOn
顾名思义,改变了上游的 subscribe
所在的线程。在传统的 Observable 中,只是改变了 Observable.subscribe
所在的线程,而在 Flowable 中不仅如此,还同样的改变了 Subscription.request
所在的线程。
这里就涉及到 subscribeOn
设计的用途,它最主要的目标是改变发射数据源的线程。因此在 Observable
中数据的发射,也就是耗时操作一般在 subscribe
所在的线程(这里不考虑在 onSubscribe
后内部开线程异步回调的情况)。
而在 RS 的规范中数据的回调是由消费者主动调用 Subscription.request
来触发的,因此在 Flowable
的实现中也要处理 request
的情况。
上面我们提到 RS 的规范中由消费者主动调用 Subscription.request
来触发回调数据,但是有些数据是异步产生的,可能在 subscribe
的一刻或者在那之前,譬如下面 2 个 API:
create 方法接受 FlowableOnSubscribe
作为真正的数据源。这个方法其实相比 RxJava1 已经做了很大的限制,通过封装了一层来支持 Backpressure。
关于此方法的细节,不再详细介绍,笔者之前有写过一篇文章分析过这个方法 《Rx2:小create,大文章》 ,有兴趣的读者可以去看看。
但是即便封装后支持了 Backpressure,背压的逻辑更多的还是隐藏在操作符内部了,对外部的使用者还是尽量屏蔽了这些细节。 FlowableEmitter
唯一能与 Backpressure 交互的接口仅是 long requested();
,并不能实时的响应 Subscription.request
。
这两者是几乎一致的,接受一个 Publisher
作为数据源,外面封了一层 Flowable
代理该 Publisher
对象,通过这种方式来提供 Flowable
的丰富的操作符。
换种角度来看,其实这两个方法更像 RxJava1.x 中的 create 方法。因为数据源是来自 Publisher
,因此使用更加自由与随意。
基于上述原因,在 subscribeOn
还提供了第二个参数来控制 request
的调度。
我们看一下方法的签名:
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn)
再看一眼唯一使用该参数的地方:
void requestUpstream(final long n, final Subscription s) { if (nonScheduledRequests || Thread.currentThread() == get()) { s.request(n); } else { worker.schedule(new Request(s, n)); } }
注意这里 nonScheduledRequests = !requestOn
,该参数的作用就很明显了。
如果 requestOn = true
,确保 Subscription.request
方法一定在目标线程执行。反之 requestOn = false
,则直接在当前线程执行 request
。
我们再看一下重载的单一参数的方法:
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return subscribeOn(scheduler, !(this instanceof FlowableCreate)); }
这里解释一下 FlowableCreate 是 Flowable.create
方法返回的类名,也就是说除了 create
作为上游的 Flowable,其他都推荐用强调度的方式。为什么单单 create
不可以用强调度呢。
我们用一个例子演示一下:
Flowable.<Integer>create(t -> { t.onNext(1); Thread.sleep(500); t.onNext(2); t.onNext(3); t.onComplete(); }, BackpressureStrategy.DROP) // 注释 1 .map(i -> i + 1) // 注释 2 .subscribeOn(Schedulers.io()) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { s.request(1); new Thread(() -> { try { Thread.sleep(100); } catch (InterruptedException ignored) { } s.request(2); }).start(); } @Override public void onNext(Integer integer) { System.out.println(integer); } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { System.out.println("complete"); } });
我们在 create
中发射了一个 1,延时 500ms,再次发射 2、3,随后结束,但是我们在订阅的时候先请求了 1 个数据,随后延时 100ms 再次请求 2个数据。
按照正常的流程,虽然数据请求延迟 100ms,但是数据发射延迟了 500ms,因而 Subscriber
能正确的收到3个数据:
1 2 3 complete
非常棒,一切都很美好。此时我们把注释 2 处给取消掉,再次执行结果依然同上。
此时我们应该清楚,重载的函数传入的参数是 false。好我们再试一下,但是这次把注释 2 处的代码换成:
.subscribeOn(Schedulers.io(), true) 结果: 1 complete
很意外,2 和 3 去哪了?其实原因很简单,因为我们把参数改成 true 以后, request
方法要被 worker 调度后执行。
我们在 《深入理解 RxJava2:Scheduler(2)》 中强调过, Worker 有一个职责,保证入队的任务是串行执行的,换言之,我们的
t -> { t.onNext(1); Thread.sleep(500); t.onNext(2); t.onNext(3); t.onComplete(); }
是在 Worker 中执行的,因为这里的函数没有执行完,就无法执行后续的 request 任务。因此在数据发射过程中,上游自始至终都认为下游一开始只请求了一次数据,所以多发射的 2 与 3 就被丢弃了。
不仅如此,我们再把注释 1 与 2 同时取消掉:
.map(i -> i + 1) .subscribeOn(Schedulers.io()) 结果: 2 complete
如果读者能理解笔者上面分享的内容,就能知道是为什么,奥秘就在:
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return subscribeOn(scheduler, !(this instanceof FlowableCreate)); }
在 subscribeOn
前面增加了 map
操作符后,对象就不再是 FlowableCreate
了,而被 map
封了一层。所以导致 requestOn
错误的判别为 true
,最终导致线程锁住了 request
的个数。
因此别看 subscribeOn
简单,使用起来还是有不少道道的,望大家留心。
上面我们提过 subscribeOn
会影响发射数据的线程,从而间接的影响了消费者的消费的线程。
但是,消费线程和生产线程依然是同一个线程,这里从官网取一张示意图:
数据产生后在传递给下游的过程中,是不会发生线程切换的,请大家谨记。
笔者本想一起介绍 subscribeOn
与 observeOn
的,奈何洋洋洒洒地一写便收不住,为了避免文章过长导致读者厌倦, observeOn
以及这两者的结合与对比留待下篇分享。
感觉大家的阅读,欢迎关注笔者公众号,可以第一时间获取更新,同时欢迎留言沟通。