RxJava
核心功能是一个用来完成异步操作的库,相对于其它异步操作的方法, RxJava
的 API
使用更加的简洁。并且 RxJava
中还提供了很多功能强大的操作符,帮助我们解决很多原本复杂繁琐的代码逻辑,提高了代码质量。 RxJava
的实现是基于观察者模式,观察者模式中以下有三个比较重要的概念:
被观察者是事件的发起者,被观察者与观察者建立订阅关系后,被观察者发送事件,观察者才能接收到事件。
RxJava
的基础使用也很简单,分为三个步骤,分别是创建被观察者,创建观察者和建立订阅关系,具体代码如下。
// 1. 创建被观察者 Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe"); emitter.onNext("string1"); emitter.onNext("string2"); emitter.onNext("string3"); emitter.onComplete(); } }); // 2. 创建观察者 Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(getClass().getName(), Thread.currentThread().getName() + " onSubscribe"); } @Override public void onNext(String s) { Log.d(getClass().getName(), Thread.currentThread().getName() + " onNext "+s); } @Override public void onError(Throwable e) { Log.d(getClass().getName(), Thread.currentThread().getName() + " onError"); } @Override public void onComplete() { Log.d(getClass().getName(), Thread.currentThread().getName() + " onComplete"); } }; Log.d(getClass().getName(), Thread.currentThread().getName() + " observable:"+observable.getClass().getName()); // 3. 建立订阅关系 observable.subscribe(observer); 复制代码
运行日志:
本文中所有源码基于 RxJava2
的 2.2.11
版本。首先来看看这个基本的订阅流程源码是怎么实现的。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } 复制代码
使用 RxJava
可以通过 Observable
的 create
方法创建一个被观察者对象。 create
方法从参数中传入一个 ObservableOnSubscribe
类型的 source
,然后方法中先校验了 source
是否为空,接着将传入的 source
封装成一个 ObservableCreate
对象,然后调用了 RxJavaPlugins.onAssembly
方法返回创建的好的 Observable
。接着进入 onAssembly
方法查看。
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; } 复制代码
onAssembly
方法中首先是一个 Hook
实现,这里可以理解为一个代理。可以看到这里先判断 onObservableAssembly
是否为空,为空就直接返回传入的 source
,否则再调用 apply
方法。这里可以继续跟踪一下 onObservableAssembly
。
@SuppressWarnings("rawtypes") @Nullable static volatile Function<? super Observable, ? extends Observable> onObservableAssembly; /** * Sets the specific hook function. * @param onObservableAssembly the hook function to set, null allowed */ @SuppressWarnings("rawtypes") public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } RxJavaPlugins.onObservableAssembly = onObservableAssembly; } 复制代码
它是 RxJavaPlugins
中的成员变量,默认为空,并且提供了一个 set
方法来设置它。因为默认为空,所以默认返回的就是传入的 source
。这里的代理默认是不会对 Observable
做什么操作,如果需要有特殊的需求可以调用 set
方法实现自己的代理。而默认返回的 source
类型为 ObservableCreate
对象也实现了 Observable
接口。
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } ...... } 复制代码
public interface Observer<T> { void onSubscribe(@NonNull Disposable d); void onNext(@NonNull T t); void onError(@NonNull Throwable e); void onComplete(); } 复制代码
观察者 Observer
是一个接口,其中提供了一些方法,使用时创建接口的实现,并根据需求在方法中做自己的实现。
建立订阅关系调用了 Observable
的 subscribe
方法。
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD ...... } catch (Throwable e) { ...... } } 复制代码
方法中还是先判断了传入参数 observer
是否为空,接着还是一个 Hook
实现,这里就不细究了,获得 Hook
返回的 observer
后再次判断是否为空,之后调用了 subscribeActual
方法。
protected abstract void subscribeActual(Observer<? super T> observer); 复制代码
Observable
的 subscribeActual
方法是个抽象方法,之前看过这里的 Observable
实际实现是个 ObservableCreate
对象,所以再进入 ObservableCreate
类查看对应方法。
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } 复制代码
ObservableCreate
中的 subscribeActual
方法中先创建了一个 CreateEmitter
发射器对象,并将 observer
对象传入。接着调用了 observer
的 onSubscribe
方法,此时观察者的 onSubscribe
方法执行。最后调用了 source
的 subscribe
方法。
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe"); emitter.onNext("string1"); emitter.onNext("string2"); emitter.onNext("string3"); emitter.onComplete(); } }); 复制代码
这个 source
就是在 create
方法中传入的 ObservableOnSubscribe
。它的 subscribe
方法中通过调用 ObservableEmitter
的方法发送事件,这里的 ObservableEmitter
就是之前创建的 CreateEmitter
对象,所以再来进一步看看它其中的方法。
CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } @Override public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } 复制代码
CreateEmitter
的构造函数接收了观察者对象,然后在调用 onNext
方法时先做了空判断,再对 isDisposed
进行取消订阅的判断,之后调用了 observer
的 onNext
方法,也就是观察者的 onNext
方法。同样的 onComplete
中最终也是调用了 observer
的 onComplete
方法。至此 RxJava
中的基本订阅流程的源码就梳理完了。
RxJava
中有个很重要的功能,就是能方便的切换线程,来看下它的使用,还是之前基础使用中的例子进行修改。
Observable<String> observable0 = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe"); emitter.onNext("string1"); emitter.onNext("string2"); emitter.onNext("string3"); emitter.onComplete(); } }); Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(getClass().getName(), Thread.currentThread().getName() + " onSubscribe"); } @Override public void onNext(String s) { Log.d(getClass().getName(), Thread.currentThread().getName() + " onNext "+s); } @Override public void onError(Throwable e) { Log.d(getClass().getName(), Thread.currentThread().getName() + " onError"); } @Override public void onComplete() { Log.d(getClass().getName(), Thread.currentThread().getName() + " onComplete"); } }; Observable<String> observable1 = observable0.subscribeOn(Schedulers.newThread()); Log.d(getClass().getName(), Thread.currentThread().getName() + " observable1:"+observable1.getClass().getName()); Observable<String> observable2 = observable1.observeOn(AndroidSchedulers.mainThread()); Log.d(getClass().getName(), Thread.currentThread().getName() + " observable2:"+observable2.getClass().getName()); observable2.subscribe(observer); 复制代码
被观察者和观察者的创建和之前一样,在建立订阅关系时调用 subscribeOn
和 observeOn
方法进行线程的切换。这里每个方法返回的都是 Observable
类型,所以可以采用链式调用,这也是 RxJava
的一个特点,但是这里没有采用这种写法,而是将其拆分开来写并且日志打印出每个 Observable
的具体类型,这是为了方便之后源码理解。
运行结果日志:
Observable<String> observable1 = observable0.subscribeOn(Schedulers.newThread()); Log.d(getClass().getName(), Thread.currentThread().getName() + " observable1:"+observable1.getClass().getName()); observable1.subscribe(observer); 复制代码
运行结果:
先只调用subscribeOn
方法运行查看结果,发现不仅被观察者发射事件运行在了子线程,观察者接收事件也运行在子线程,那么进入 subscribeOn
方法查看它的实现。
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); } 复制代码
可以看到 subscribeOn
方法和 subscribe
方法有些类似。首先是判断传入的 scheduler
是否为空,然后同样调用 RxJavaPlugins.onAssembly
方法,这次构建了一个 ObservableSubscribeOn
对象返回。而 subscribeOn
方法之后还是调用了 subscribe
方法,根据之前的分析, subscribe
方法最终会调用到 subscribeActual
方法,不过此时的 subscribeActual
方法不再是 ObservableCreate
中的而是 ObservableSubscribeOn
中的 subscribeActual
方法。
@Override public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); observer.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } 复制代码
ObservableSubscribeOn
的 subscribeActual
方法中流程和之前的也很类似,这次是先创建了一个 SubscribeOnObserver
对象,将观察者对象传入,接着同样先调用了 observer.onSubscribe
方法,然后将传入的 SubscribeOnObserver
封装入了一个 SubscribeTask
对象中,接着调用了 scheduler.scheduleDirect
方法再将返回结果得到的 Disposable
设置到 SubscribeOnObserver
中。下面一个方法一个方法看。首先是创建 SubscribeTask
。
final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { source.subscribe(parent); } } 复制代码
SubscribeTask
是 ObservableSubscribeOn
的内部类,其实现很简单就是实现了一个 Runnable
接口,构造方法中传入了 SubscribeOnObserver
对象,在其 run
方法中调用了 ObservableSubscribeOn
中的成员变量 source
的 subscribe
方法。这个 source
是在创建 ObservableSubscribeOn
时传入的,根据前面的代码可以找到是在 subscribeOn
方法中创建的对象并且这个 source
对应传入的是当前这个 Observable
对象即通过 Observable.create
获得的被观察者对象,其实现之前看过是一个 ObservableCreate
所以这里就和之前一样又会走到了其父类 Observable
的 subscribe
方法中,继而调用 ObservableCreate
的 subscribeActual
方法,之后最终会调用到观察者的对应 onNext
等方法,不过此时的观察者不直接是在使用时创建传入的 Observer
,而是之前看到的 SubscribeOnObserver
类型,不过其中的 onNext
等方法还是调用了在使用时创建传入的 Observer
的对应方法。
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable { private static final long serialVersionUID = 8094547886072529208L; final Observer<? super T> downstream; final AtomicReference<Disposable> upstream; SubscribeOnObserver(Observer<? super T> downstream) { this.downstream = downstream; this.upstream = new AtomicReference<Disposable>(); } @Override public void onNext(T t) { downstream.onNext(t); } @Override public void onError(Throwable t) { downstream.onError(t); } @Override public void onComplete() { downstream.onComplete(); } ...... } 复制代码
下面接着看到 scheduleDirect
这个方法,在创建好 SubscribeTask
之后调用了 scheduleDirect
方法。这里的 scheduler
就是 subscribeOn
中传入的,对应开始例子中的 Schedulers.newThread
。
public static Scheduler newThread() { return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD); } // 静态成员变量NEW_THREAD static final Scheduler NEW_THREAD; NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask()); 复制代码
进入 Schedulers.newThread
一步步跟踪,看到 newThread
方法返回静态成员变量中的 NEW_THREAD
,而 NEW_THREAD
又是通过 NewThreadTask
创建。
static final class NewThreadTask implements Callable<Scheduler> { @Override public Scheduler call() throws Exception { return NewThreadHolder.DEFAULT; } } static final Scheduler DEFAULT = new NewThreadScheduler(); 复制代码
继续跟踪查看发现 NewThreadTask
实际是实现了 Callable
接口,其 call
方法中返回了静态内部类中的 NewThreadHolder.DEFAULT
。这个 DEFAULT
的实现类型为 NewThreadScheduler
。至此终于找到了我们传入的 Scheduler
的真正实现类。于是继续看其 scheduleDirect
方法。
public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); } public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; } 复制代码
scheduleDirect
方法是在其父类中实现的,看到其中进而调用了同名重载方法,方法中首先是调用 createWorker
方法创建一个 Worker
。这个方法的实现就是在 NewThreadScheduler
中了。
public Worker createWorker() { return new NewThreadWorker(threadFactory); } 复制代码
createWorker
方法中只做了一件事就是创建返回了一个 NewThreadWorker
。
public class NewThreadWorker extends Scheduler.Worker implements Disposable { private final ScheduledExecutorService executor; volatile boolean disposed; public NewThreadWorker(ThreadFactory threadFactory) { executor = SchedulerPoolFactory.create(threadFactory); } ...... } 复制代码
NewThreadWorker
中看到创建了一个线程池,再回到 scheduleDirect
方法,创建完 Worker
后将传入的 Runnable
即 SubscribeTask
进行一个装饰得到新的 Runnable
对象。接着将 Worker
和新的 Runnable
封装到一个 DisposeTask
对象中。
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection { @NonNull final Runnable decoratedRun; @NonNull final Worker w; @Nullable Thread runner; DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) { this.decoratedRun = decoratedRun; this.w = w; } @Override public void run() { runner = Thread.currentThread(); try { decoratedRun.run(); } finally { dispose(); runner = null; } } ...... } 复制代码
DisposeTask
同样实现了 Runnable
接口,在 run
方法中调用了从构造传入的 decoratedRun
的 run
方法执行任务。回到最后一步,调用 Worker
的 schedule
方法,这里就对应的 NewThreadWorker
的 schedule
方法。
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) { if (disposed) { return EmptyDisposable.INSTANCE; } return scheduleActual(action, delayTime, unit, null); } 复制代码
schedule
方法中又进一步调用了其 scheduleActual
方法。
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); if (parent != null) { if (!parent.add(sr)) { return sr; } } Future<?> f; try { if (delayTime <= 0) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; } 复制代码
scheduleActual
方法里看到又将 decoratedRun
和 DisposableContainer
封装成 ScheduledRunnable
最后将这个 ScheduledRunnable
交给构造函数中创建的线程池去运行,最终就会执行到前面看过的 SubscribeTask
中的 run
方法完成订阅逻辑,调用观察者的 onNext
等方法。到这里就看出最终的 source.subscribe
是会通过线程池切换到子线程中去执行了。
通过查看 subscribeOn
方法源码可以发现,方法里实际上是在前一个创建的 ObservableCreate
外面包了一层,把它包成一个 ObservableSubscribeOn
对象,同样的原先的 Observer
也被包了一层包成一个 SubscribeOnObserver
对象,而线程切换的工作是由 Scheduler
完成的。
接着再来看看切换回主线程的方法 observeOn
,还是先修改使用代码,查看运行日志。
Observable<String> observable2 = observable0.observeOn(AndroidSchedulers.mainThread()); Log.d(getClass().getName(), Thread.currentThread().getName() + " observable2:"+observable2.getClass().getName()); observable2.subscribe(observer); 复制代码
运行日志:
接着还是进入来看源码。
public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); } 复制代码
这里看到 observeOn
方法里调用了重载方法,方法中还是同一个套路,不过这里创建的又是另一个对象 ObservableObserveOn
了。根据前面的经验这里就又是将前一个 Observable
传递到 ObservableObserveOn
中的成员变量 source
上,这里看到就是构造函数中的第一个参数。接着还是会调用 subscribe
与观察者建立订阅关系进而会执行到 ObservableObserveOn
对象的 subscribeActual
方法。
@Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } 复制代码
subscribeActual
方法中判断了 scheduler
的类型,这里的 scheduler
就是由 AndroidSchedulers.mainThread()
传入的,于是先来看一下这个方法。
public static Scheduler mainThread() { return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD); } private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler( new Callable<Scheduler>() { @Override public Scheduler call() throws Exception { return MainHolder.DEFAULT; } }); private static final class MainHolder { static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()), false); } 复制代码
从 mainThread
开始看,发现代码调用逻辑和之前的 Schedulers.newThread
方法类似,最终会返回一个 HandlerScheduler
而这个 Scheduler
中的 Handler
则是主线程的 Handler
,看到这里就能猜想到了,后面观察者的对应方法一定是由这个 Handler
来切换到主线程执行的。回到 subscribeActual
方法。
@Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } 复制代码
这里判断完类型会走 else
中的方法首先还是会调用 HandlerScheduler
的 createWorker
方法创建一个 Worker
。
@Override public Worker createWorker() { return new HandlerWorker(handler, async); } 复制代码
这里是个 HandlerWorker
其中具体方法后面再看。接着上面创建完 Worker
后同样还是一样调用 source.subscribe
创建了一个 ObserveOnObserver
对象传入。这里的 source
就还是之前的 ObservableCreate
,所以这里还是会调用 ObservableCreate
中的 subscribeActual
方法。
protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } 复制代码
ObservableCreate
中的 subscribeActual
方法中的逻辑之前看过,不过此时传入的 observer
仍然不再是在使用时创建的观察者对象了,而是传过来的 ObserveOnObserver
对象,此时创建的 CreateEmitter
中的 observer
也就是这个 ObserveOnObserver
对象。和之前逻辑一样,接着就会调用 observer
的 onNext
等方法,此时调用的即是 ObserveOnObserver
中的 onNext
等方法。所以进入 ObserveOnObserver
查看。
@Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } @Override public void onComplete() { if (done) { return; } done = true; schedule(); } void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } } 复制代码
查看 ObserveOnObserver
中的代码会发现 onNext
方法中先将传入的参数放入了一个队列,然后无论是 onNext
还是 onComplete
方法最后都调用了 schedule
方法,进而再进入查看,发现 schedule
方法中又调用了 worker.schedule
方法。这里的 worker
就是之前创建的 HandlerWorker
,这时再来看它的 schedule
方法。
public Disposable schedule(@NonNull Runnable run) { return schedule(run, 0L, TimeUnit.NANOSECONDS); } 复制代码
单个参数 schedule
方法是在其父类中的,而这个方法中又调用另一个三个参数的 schedule
方法,这个方法父类中是抽象方法所以实现就在子类 HandlerWorker
里了。
@Override @SuppressLint("NewApi") // Async will only be true when the API is available to call. public Disposable schedule(Runnable run, long delay, TimeUnit unit) { if (run == null) throw new NullPointerException("run == null"); if (unit == null) throw new NullPointerException("unit == null"); // 判断是否取消订阅 if (disposed) { return Disposables.disposed(); } run = RxJavaPlugins.onSchedule(run); // 创建ScheduledRunnable ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); // 创建消息,并将主线程Handler和ScheduledRunnable Message message = Message.obtain(handler, scheduled); message.obj = this; // 判断设置异步消息 if (async) { message.setAsynchronous(true); } // 发送消息执行callback handler.sendMessageDelayed(message, unit.toMillis(delay)); // 检查是否取消订阅 if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; } 复制代码
在子类的这个方法里在做了取消订阅的判断后将方法传入的 Runnable
和 Handler
又封装到一个 ScheduledRunnable
对象中。接着创建了一个 Message
并将 ScheduledRunnable
放入 Message
,最后调用 handler.sendMessageDelayed
方法通过这个主线程的 Handler
执行这个 ScheduledRunnable
。
最后来追溯下 ScheduledRunnable
到底执行了什么,不过猜也知道最后一定调用到观察者中的对应方法。
private static final class ScheduledRunnable implements Runnable, Disposable { private final Handler handler; private final Runnable delegate; private volatile boolean disposed; // Tracked solely for isDisposed(). ScheduledRunnable(Handler handler, Runnable delegate) { this.handler = handler; this.delegate = delegate; } @Override public void run() { try { delegate.run(); } catch (Throwable t) { RxJavaPlugins.onError(t); } } ...... } 复制代码
ScheduledRunnable
中的 run
方法很简单就是调用了构造中传入的 Runnable
的 run
方法。而根据之前看过得创建 ScheduledRunnable
时传入的 Runnable
又是从 scheduleDirect
方法中传入的,而 scheduleDirect
方法中的 Runnable
又是从 worker.schedule(this)
方法时传入的,根据上下文代码发现这个 this
指代的是 ObserveOnObserver
对象,于是进一步进入它的 run
方法查看。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { ...... ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this.downstream = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; } ...... @Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } } ...... } 复制代码
可以看到 run
方法中判断了 outputFused
的真假,然后分别调用了 drainFused
和 drainNormal
方法。这里的 outputFused
是与 RxJava2
中的背压处理相关暂时先不管,根据方法名也能知道正常调用会执行 drainNormal
方法,于是直接来看 drainNormal
方法。
void drainNormal() { int missed = 1; // 存放onNext传入的事件对象队列 final SimpleQueue<T> q = queue; // 传入的观察者对象 final Observer<? super T> a = downstream; // 循环check事件是否完成或者发生错误 for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return; } for (;;) { boolean d = done; T v; try { // 从队列中取出发送事件传入的对象 v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); disposed = true; upstream.dispose(); q.clear(); a.onError(ex); worker.dispose(); return; } boolean empty = v == null; // 再次判断是否完成或者发生错误 if (checkTerminated(d, empty, a)) { return; } // 判断队列中取出的发送事件传入的对象v是否为空 if (empty) { break; } // 执行观察者对象的onNext方法 a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) { break; } } } 复制代码
drainNormal
方法中先通过 checkTerminated
方法校验发送事件是否完成或者发生异常,接着从队列中取出事件对象,再次判断是否完成或者发生错误和取出的对象是否为空,没有问题的话就会执行观察者的 onNext
方法。而发送完成和出现异常的方法则是在 checkTerminated
方法处理。
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) { if (disposed) { queue.clear(); return true; } if (d) { Throwable e = error; if (delayError) { if (empty) { disposed = true; if (e != null) { a.onError(e); } else { a.onComplete(); } worker.dispose(); return true; } } else { if (e != null) { disposed = true; queue.clear(); a.onError(e); worker.dispose(); return true; } else if (empty) { disposed = true; a.onComplete(); worker.dispose(); return true; } } } return false; } 复制代码
在 checkTerminated
方法里根据 delayError
判断是否设置了超时的错误,接着再根据获得的错误 e
是否为空再决定调用的是观察者的 onError()
方法还是 onComplete
方法。至此 observeOn
切换线程的流程也梳理结束。
RxJava
中有很多功能强大的操作符,通过使用这些操作符,可以很容易的解决代码编写时遇到的一些复杂繁琐的问题。这里就用 map
操作符来作为一个例子,来看看操作符是怎样工作的。首先还是来了解 map
操作符的使用方法和作用。
final Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe"); emitter.onNext("5"); emitter.onComplete(); } }); Observer<Integer> observer = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(getClass().getName(), Thread.currentThread().getName() + " onSubscribe"); } @Override public void onNext(Integer i) { Log.d(getClass().getName(), Thread.currentThread().getName() + " onNext "+i); } @Override public void onError(Throwable e) { Log.d(getClass().getName(), Thread.currentThread().getName() + " onError"); } @Override public void onComplete() { Log.d(getClass().getName(), Thread.currentThread().getName() + " onComplete"); } }; Observable<Integer> mapObservable = observable.map(new Function<String, Integer>() { @Override public Integer apply(String s) throws Exception { return Integer.parseInt(s); } }); Log.d(getClass().getName(), Thread.currentThread().getName() + " mapObservable:"+mapObservable.getClass().getName()); mapObservable.subscribe(observer); 复制代码
运行日志:
map
操作符作用是可以将被观察者发送事件的数据类型转换成其他的数据类型。它的使用方法很简单,例如上面这个例子就将一开始发送的 String
类型转换成观察者接收到的 Integer
类型。下面开始看 map
方法的源码。
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); } 复制代码
看到 map
方法中依旧还是同样的套路,通过 RxJavaPlugins.onAssembly
方法返回一个被观察者对象,只不过这次构建传入的类型又是另一个 ObservableMap
类型的对象。订阅的流程前面已经看过了,这里和之前的一样最终会走到 ObservableMap
的 subscribeActual
方法,所以直接来看这个方法。
@Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } 复制代码
ObservableMap
的 subscribeActual
方法里看到很熟悉还是会调用 source.subscribe
方法,只是这里传入的 Observer
对象是一个 MapObserver
对象。接下来的逻辑又和之前一样,根据之前的经验 source.subscribe
方法最终会调用 Observer
的 onNext
方法,所以接下来直接来看 MapObserver
的 onNext
方法。
public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { downstream.onNext(null); return; } U v; try { v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } downstream.onNext(v); } 复制代码
MapObserver
的 onNext
方法里的逻辑很简单,在做了一些的判断后调用 mapper.apply(t)
方法获得类型转换后的事件传递对象,最后就会调用观察者的 downstream.onNext
方法,这里的 downstream
就是订阅方法传入的观察者对象。跟踪 mapper
可以找到,它是从 MapObserver
构造时传入的一个 Function
类型,即是在使用 map
操作符时传入的那个 Function
对象,又因为在使用时实现了 Function
的 apply
方法完成了数据的类型转换逻辑,所以这里调用 mapper.apply(t)
方法就可以获得到转换后的数据。
以上就是关于 RxJava
源码工作流程的相关总结,总而言之,观察者模式还是其核心设计思想。除此之外,通过源码阅读还发现,无论在线程切换方面还是其它功能的操作符的实现,根本上来说都是在其原有的被观察者或观察者基础上包装成一个新的对象,功能逻辑由新对象中的方法来实现完成。