RxJava
事件的发出和消费都在同一个线程,基于同步的观察者模式。观察者模式的核心是后台处理,前台回调的异步机制。要实现异步,需要引入 RxJava
的另一个概念 - 线程调度器 Scheduler
。
在不指定线程的情况下, RxJava
遵循的是线程不变的原则。即在哪个线程调用 subscribe()
方法,就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到线程调度器 Scheduler
。
在 RxJava
中, Scheduler
- 调度器,相当于线程控制器, RxJava
通过它来指定每一段代码应该运行在什么样的线程。 RxJava
已经内置了几个 Scheduler
,它们已经适合大多数的使用场景:
直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler
。
总是启用新线程,并在新线程执行操作。
I/O
操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler
。行为模式和 newThread()
差不多,区别在于 io()
内部采用的是一个无数量上限的线程池,可以重用空闲的线程。因此多数情况下 io()
比 newThread()
更有效率。
注意:不要把计算任务放在 io()
中,可以避免创建不必要的线程。
计算任务所使用的 Scheduler
。这个计算指的是 CPU
密集型计算,即不会被 I/O
等操作限制性能的操作,例如图形的计算。这个 Scheduler
使用的固定的线程池,大小为 CPU
核数。
注意:不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
Android
还有一个专用的 AndroidSchedulers.mainThread()
,它指定的操作将在 Android
主线程运行。
有了这几个 Scheduler
,就可以使用 subscribeOn()
和 observeOn()
两个方法来对线程进行控制了。
subscribeOn()
: 指定 subscribe()
所发生的线程,即 Observable.OnSubscribe
被激活时所处的线程,或者叫做 事件产生 的线程。
observeOn()
: 指定 Subscriber
所运行在的线程,或者叫做 事件消费 的线程。
直接看代码:
Observable.just(1, 2, 3, 4) .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程 .subscribe(new Action1<Integer>() { @Override public void call(Integer number) { Log.d(tag, "number:" + number); } }); 复制代码
上面这段代码中,由于 subscribeOn(Schedulers.io())
的指定,被创建的事件的内容 1
、 2
、 3
、 4
将会在 IO
线程 发出;由于 observeOn(AndroidScheculers.mainThread())
的指定,因此 subscriber
数字的打印将发生在 主线程 。
事实上,这种使用方式非常常见,它适用于多数的 『 后台线程取数据,主线程显示 』的程序策略。
以下是一个完整的例子:
int drawableRes = ...; ImageView imageView = ...; Observable.create(new OnSubscribe<Drawable>() { @Override public void call(Subscriber<? super Drawable> subscriber) { Drawable drawable = getTheme().getDrawable(drawableRes)); subscriber.onNext(drawable); subscriber.onCompleted(); } }) // 指定事件发出,即图片读取发生在 IO 线程 .subscribeOn(Schedulers.io()) // 指定事件消费 - 回调,即页面图片渲染发生在主线程 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<Drawable>() { @Override public void onNext(Drawable drawable) { imageView.setImageDrawable(drawable); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show(); } }); 复制代码
这样的好处是,加载图片的过程发生在 IO
线程 ,而设置图片则发生在了 主线程 。这就意味着,即使加载图片耗费了几十甚至几百毫秒的时间,也不会造成界面上的丝毫卡顿。
上面介绍到可以利用 subscribeOn()
结合 observeOn()
来实现线程控制,让事件的产生和消费发生在不同的线程。在了解了 map()
和 flatMap()
等变换方法后,一个问题就产生了 - 能不能多切换几次线程?
因为 observeOn()
指定的是 Subscriber
的线程,而这个 Subscriber
并不是 subscribe()
参数中的 Subscriber
,而是 observeOn()
执行时,当前 Observable
所对应的 Subscriber
,即它的直接下级 Subscriber
。
也就是说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。
直接查看示例代码:
Observable.just(1, 2, 3, 4) // 事件发出的 IO 线程,由 subscribeOn() 指定 .subscribeOn(Schedulers.io()) // 新线程,由 observeOn() 指定 .observeOn(Schedulers.newThread()) .map(mapOperator) // IO 线程,由 observeOn() 指定 .observeOn(Schedulers.io()) .map(mapOperator2) // Android 主线程,由 observeOn() 指定 .observeOn(AndroidSchedulers.mainThread) .subscribe(subscriber); 复制代码
上面的代码,通过 observeOn()
的多次调用,程序实现了线程的多次切换。不过,不同于 observeOn()
的是, subscribeOn()
的位置放在哪里都可以,但它是只能调用一次的。
其实, subscribeOn()
和 observeOn()
的内部实现,也是用的 lift()
(见上文),具体看图(不同颜色的箭头表示不同的线程):
从图中可以看出, subscribeOn()
进行了线程切换的工作(图中的 schedule...
的位置)。
subscribeOn()
的线程切换发生在 OnSubscribe
中,即在它 通知上一级 OnSubscribe
时,这时事件还没有开始发送,因此 subscribeOn()
的线程控制只能在 事件发出的开端 造成影响,即只允许一次线程切换。
从图中可以看出,和 observeOn()
进行了线程切换的工作(图中的 schedule...
的位置)。
observeOn()
的线程切换则发生在它内建的 Subscriber
中,即发生在它即将给 下一级 Subscriber
发送事件时,因此 observeOn()
控制的是它 后面 的线程,允许多次线程切换。
最后用一张图来解释当多个 subscribeOn()
和 observeOn()
混合使用时,线程调度是怎么发生的:
图中共有 5
处对事件的操作,由图中可以看出:
① 和 ② 两处受第一个 subscribeOn()
影响,运行在 红色线程 ;
③ 和 ④ 处受第一个 observeOn()
的影响,运行在 绿色线程 ;
⑤ 处受第二个 onserveOn()
影响,运行在 紫色线程 ;
而第二个 subscribeOn()
,由于在 通知过程 中线程就被第一个 subscribeOn()
截断,因此对整个流程并没有任何影响。
注意:当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。
虽然超过一个的 subscribeOn()
对事件处理的流程没有影响,但在 流程之前 却是有用的。在前面的文章介绍 Subscriber
的时候,提到过 Subscriber
的 onStart()
可以用作流程开始前的 初始化处理 。
由于 onStart() 在 subscribe() 发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe() 被调用时的线程。这就导致如果 onStart() 中含有对线程有要求的代码(例如:在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为无法预测 subscribe() 会在什么线程执行。
与 Subscriber.onStart()
相对应的,有一个方法 Observable.doOnSubscribe()
。它和 Subscriber.onStart()
同样是在 subscribe()
调用后 而且 在事件发送前 执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe()
执行在 subscribe()
发生的线程;而如果在 doOnSubscribe()
之后有 subscribeOn()
的话,它将执行在离它最近的 subscribeOn()
所指定的线程。
示例代码如下:
Observable.create(onSubscribe) .subscribeOn(Schedulers.io()) .doOnSubscribe(new Action0() { @Override public void call() { // 需要在主线程执行 progressBar.setVisibility(View.VISIBLE); } }) .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程 .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber); 复制代码
上面的代码,在 doOnSubscribe()
的后面跟一个 subscribeOn()
,就能指定特定工作的线程了!
RxJava
的提供的各种事件及事件转换模型,以及基于转换的线程调度器,结合观察者模式,使得 RxJava
在异步编程体验、灵活性和运行效率上领先于其他的开源框架!
欢迎关注技术公众号: 零壹技术栈
本帐号将持续分享后端技术干货,包括虚拟机基础,多线程编程,高性能框架,异步、缓存和消息中间件,分布式和微服务,架构学习和进阶等学习资料和文章。