微信公众号: BaronTalk
通过前面五个篇幅的介绍,相信大家对RxJava的基本使用以及操作符应该有了一定的认识。但是知其然还要知其所以然;所以从这一章开始我们聊聊源码,分析RxJava的实现原理。本文我们主要从三个方面来分析RxJava的实现:
本章节基于 RxJava1.1.9 版本的源码
在 RxJava系列2(基本概念及使用介绍) 中我们介绍过,一个最基本的RxJava调用是这样的:
<!-- more -->
Observable.create(new Observable.OnSubscribe<String>() { public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello RxJava!"); subscriber.onCompleted(); } }).subscribe(new Subscriber<String>() { public void onCompleted() { System.out.println("completed!"); } public void onError(Throwable e) { } public void onNext(String s) { System.out.println(s); } });
首先调用 Observable.create()
创建一个被观察者 Observable
,同时创建一个 OnSubscribe
作为 create()
方法的入参;接着创建一个观察者 Subscriber
,然后通过 subseribe()
实现二者的订阅关系。这里涉及到三个关键对象和一个核心的方法:
OnSubscribe.call()
可以看做是 观察者模式 中被观察者用来通知观察者的 notifyObservers()
方法) 首先我们来看看 Observable.create()
的实现:
public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(RxJavaHooks.onCreate(f)); }
这里创建了一个被观察者 Observable
,同时将 RxJavaHooks.onCreate(f)
作为构造函数的参数,源码如下:
protected Observable(OnSubscribe<T> f) { this.onSubscribe = f; }
我们看到源码中直接将参数 RxJavaHooks.onCreate(f)
赋值给了当前我们构造的被观察者 Observable
的成员变量 onSubscribe
。那么 RxJavaHooks.onCreate(f)
返回的又是什么呢?我们接着往下看:
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) { Func1<OnSubscribe, OnSubscribe> f = onObservableCreate; if (f != null) { return f.call(onSubscribe); } return onSubscribe; }
由于我们并没调用 RxJavaHooks.initCreate()
,所以上面代码中的 onObservableCreate
为null;因此 RxJavaHooks.onCreate(f)
最终返回的就是 f
,也就是我们在 Observable.create()
的时候new出来的 OnSubscribe
。( 由于对RxJavaHooks的理解并不影响我们对RxJava执行流程的分析,因此在这里我们不做进一步的探讨。为了方便理解我们只需要知道RxJavaHooks一系列方法的返回值就是入参本身就OK了,例如这里的 RxJavaHooks.onCreate(f)
返回的就是 f
)。
至此我们做下逻辑梳理: Observable.create()
方法构造了一个被观察者 Observable
对象,同时将new出来的 OnSubscribe
赋值给了该 Observable
的成员变量 onSubscribe
。
接着我们看下观察者 Subscriber
的源码,为了增加可读性,我去掉了源码中的注释和部分代码。
public abstract class Subscriber<T> implements Observer<T>, Subscription { private final SubscriptionList subscriptions;//订阅事件集,所有发送给当前Subscriber的事件都会保存在这里 ... protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) { this.subscriber = subscriber; this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList(); } ... public final void unsubscribe() { subscriptions.unsubscribe(); } public final boolean isUnsubscribed() { return subscriptions.isUnsubscribed(); } public void onStart() { } ... }
public interface Subscription { void unsubscribe(); boolean isUnsubscribed(); }
Subscriber
实现了 Subscription
接口,从而对外提供 isUnsubscribed()
和 unsubscribe()
方法。前者用于判断是否已经取消订阅;后者用于将订阅事件列表( 也就是当前观察者的成员变量 subscriptions
)中的所有 Subscription
取消订阅,并且不再接受观察者 Observable
发送的后续事件。
前面我们分析了观察者和被观察者相关的源码,那么接下来便是整个订阅流程中最最关键的环节了。
public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); }
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { ... subscriber.onStart(); if (!(subscriber instanceof SafeSubscriber)) { subscriber = new SafeSubscriber<T>(subscriber); } try { RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { ... return Subscriptions.unsubscribed(); } }
subscribe()
方法中将传进来的 subscriber
包装成了 SafeSubscriber
, SafeSubscriber
其实是 subscriber
的一个代理,对 subscriber
的一系列方法做了更加严格的安全校验。保证了 onCompleted()
和 onError()
只会有一个被执行且只执行一次,一旦它们其中方法被执行过后 onNext()
就不在执行了。
上述代码中最关键的就是 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)
。这里的RxJavaHooks和之前提到的一样, RxJavaHooks.onObservableStart(observable, observable.onSubscribe)
返回的正是他的第二个入参 observable.onSubscribe
,也就是当前 observable
的成员变量 onSubscribe
。而这个成员变量我们前面提到过,它是我们在 Observable.create()
的时候new出来的。所以这段代码可以简化为 onSubscribe.call(subscriber)
。这也印证了我在 RxJava系列2(基本概念及使用介绍) 中说的, onSubscribe.call(subscriber)
中的 subscriber
正是我们在 subscribe()
方法中new出来的观察者。
到这里,我们对RxJava的执行流程做个总结:首先我们调用 crate()
创建一个观察者,同时创建一个 OnSubscribe
作为该方法的入参;接着调用 subscribe()
来订阅我们自己创建的观察者 Subscriber
。
一旦调用 subscribe()
方法后就会触发执行 OnSubscribe.call()
。然后我们就可以在call方法调用观察者 subscriber
的 onNext()
, onCompleted()
, onError()
。
最后我用张图来总结下之前的分析结果:
之前我们介绍过几十个操作符,要一一分析它们的源码显然不太现实。在这里我抛砖引玉,选取一个相对简单且常用的 map
操作符来分析。
我们先来看一个 map
操作符的简单应用:
Observable.create(new Observable.OnSubscribe<Integer>() { public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onCompleted(); } }).map(new Func1<Integer, String>() { public String call(Integer integer) { return "This is " + integer; } }).subscribe(new Subscriber<String>() { public void onCompleted() { System.out.println("onCompleted!"); } public void onError(Throwable e) { System.out.println(e.getMessage()); } public void onNext(String s) { System.out.println(s); } });
为了便于表述,我将上面的代码做了如下拆解:
Observable<Integer> observableA = Observable.create(new Observable.OnSubscribe<Integer>() { public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onCompleted(); } }); Subscriber<String> subscriberOne = new Subscriber<String>() { public void onCompleted() { System.out.println("onCompleted!"); } public void onError(Throwable e) { System.out.println(e.getMessage()); } public void onNext(String s) { System.out.println(s); } }; Observable<String> observableB = observableA.map(new Func1<Integer, String>() { public String call(Integer integer) { return "This is " + integer;; } }); observableB.subscribe(subscriberOne);
map()
的源码和上一小节介绍的 create()
一样位于 Observable
这个类中。
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { return create(new OnSubscribeMap<T, R>(this, func)); }
通过查看源码我们发现调用 map()
的时候实际上是创建了一个新的被观察者 Observable
,我们姑且称它为 ObservableB
;一开始通过 Observable.create()
创建的 Observable
我们称之为 ObservableA
。在创建 ObservableB
的时候同时创建了一个 OnSubscribeMap
,而 ObservableA
和变换函数 Func1
则作为构造 OnSubscribeMap
的参数。
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> { final Observable<T> source;//ObservableA final Func1<? super T, ? extends R> transformer;//map操作符中的转换函数Func1。T为转换前的数据类型,在上面的例子中为Integer;R为转换后的数据类型,在该例中为String。 public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) { this.source = source; this.transformer = transformer; } public void call(final Subscriber<? super R> o) {//结合第一小节的分析结果,我们知道这里的入参o其实就是我们自己new的观察者subscriberOne。 MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer); o.add(parent); source.unsafeSubscribe(parent); } static final class MapSubscriber<T, R> extends Subscriber<T> { final Subscriber<? super R> actual;//这里的actual就是我们在调用subscribe()时创建的观察者mSubscriber final Func1<? super T, ? extends R> mapper;//变换函数 boolean done; public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) { this.actual = actual; this.mapper = mapper; } public void onNext(T t) { R result; try { result = mapper.call(t); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); unsubscribe(); onError(OnErrorThrowable.addValueAsLastCause(ex, t)); return; } actual.onNext(result); } public void onError(Throwable e) { ... actual.onError(e); } public void onCompleted() { ... actual.onCompleted(); } public void setProducer(Producer p) { actual.setProducer(p); } } }
OnSubscribeMap
实现了 OnSubscribe
接口,因此 OnSubscribeMap
就是一个 OnSubscribe
。在调用 map()
的时候创建了一个新的被观察者 ObservableB
,然后我们用 ObservableB.subscribe(subscriberOne)
订阅了观察者 subscriberOne
。结合我们在第一小节的分析结果,所以 OnSubscribeMap.call(o)
中的 o
就是 subscribe(subscriberOne)
中的 subscriberOne
;一旦调用了 ObservableB.subscribe(subscriberOne)
就会执行 OnSubscribeMap.call()
。
在 call()
方法中,首先通过我们的观察者 o
和转换函数 transformer
构造了一个 MapSubscriber
,最后调用了 source
也就是 observableA
的 unsafeSubscribe()
方法。即 observableA
订阅了一个观察者 MapSubscriber
。
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) { try { ... RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { ... return Subscriptions.unsubscribed(); } }
上面这段代码最终执行了 onSubscribe
也就是 OnSubscribeMap
的 call()
方法, call()
方法中的参数就是之前在 OnSubscribeMap.call()
中new出来的 MapSubscriber
。最后在 call()
方法中执行了我们自己的业务代码:
subscriber.onNext(1); subscriber.onCompleted();
其实也就是执行了 MapSubscriber
的 onNext()
和 onCompleted()
。
public void onNext(T t) { R result; try { result = mapper.call(t); } catch (Throwable ex) { ... return; } actual.onNext(result); }
onNext(T t)
方法中的的 mapper
就是变换函数, actual
就是我们在调用 subscribe()
时创建的观察者 subscriberOne
。这个 T
就是我们例子中的 Integer
, R
就是 String
。在 onNext()
中首先调用变换函数 mapper.call()
将 T
转换成 R
(在我们的例子中就是将 Integer
类型的 1 转换成了 String
类型的 “This is 1” );接着调用 subscriberOne.onNext(String result)
。同样在调用 MapSubscriber.onCompleted()
时会执行 subscriberOne.onCompleted()
。这样就完成了一直完成的调用流程。
我承认太啰嗦了,花费了这么大的篇幅才将 map()
的转换原理解释清楚。我也是希望尽量的将每个细节都呈现出来方便大家理解,如果看我啰嗦了这么久还是没能理解,请看下面我画的这张执行流程图。
在前面的文章中我介绍过RxJava可以很方便的通过 subscribeOn()
和 observeOn()
来指定数据流的每一部分运行在哪个线程。其中 subscribeOn()
指定了处理 Observable
的全部的过程(包括发射数据和通知)的线程; observeOn()
指定了观察者的 onNext()
, onError()
和 onCompleted()
执行的线程。接下来我们就分析分析源码,看看线程调度是如何实现的。
在分析源码前我们先看看一段常见的通过RxJava实现的线程调度代码:
Observable.create(new Observable.OnSubscribe<String>() { public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello RxJava!"); subscriber.onCompleted(); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<String>() { public void onCompleted() { System.out.println("completed!"); } public void onError(Throwable e) { } public void onNext(String s) { System.out.println(s); } });
public final Observable<T> subscribeOn(Scheduler scheduler) { ... return create(new OperatorSubscribeOn<T>(this, scheduler)); }
通过上面的代码我们可以看到, subscribeOn()
和 map()
一样是创建了一个新的被观察者 Observable
。因此我大致就能猜到 subscribeOn()
的执行流程应该和 map()
差不多, OperatorSubscribeOn
肯定也是一个 OnSubscribe
。那我们接下来就看看 OperatorSubscribeOn
的源码:
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> { final Scheduler scheduler;//线程调度器,用来指定订阅事件发送、处理等所在的线程 final Observable<T> source; public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) { this.scheduler = scheduler; this.source = source; } public void call(final Subscriber<? super T> subscriber) { final Worker inner = scheduler.createWorker(); subscriber.add(inner); inner.schedule(new Action0() { public void call() { final Thread t = Thread.currentThread(); Subscriber<T> s = new Subscriber<T>(subscriber) { public void onNext(T t) { subscriber.onNext(t); } public void onError(Throwable e) { try { subscriber.onError(e); } finally { inner.unsubscribe(); } } public void onCompleted() { try { subscriber.onCompleted(); } finally { inner.unsubscribe(); } } public void setProducer(final Producer p) { subscriber.setProducer(new Producer() { public void request(final long n) { if (t == Thread.currentThread()) { p.request(n); } else { inner.schedule(new Action0() { public void call() { p.request(n); } }); } } }); } }; source.unsafeSubscribe(s); } }); } }
OperatorSubscribeOn
实现了 OnSubscribe
接口, call()
中对 Subscriber
的处理也和 OperatorMap
对 Subscriber
的处理类似。首先通过 scheduler
构建了一个 Worker
;然后用传进来的 subscriber
构造了一个新的 Subscriber s
,并将 s
丢到 Worker.schedule()
中来处理;最后用原 Observable
去订阅观察者 s
。而这个 Worker
就是线程调度的关键!前面的例子中我们通过 subscribeOn(Schedulers.io())
指定了 Observable
发射处理事件以及通知观察者的一系列操作的执行线程,正是通过这个 Schedulers.io()
创建了我们前面提到的 Worker
。所以我们来看看 Schedulers.io()
的实现。
首先通过 Schedulers.io()
获得了 ioScheduler
并返回,上面的 OperatorSubscribeOn
通过这个的 Scheduler
的 createWorker()
方法创建了我们前面提到的 Worker
。
public static Scheduler io() { return RxJavaHooks.onIOScheduler(getInstance().ioScheduler); }
接着我们看看这个 ioScheduler
是怎么来的,下面的代码向我们展现了是如何在 Schedulers
的构造函数中通过 RxJavaSchedulersHook.createIoScheduler()
来初始化 ioScheduler
的。
private Schedulers() { ... Scheduler io = hook.getIOScheduler(); if (io != null) { ioScheduler = io; } else { ioScheduler = RxJavaSchedulersHook.createIoScheduler(); } ... }
最终 RxJavaSchedulersHook.createIoScheduler()
返回了一个 CachedThreadScheduler
,并赋值给了 ioScheduler
。
public static Scheduler createIoScheduler() { return createIoScheduler(new RxThreadFactory("RxIoScheduler-")); }
public static Scheduler createIoScheduler(ThreadFactory threadFactory) { ... return new CachedThreadScheduler(threadFactory); }
到这一步既然我们知道了 ioScheduler
就是一个 CachedThreadScheduler
,那我们就来看看它的 createWorker()
的实现。
public Worker createWorker() { return new EventLoopWorker(pool.get()); }
上面的代码向我们赤裸裸的呈现了前面 OperatorSubscribeOn
中的 Worker
其实就是 EventLoopWorker
。我们重点要关注的是他的 scheduleActual()
。
static final class EventLoopWorker extends Scheduler.Worker implements Action0 { private final CompositeSubscription innerSubscription = new CompositeSubscription(); private final CachedWorkerPool pool; private final ThreadWorker threadWorker; final AtomicBoolean once; EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.once = new AtomicBoolean(); this.threadWorker = pool.get(); } ... public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { ... ScheduledAction s = threadWorker.scheduleActual(new Action0() { public void call() { if (isUnsubscribed()) { return; } action.call(); } }, delayTime, unit); innerSubscription.add(s); s.addParent(innerSubscription); return s; } }
通过对源码的一步步追踪,我们知道了前面 OperatorSubscribeOn.call()
中的 inner.schedule()
最终会执行到 ThreadWorker
的 scheduleActual()
方法。
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) { Action0 decoratedAction = RxJavaHooks.onScheduledAction(action); ScheduledAction run = new ScheduledAction(decoratedAction); Future<?> f; if (delayTime <= 0) { f = executor.submit(run); } else { f = executor.schedule(run, delayTime, unit); } run.add(f); return run; }
scheduleActual()
中的 ScheduledAction
实现了 Runnable
接口,通过线程池 executor
最终实现了线程切换。上面便是 subscribeOn(Schedulers.io())
实现线程切换的全部过程。
observeOn()
切换线程是通过 lift
来实现的,相比 subscribeOn()
在实现原理上相对复杂些。不过本质上最终还是创建了一个新的 Observable
。
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ... return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize)); } public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return create(new OnSubscribeLift<T, R>(onSubscribe, operator)); }
OperatorObserveOn
作为 OnSubscribeLift
构造函数的参数用来创建了一个新的 OnSubscribeLift
对象,接下来我们看看 OnSubscribeLift
的实现:
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> { final OnSubscribe<T> parent; final Operator<? extends R, ? super T> operator; public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) { this.parent = parent; this.operator = operator; } public void call(Subscriber<? super R> o) { try { Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o); try { st.onStart(); parent.call(st); } catch (Throwable e) { Exceptions.throwIfFatal(e); st.onError(e); } } catch (Throwable e) { Exceptions.throwIfFatal(e); o.onError(e); } } }
OnSubscribeLift
继承自 OnSubscribe
,通过前面的分析我们知道一旦调用了 subscribe()
将观察者与被观察绑定后就会触发被观察者所对应的 OnSubscribe
的 call()
方法,所以这里会触发 OnSubscribeLift.call()
。在 call()
中调用了 OperatorObserveOn.call()
并返回了一个新的观察者 Subscriber st
,接着调用了前一级 Observable
对应 OnSubscriber.call(st)
。
我们再看看 OperatorObserveOn.call()
的实现:
public Subscriber<? super T> call(Subscriber<? super T> child) { ... ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize); parent.init(); return parent; }
OperatorObserveOn.call()
中创建了一个 ObserveOnSubscriber
并调用 init()
进行了初始化。
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 { ... public void onNext(final T t) { ... schedule(); } public void onCompleted() { ... schedule(); } public void onError(final Throwable e) { ... schedule(); } protected void schedule() { if (counter.getAndIncrement() == 0) { recursiveScheduler.schedule(this); } } public void call() { long missed = 1L; long currentEmission = emitted; final Queue<Object> q = this.queue; final Subscriber<? super T> localChild = this.child; final NotificationLite<T> localOn = this.on; for (;;) { long requestAmount = requested.get(); while (requestAmount != currentEmission) { boolean done = finished; Object v = q.poll(); boolean empty = v == null; if (checkTerminated(done, empty, localChild, q)) { return; } if (empty) { break; } localChild.onNext(localOn.getValue(v)); currentEmission++; if (currentEmission == limit) { requestAmount = BackpressureUtils.produced(requested, currentEmission); request(currentEmission); currentEmission = 0L; } } if (requestAmount == currentEmission) { if (checkTerminated(finished, q.isEmpty(), localChild, q)) { return; } } emitted = currentEmission; missed = counter.addAndGet(-missed); if (missed == 0L) { break; } } } ... }
ObserveOnSubscriber
继承自 Subscriber
,并实现了 Action0
接口。我们看到 ObserveOnSubscriber
的 onNext()
、 onCompleted()
、 onError()
都有个 schedule()
,这个方法就是我们线程调度的关键;通过 schedule()
将新观察者 ObserveOnSubscriber
发送给 subscriberOne
的所有事件都切换到了 recursiveScheduler
所对应的线程,简单的说就是把 subscriberOne
的 onNext()
、 onCompleted()
、 onError()
方法丢到了 recursiveScheduler
对应的线程中来执行。
那么 schedule()
又是如何做到这一点的呢?他内部调用了 recursiveScheduler.schedule(this)
, recursiveScheduler
其实就是一个 Worker
,和我们在介绍 subscribeOn()
时提到的 worker
一样,执行 schedule()
实际上最终是创建了一个 runable
,然后把这个 runnable
丢到了特定的线程池中去执行。在 runnable
的 run()
方法中调用了 ObserveOnSubscriber.call()
,看上面的代码大家就会发现在 call()
方法中最终调用了 subscriberOne
的 onNext()
、 onCompleted()
、 onError()
方法。这便是它实现线程切换的原理。
好了,我们最后再看看 示例C 对应的执行流程图,帮助大家加深理解。
这一章以 执行流程 、 操作符实现 以及 线程调度 三个方面为切入点剖析了RxJava源码。下一章将站在更宏观的角度来分析整个RxJava的框架结构、设计思想等等。敬请期待~~ :)
如果你喜欢我的文章,就关注下我的公众号 BaronTalk 、 知乎专栏 或者在 GitHub 上添个 Star 吧!