本文由 玉刚说写作平台
提供写作赞助
原作者: 四月葡萄
版权声明:本文版权归微信公众号 玉刚说 所有,未经许可,不得以任何形式转载
本文主要是对RxJava的消息订阅和线程切换进行源码分析,相关的使用方式等不作详细介绍。
本文源码基于 rxjava:2.1.14
。
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
上面这段话来自于RxJava在github上面的官方介绍。翻译成中文的大概意思就是:
RxJava是一个在Java虚拟机上的响应式扩展,通过使用可观察的序列将异步和基于事件的程序组合起来的一个库。
它扩展了观察者模式来支持数据/事件序列,并且添加了操作符,这些操作符允许你声明性地组合序列,同时抽象出要关注的问题:比如低级线程、同步、线程安全和并发数据结构等。
简单点来说, RxJava就是一个使用了观察者模式,能够异步的库。
上面说到,RxJava扩展了观察者模式,那么什么是观察模式呢?我们先来了解一下。
举个例子,以微信公众号为例,一个微信公众号会不断产生新的内容,如果我们读者对这个微信公众号的内容感兴趣,就会订阅这个公众号,当公众号有新内容时,就会推送给我们。我们收到新内容时,如果是我们感兴趣的,就会点进去看下;如果是广告的话,就可能直接忽略掉。这就是我们生活中遇到的典型的观察者模式。
在上面的例子中,微信公众号就是一个被观察者( Observable
),不断的产生内容(事件),而我们读者就是一个观察者( Observer
) ,通过订阅( subscribe
)就能够接受到微信公众号(被观察者)推送的内容(事件),根据不同的内容(事件)做出不同的操作。
RxJava的扩展观察者模式中就是存在这么4种角色:
角色 | 角色功能 |
---|---|
被观察者( Observable ) |
产生事件 |
观察者( Observer ) |
响应事件并做出处理 |
事件( Event ) |
被观察者和观察者的消息载体 |
订阅( Subscribe ) |
连接被观察者和观察者 |
RxJava中的事件分为三种类型: Next
事件、 Complete
事件和 Error
事件。具体如下:
事件类型 | 含义 | 说明 |
---|---|---|
Next |
常规事件 | 被观察者可以发送无数个Next事件,观察者也可以接受无数个Next事件 |
Complete |
结束事件 | 被观察者发送Complete事件后可以继续发送事件,观察者收到Complete事件后将不会接受其他任何事件 |
Error |
异常事件 | 被观察者发送Error事件后,其他事件将被终止发送,观察者收到Error事件后将不会接受其他任何事件 |
在分析RxJava消息订阅原理前,我们还是先来看下它的简单使用步骤。这里为了方便讲解,就不用链式代码来举例了,而是采用分步骤的方式来逐一说明(平时写代码的话还是建议使用链式代码来调用,因为更加简洁)。其使用步骤如下:
Observable Observer subscribe
这里我们就根据上面的步骤来实现这个例子,如下:
//步骤1. 创建被观察者(Observable),定义要发送的事件。 Observable observable = Observable.create( new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("文章1"); emitter.onNext("文章2"); emitter.onNext("文章3"); emitter.onComplete(); } }); //步骤2. 创建观察者(Observer),接受事件并做出响应操作。 Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(String s) { Log.d(TAG, "onNext : " + s); } @Override public void onError(Throwable e) { Log.d(TAG, "onError : " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }; //步骤3. 观察者通过订阅(subscribe)被观察者把它们连接到一起。 observable.subscribe(observer);
其输出结果为:
onSubscribe onNext : 文章1 onNext : 文章2 onNext : 文章3 onComplete
下面我们对消息订阅过程中的源码进行分析,分为两部分:创建被观察者过程和订阅过程。
首先来看下创建被观察者( Observable
)的过程,上面的例子中我们是直接使用 Observable.create()
来创建 Observable
,我们点进去这个方法看下。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
可以看到, create()
方法中也没做什么,就是创建一个 ObservableCreate
对象出来,然后把我们自定义的 ObservableOnSubscribe
作为参数传到 ObservableCreate
中去,最后就是调用 RxJavaPlugins.onAssembly()
方法。
我们先来看看 ObservableCreate
类:
public final class ObservableCreate<T> extends Observable<T> {//继承自Observable public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source;//把我们创建的ObservableOnSubscribe对象赋值给source。 } }
可以看到, ObservableCreate
是继承自 Observable
的,并且会把 ObservableOnSubscribe
对象给存起来。
再看下 RxJavaPlugins.onAssembly()
方法
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { //省略无关代码 return source; }
很简单,就是把上面创建的 ObservableCreate
给返回。
所以 Observable.create()
中就是把我们自定义的 ObservableOnSubscribe
对象重新包装成一个 ObservableCreate
对象,然后返回这个 ObservableCreate
对象。 注意,这种重新包装新对象的用法在RxJava中会频繁用到,后面的分析中我们还会多次遇到。 放个图好理解,包起来哈~
Observable.create()
的时序图如下所示:
接下来我们就看下订阅过程的代码,同样,点进去 Observable.subscribe()
:
public final void subscribe(Observer<? super T> observer) { //省略无关代码 observer = RxJavaPlugins.onSubscribe(this, observer); subscribeActual(observer); //省略无关代码 }
可以看到,实际上其核心的代码也就两句,我们分开来看下:
public static <T> Observer<? super T> onSubscribe( @NonNull Observable<T> source, @NonNull Observer<? super T> observer) { //省略无关代码 return observer; }
跟之前代码一样,这里同样也是把原来的 observer
返回而已。 再来看下 subscribeActual()
方法。
protected abstract void subscribeActual(Observer<? super T> observer);
Observable
类的 subscribeActual()
中的方法是一个抽象方法,那么其具体实现在哪呢?还记得我们前面创建被观察者的过程吗,最终会返回一个 ObservableCreate
对象,这个 ObservableCreate
就是 Observable
的子类,我们点进去看下:
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); //触发我们自定义的Observer的onSubscribe(Disposable)方法 observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
可以看到, subscribeActual()
方法中首先会创建一个 CreateEmitter
对象,然后把我们自定义的观察者 observer
作为参数给传进去。这里同样也是包装起来,放个图:
CreateEmitter
实现了
ObservableEmitter
接口和
Disposable
接口,如下:
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { //代码省略 }
然后就是调用了 observer.onSubscribe(parent)
,实际上就是调用观察者的 onSubscribe()
方法,即告诉观察者已经成功订阅到了被观察者。
继续往下看, subscribeActual()
方法中会继续调用 source.subscribe(parent)
,这里的 source
就是 ObservableOnSubscribe
对象,即这里会调用 ObservableOnSubscribe
的 subscribe()
方法。 我们具体定义的 subscribe()
方法如下:
Observable observable = Observable.create( new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("文章1"); emitter.onNext("文章2"); emitter.onNext("文章3"); emitter.onComplete(); } });
ObservableEmitter
,顾名思义,就是被观察者发射器。 所以, subscribe()
里面的三个 onNext()
方法和一个 onComplete()
会逐一被调用。 这里的 ObservableEmitter
接口其具体实现为 CreateEmitter
,我们看看 CreateEmitte
类的 onNext()
方法和 onComplete()
的实现:
//省略其他代码 @Override public void onNext(T t) { //省略无关代码 if (!isDisposed()) { //调用观察者的onNext() observer.onNext(t); } } @Override public void onComplete() { if (!isDisposed()) { try { //调用观察者的onComplete() observer.onComplete(); } finally { dispose(); } } }
可以看到,最终就是会调用到观察者的 onNext()
和 onComplete()
方法。至此,一个完整的消息订阅流程就完成了。 另外,可以看到,上面有个 isDisposed()
方法能控制消息的走向,即能够切断消息的传递,这个后面再来说。
Observable
(被观察者)和 Observer
(观察者)建立连接(订阅)之后,会创建出一个发射器 CreateEmitter
,发射器会把被观察者中产生的事件发送到观察者中去,观察者对发射器中发出的事件做出响应处理。可以看到,是订阅之后, Observable
(被观察者)才会开始发送事件。
放张事件流的传递图:
再来看下订阅过程的时序流程图:
之前有提到过切断消息的传递,我们先来看下如何使用:
Observable observable = Observable.create( new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("文章1"); emitter.onNext("文章2"); emitter.onNext("文章3"); emitter.onComplete(); } }); Observer<String> observer = new Observer<String>() { private Disposable mDisposable; @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe : " + d); mDisposable=d; } @Override public void onNext(String s) { Log.d(TAG, "onNext : " + s); mDisposable.dispose(); Log.d(TAG, "切断观察者与被观察者的连接"); } @Override public void onError(Throwable e) { Log.d(TAG, "onError : " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }; observable.subscribe(observer);
输出结果为:
onSubscribe : null onNext : 文章1 切断观察者与被观察者的连接
可以看到,要切断消息的传递很简单,调用下 Disposable
的 dispose()
方法即可。调用 dispose()
之后,被观察者虽然能继续发送消息,但是观察者却收不到消息了。 另外有一点需要注意,上面 onSubscribe
输出的 Disposable
值是 "null"
,并不是空引用 null
。
我们这里来看看下 dispose()
的实现。 Disposable
是一个接口,可以理解 Disposable
为一个连接器,调用 dispose()
后,这个连接器将会中断。其具体实现在 CreateEmitter
类,之前也有提到过。我们来看下 CreateEmitter
的 dispose()
方法:
@Override public void dispose() { DisposableHelper.dispose(this); }
就是调用 DisposableHelper.dispose(this)
而已。
public enum DisposableHelper implements Disposable { DISPOSED ; //其他代码省略 public static boolean isDisposed(Disposable d) { //判断Disposable类型的变量的引用是否等于DISPOSED //即判断该连接器是否被中断 return d == DISPOSED; } public static boolean dispose(AtomicReference<Disposable> field) { Disposable current = field.get(); Disposable d = DISPOSED; if (current != d) { //这里会把field给设为DISPOSED current = field.getAndSet(d); if (current != d) { if (current != null) { current.dispose(); } return true; } } return false; } }
可以看到 DisposableHelper
是一个枚举类,并且只有一个值: DISPOSED
。 dispose()
方法中会把一个原子引用 field
设为 DISPOSED
,即标记为中断状态。因此后面通过 isDisposed()
方法即可以判断连接器是否被中断。
再回头看看 CreateEmitter
类中的方法:
@Override public void onNext(T t) { //省略无关代码 if (!isDisposed()) { //如果没有dispose(),才会调用onNext() observer.onNext(t); } } @Override public void onError(Throwable t) { if (!tryOnError(t)) { //如果dispose()了,会调用到这里,即最终会崩溃 RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) { //省略无关代码 if (!isDisposed()) { try { //如果没有dispose(),才会调用onError() observer.onError(t); } finally { //onError()之后会dispose() dispose(); } //如果没有dispose(),返回true return true; } //如果dispose()了,返回false return false; } @Override public void onComplete() { if (!isDisposed()) { try { //如果没有dispose(),才会调用onComplete() observer.onComplete(); } finally { //onComplete()之后会dispose() dispose(); } } }
从上面的代码可以看到:
dispose
, observer.onNext()
才会被调用到。 onError()
和 onComplete()
互斥,只能其中一个被调用到,因为调用了他们的任意一个之后都会调用 dispose()
。 onError()
后 onComplete()
, onComplete()
不会被调用到。反过来,则会崩溃,因为 onError()
中抛出了异常: RxJavaPlugins.onError(t)
。实际上是 dispose
后继续调用 onError()
都会炸。 上面的例子和分析都是在同一个线程中进行,这中间也没涉及到线程切换的相关问题。但是在实际开发中,我们通常需要在一个子线程中去进行一些数据获取操作,然后要在主线程中去更新UI,这就涉及到线程切换的问题了,通过RxJava我们也可以把线程切换写得还简洁。
关于RxJava如何使用线程切换,这里就不详细讲了。 我们直接来看一个例子,并分别打印RxJava在运行过程中各个角色所在的线程。
new Thread() { @Override public void run() { Log.d(TAG, "Thread run() 所在线程为 :" + Thread.currentThread().getName()); Observable .create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.d(TAG, "Observable subscribe() 所在线程为 :" + Thread.currentThread().getName()); emitter.onNext("文章1"); emitter.onNext("文章2"); emitter.onComplete(); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "Observer onSubscribe() 所在线程为 :" + Thread.currentThread().getName()); } @Override public void onNext(String s) { Log.d(TAG, "Observer onNext() 所在线程为 :" + Thread.currentThread().getName()); } @Override public void onError(Throwable e) { Log.d(TAG, "Observer onError() 所在线程为 :" + Thread.currentThread().getName()); } @Override public void onComplete() { Log.d(TAG, "Observer onComplete() 所在线程为 :" + Thread.currentThread().getName()); } }); } }.start();
输出结果为:
Thread run() 所在线程为 :Thread-2 Observer onSubscribe() 所在线程为 :Thread-2 Observable subscribe() 所在线程为 :RxCachedThreadScheduler-1 Observer onNext() 所在线程为 :main Observer onNext() 所在线程为 :main Observer onComplete() 所在线程为 :main
从上面的例子可以看到:
Observer
(观察者)的 onSubscribe()
方法运行在当前线程中。 Observable
(被观察者)中的 subscribe()
运行在 subscribeOn()
指定的线程中。 Observer
(观察者)的 onNext()
和 onComplete()
等方法运行在 observeOn()
指定的线程中。 下面我们对线程切换的源码进行一下分析,分为两部分: subscribeOn()
和 observeOn()
。
首先来看下 subscribeOn()
,我们的例子中是这么个使用的:
.subscribeOn(Schedulers.io())
subscribeOn()
方法要传入一个 Scheduler
类对象作为参数, Scheduler
是一个调度类,能够延时或周期性地去执行一个任务。
通过 Schedulers
类我们可以获取到各种 Scheduler
的子类。RxJava提供了以下这些线程调度类供我们使用:
Scheduler类型 | 使用方式 | 含义 | 使用场景 |
---|---|---|---|
IoScheduler | Schedulers.io() |
io操作线程 | 读写SD卡文件,查询数据库,访问网络等IO密集型操作 |
NewThreadScheduler | Schedulers.newThread() |
创建新线程 | 耗时操作等 |
SingleScheduler | Schedulers.single() |
单例线程 | 只需一个单例线程时 |
ComputationScheduler | Schedulers.computation() |
CPU计算操作线程 | 图片压缩取样、xml,json解析等CPU密集型计算 |
TrampolineScheduler | Schedulers.trampoline() |
当前线程 | 需要在当前线程立即执行任务时 |
HandlerScheduler | AndroidSchedulers.mainThread() |
Android主线程 | 更新UI等 |
下面我们来看下 Schedulers.io()
的代码,其他的 Scheduler
子类都差不多,就不逐以分析了,有兴趣的请自行查看哈~
@NonNull static final Scheduler IO; @NonNull public static Scheduler io() { //1.直接返回一个名为IO的Scheduler对象 return RxJavaPlugins.onIoScheduler(IO); } static { //省略无关代码 //2.IO对象是在静态代码块中实例化的,这里会创建按一个IOTask() IO = RxJavaPlugins.initIoScheduler(new IOTask()); } static final class IOTask implements Callable<Scheduler> { @Override public Scheduler call() throws Exception { //3.IOTask中会返回一个IoHolder对象 return IoHolder.DEFAULT; } } static final class IoHolder { //4.IoHolder中会就是new一个IoScheduler对象出来 static final Scheduler DEFAULT = new IoScheduler(); }
可以看到, Schedulers.io()
中使用了静态内部类的方式来创建出了一个单例 IoScheduler
对象出来,这个 IoScheduler
是继承自Scheduler的。这里mark一发,后面会用到这个 IoScheduler
的。
然后,我们就来看下subscribeOn()的代码:
public final Observable<T> subscribeOn(Scheduler scheduler) { //省略无关代码 return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
可以看到,首先会将当前的 Observable
(其具体实现为 ObservableCreate
)包装成一个新的 ObservableSubscribeOn
对象。 放个图:
跟前面一样, RxJavaPlugins.onAssembly()
也是将 ObservableSubscribeOn
对象原样返回而已,这里就不看了。 可以看下 ObservableSubscribeOn
的构造方法:
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; }
也就是把 source
和 scheduler
这两个保存一下,后面会用到。
然后 subscribeOn()
方法就完了。好像也没做什么,就是重新包装一下对象而已,然后将新对象返回。即将一个旧的被观察者包装成一个新的被观察者。
接下来我们回到订阅过程,为什么要回到订阅过程呢?因为事件的发送是从订阅过程开始的啊。 虽然我们这里用到了线程切换,但是呢,其订阅过程前面的内容跟上一节分析的是一样的,我们这里就不重复了,直接从不一样的地方开始。还记得订阅过程中 Observable
类的 subscribeActual()
是个抽象方法吗?因此要看其子类的具体实现。在上一节订阅过程中,其具体实现是在 ObservableCreate
类。但是由于我们调用 subscribeOn()
之后, ObservableCreate
对象被包装成了一个新的 ObservableSubscribeOn
对象了。因此我们就来看看 ObservableSubscribeOn
类中的 subscribeActual()
方法:
@Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
subscribeActual()
中同样也将我们自定义的 Observer
给包装成了一个新的 SubscribeOnObserver
对象。同样,放张图:
Observer
的
onSubscribe()
方法,可以看到,到目前为止,还没出现过任何线程相关的东西,所以
Observer
的
onSubscribe()
方法就是运行在当前线程中。 然后我们重点看下最后一行代码,首先创建一个
SubscribeTask
对象,然后就是调用
scheduler.scheduleDirect()
.。 我们先来看下
SubscribeTask
类:
//SubscribeTask是ObservableSubscribeOn的内部类 final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { //这里的source就是我们自定义的Observable对象,即ObservableCreate source.subscribe(parent); } }
很简单的一个类,就是实现了 Runnable
接口,然后 run()
中调用 Observer.subscribe()
。
再来看下 scheduler.scheduleDirect()
方法
public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); }
往下看:
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { //createWorker()在Scheduler类中是个抽象方法,所以其具体实现在其子类中 //因此这里的createWorker()应当是在IoScheduler中实现的。 //Worker中可以执行Runnable final Worker w = createWorker(); //实际上decoratedRun还是这个run对象,即SubscribeTask final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); //将Runnable和Worker包装成一个DisposeTask DisposeTask task = new DisposeTask(decoratedRun, w); //Worker执行这个task w.schedule(task, delay, unit); return task; }
我们来看下创建 Worker
和 Worker
执行任务的过程。
final AtomicReference<CachedWorkerPool> pool; public Worker createWorker() { //就是new一个EventLoopWorker,并且传一个Worker缓存池进去 return new EventLoopWorker(pool.get()); } static final class EventLoopWorker extends Scheduler.Worker { private final CompositeDisposable tasks; private final CachedWorkerPool pool; private final ThreadWorker threadWorker; final AtomicBoolean once = new AtomicBoolean(); //构造方法 EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.tasks = new CompositeDisposable(); //从缓存Worker池中取一个Worker出来 this.threadWorker = pool.get(); } @NonNull @Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { //省略无关代码 //Runnable交给threadWorker去执行 return threadWorker.scheduleActual(action, delayTime, unit, tasks); } }
注意,不同的 Scheduler
类会有不同的 Worker
实现,因为 Scheduler
类最终是交到 Worker
中去执行调度的。
我们来看下 Worker
缓存池的操作:
static final class CachedWorkerPool implements Runnable { ThreadWorker get() { if (allWorkers.isDisposed()) { return SHUTDOWN_THREAD_WORKER; } while (!expiringWorkerQueue.isEmpty()) { //如果缓冲池不为空,就从缓存池中取threadWorker ThreadWorker threadWorker = expiringWorkerQueue.poll(); if (threadWorker != null) { return threadWorker; } } //如果缓冲池中为空,就创建一个并返回。 ThreadWorker w = new ThreadWorker(threadFactory); allWorkers.add(w); return w; } }
我们再来看下 threadWorker.scheduleActual()
。 ThreadWorker
类没有实现 scheduleActual()
方法,其父类 NewThreadWorker
实现了该方法,我们点进去看下:
public class NewThreadWorker extends Scheduler.Worker implements Disposable { private final ScheduledExecutorService executor; volatile boolean disposed; public NewThreadWorker(ThreadFactory threadFactory) { //构造方法中创建一个ScheduledExecutorService对象,可以通过ScheduledExecutorService来使用线程池 executor = SchedulerPoolFactory.create(threadFactory); } public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { //这里的decoratedRun实际还是run对象 Runnable decoratedRun = RxJavaPlugins.onSchedule(run); //将decoratedRun包装成一个新对象ScheduledRunnable ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); //省略无关代码 if (delayTime <= 0) { //线程池中立即执行ScheduledRunnable f = executor.submit((Callable<Object>)sr); } else { //线程池中延迟执行ScheduledRunnable f = executor.schedule((Callable<Object>)sr, delayTime, unit); } //省略无关代码 return sr; } }
这里的 executor
就是使用线程池去执行任务,最终 SubscribeTask
的 run()
方法会在线程池中被执行,即 Observable
的 subscribe()
方法会在IO线程中被调用。这与上面例子中的输出结果符合:
Observable subscribe() 所在线程为 :RxCachedThreadScheduler-1
Observer
(观察者)的 onSubscribe()
方法运行在当前线程中,因为在这之前都没涉及到线程切换。 subscribeOn(指定线程)
,那么 Observable
(被观察者)中 subscribe()
方法将会运行在这个指定线程中去。 来张总的 subscribeOn()
切换线程时序图
如果我们多次设置 subscribeOn()
,那么其执行线程是在哪一个呢?先来看下例子
//省略前后代码,看重点部分 .subscribeOn(Schedulers.io())//第一次 .subscribeOn(Schedulers.newThread())//第二次 .subscribeOn(AndroidSchedulers.mainThread())//第三次
其输出结果为:
Observable subscribe() 所在线程为 :RxCachedThreadScheduler-1
即只有第一次的 subscribeOn()
起作用了。这是为什么呢? 我们知道,每调用一次 subscribeOn()
就会把旧的被观察者包装成一个新的被观察者,经过了三次调用之后,就变成了下面这个样子:
ObservableSubscribeOn
(第一次)那一层时,管你之前是在哪个线程,
subscribeOn(Schedulers.io())
都会把线程切到IO线程中去执行,所以多次设置
subscribeOn()
时,只有第一次生效。
我们再来看下 observeOn()
,还是先来回顾一下我们例子中的设置:
//指定在Android主线程中执行 .observeOn(AndroidSchedulers.mainThread())
public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { //省略无关代码 return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
同样,这里也是新包装一个 ObservableObserveOn
对象,注意,这里包装的旧被观察者是 ObservableSubscribeOn
对象了,因为之前调用过 subscribeOn()
包装了一层了,所以现在是如下图所示:
RxJavaPlugins.onAssembly()
也是原样返回。
我们看看 ObservableObserveOn
的构造方法。
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; }
里面就是一些变量赋值而已。
和 subscribeOn()
差不多,我们就直接来看 ObservableObserveOn
的 subscribeActual()
方法了。
@Override protected void subscribeActual(Observer<? super T> observer) { //判断是否当前线程 if (scheduler instanceof TrampolineScheduler) { //是当前线程的话,直接调用里面一层的subscribe()方法 //即调用ObservableSubscribeOn的subscribe()方法 source.subscribe(observer); } else { //创建Worker //本例子中的scheduler为AndroidSchedulers.mainThread() Scheduler.Worker w = scheduler.createWorker(); //这里会将Worker包装到ObserveOnObserver对象中去 //注意:source.subscribe没有涉及到Worker,所以还是在之前设置的线程中去执行 //本例子中source.subscribe就是在IO线程中执行。 source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
同样,这里也将 observer
给包装了一层,如下图所示:
source.subscribe()
中将会把事件逐一发送出去,我们这里只看下 ObserveOnObserver
中的 onNext()
方法的处理, onComplete()
等就不看了,实际上都差不多。
@Override public void onNext(T t) { //省略无关代码 if (sourceMode != QueueDisposable.ASYNC) { //将信息存入队列中 queue.offer(t); } schedule(); }
就是调用 schedule()
而已。
void schedule() { if (getAndIncrement() == 0) { //ObserveOnObserver同样实现了Runnable接口,所以就把它自己交给worker去调度了 worker.schedule(this); } }
Android主线程调度器里面的代码就不分析了,里面实际上是用 handler
来发送 Message
去实现的,感兴趣的可以看下。 既然 ObserveOnObserver
实现了 Runnable
接口,那么就是其 run()
方法会在主线程中被调用。 我们来看下 ObserveOnObserver
的 run()
方法:
@Override public void run() { //outputFused默认是false if (outputFused) { drainFused(); } else { drainNormal(); } }
这里会走到 drainNormal()
方法。
void drainNormal() { int missed = 1; //存储消息的队列 final SimpleQueue<T> q = queue; //这里的actual实际上是SubscribeOnObserver final Observer<? super T> a = actual; //省略无关代码 //从队列中取出消息 v = q.poll(); //... //这里调用的是里面一层的onNext()方法 //在本例子中,就是调用SubscribeOnObserver.onNext() a.onNext(v); //... }
至于 SubscribeOnObserver.onNext()
,里面也没切换线程的逻辑,就是调用里面一层的 onNext()
,所以最终会调用到我们自定义的 Observer
中的 onNext()
方法。因此, Observer
的 onNext()
方法就在 observeOn()
中指定的线程中给调用了,在本例中,就是在Android主线程中给调用。