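The source code in this article is based on RxJava 1.
RxJava: io.reactivex:rxjava:1.3.4
The parts below cover, in order, the basic create/subscribe flow (Part 1), the map operator, subscribeOn, and observeOn.
Let's start with the simplest possible RxJava demo.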
    Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("hi rxjava");
        }
    }).subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onNext(String s) {
            System.out.println(s);
        }
    });
So let's start from Observable.create and see how RxJava implements it internally.
    public class Observable<T> {

        @Deprecated
        public static <T> Observable<T> create(OnSubscribe<T> f) {
            return new Observable<T>(RxJavaHooks.onCreate(f));
        }

        ...
    }
As you can see, the OnSubscribe object f that we passed in is wrapped by RxJavaHooks. With the default hooks, however, RxJavaHooks.onCreate returns f itself.
    public final class RxJavaHooks {

        public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
            Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
            if (f != null) {
                return f.call(onSubscribe);
            }
            return onSubscribe;
        }
    }
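The hook idea above can be illustrated outside RxJava. The following is only a minimal sketch in plain Java, not RxJavaHooks itself: the class HookSketch, its field onCreateHook and the use of Runnable as the wrapped type are all hypothetical names chosen for illustration. It models one optional, replaceable wrapper function and the fall-through that returns the original object when nothing is installed.

```java
import java.util.function.Function;

// Hypothetical stand-in for the hook mechanism; not part of RxJava.
class HookSketch {
    // null means "no hook installed", mirroring the null check in RxJavaHooks.onCreate
    static volatile Function<Runnable, Runnable> onCreateHook;

    static Runnable onCreate(Runnable source) {
        Function<Runnable, Runnable> f = onCreateHook; // read the volatile field once
        if (f != null) {
            return f.apply(source);                    // the hook may wrap or replace the source
        }
        return source;                                 // no hook: hand back the same object
    }

    public static void main(String[] args) {
        Runnable original = () -> System.out.println("run");
        System.out.println(onCreate(original) == original); // prints "true"

        // install a hook that decorates every created object
        onCreateHook = r -> () -> {
            System.out.println("before");
            r.run();
        };
        onCreate(original).run();                            // prints "before" then "run"
        onCreateHook = null;
    }
}
```

Reading the field into a local before the null check keeps the check and the call consistent even if another thread swaps the hook in between, which is the same shape as the RxJavaHooks code above.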
Next, as the demo shows, subscribe is called on the newly created Observable.
    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
Internally, subscribe delegates to the static method subscribe(Subscriber<? super T> subscriber, Observable<T> observable).
    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        // validate the arguments first
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }

        // call subscriber.onStart(), which is a no-op by default
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped, wrap the subscriber in a SafeSubscriber
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same as unsafeSubscribe but not used because it would
        // add a significant depth to already huge call stacks.
        try {
            // invoke the call method of onSubscribe
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                // an exception occurred, so report it through subscriber.onError
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r; // NOPMD
                }
            }
            return Subscriptions.unsubscribed();
        }
    }
Let's take a closer look at the line RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber).
    public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
        Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
        if (f != null) {
            return f.call(instance, onSubscribe);
        }
        return onSubscribe;
    }
As with RxJavaHooks.onCreate above, the default hook simply returns the onSubscribe object, so this line can be "simplified" to onSubscribe.call(subscriber). In other words, it runs the call method from the demo.
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("hi rxjava");
    }
Inside call, the subscriber's onNext method is invoked. Remember that the subscriber was wrapped in a SafeSubscriber above? So what actually gets called here is SafeSubscriber.onNext.
    @Override
    public void onNext(T t) {
        try {
            if (!done) {
                // actual is the subscriber we defined ourselves
                actual.onNext(t);
            }
        } catch (Throwable e) {
            // we handle here instead of another method so we don't add stacks to the frame
            // which can prevent it from being able to handle StackOverflow
            Exceptions.throwOrReport(e, this);
        }
    }
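SafeSubscriber.onNext forwards to the real subscriber's onNext. SafeSubscriber exists to enforce the Observable contract: once onCompleted or onError has been delivered, the done flag is set and any later onNext call is dropped. It also catches non-fatal exceptions thrown by the wrapped subscriber's onNext and reports them through onError.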
Thus our own subscriber.onNext is finally executed.
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
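The whole Part 1 flow (create stores an OnSubscribe, subscribe wraps the subscriber and calls OnSubscribe.call) can be modeled in a few lines of plain Java. This is a sketch under stated assumptions, not RxJava's code: the types Subscriber, OnSubscribe, Observable and SafeSub below are simplified, hypothetical re-declarations that only borrow the names, and hooks, unsubscription and fatal-error handling are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of create/subscribe; these nested types are NOT the rx.* classes.
class MiniSubscribeSketch {
    interface Subscriber<T> {
        void onNext(T t);
        void onCompleted();
        void onError(Throwable e);
    }

    interface OnSubscribe<T> {
        void call(Subscriber<? super T> subscriber);
    }

    // Stand-in for SafeSubscriber: drops every event that arrives after a terminal event.
    static final class SafeSub<T> implements Subscriber<T> {
        private final Subscriber<? super T> actual;
        private boolean done;

        SafeSub(Subscriber<? super T> actual) { this.actual = actual; }

        public void onNext(T t) {
            if (done) return;
            try { actual.onNext(t); } catch (Throwable e) { onError(e); }
        }
        public void onCompleted() { if (!done) { done = true; actual.onCompleted(); } }
        public void onError(Throwable e) { if (!done) { done = true; actual.onError(e); } }
    }

    static final class Observable<T> {
        final OnSubscribe<T> onSubscribe;

        Observable(OnSubscribe<T> f) { this.onSubscribe = f; }

        static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(f); }

        void subscribe(Subscriber<? super T> subscriber) {
            // wrap, then run the stored OnSubscribe against the wrapper
            onSubscribe.call(new SafeSub<T>(subscriber));
        }
    }

    static List<String> run() {
        final List<String> log = new ArrayList<String>();
        Observable.<String>create(s -> {
            s.onNext("hi rxjava");
            s.onCompleted();
            s.onNext("ignored");   // violates the contract, so the wrapper drops it
        }).subscribe(new Subscriber<String>() {
            public void onNext(String v) { log.add("next:" + v); }
            public void onCompleted() { log.add("completed"); }
            public void onError(Throwable e) { log.add("error"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:hi rxjava, completed]
    }
}
```

Nothing happens until subscribe is called; the OnSubscribe is just a stored callback, which is why every operator discussed later can work by swapping in a different OnSubscribe.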
Now let's make the simple demo above slightly harder by adding a transformation in the middle:
    Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("hi rxjava");
        }
    })
    .map(new Func1<String, String>() {
        @Override
        public String call(String s) {
            return s + "hahaha";
        }
    })
    .subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onNext(String s) {
            System.out.println(s);
        }
    });
As you can see, a map operator has been added in the middle. So let's analyze what map actually does.
    public class Observable<T> {

        public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
            return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
        }
    }
map calls unsafeCreate. Let's look inside unsafeCreate:
    public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }
This code looks very familiar. Comparing it with Observable.create above, Observable.unsafeCreate and Observable.create turn out to have the same logic.
So the map operator creates a new Observable internally, and the OnSubscribe of that Observable is an OnSubscribeMap object.
    public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

        final Observable<T> source;
        final Func1<? super T, ? extends R> transformer;

        public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
            this.source = source;           // the Observable we created ourselves
            this.transformer = transformer; // the Func1 transformation
        }
    }
To sum up so far: map creates a new Observable whose OnSubscribe is an OnSubscribeMap, and our own Observable is stored inside that OnSubscribeMap.
Looking back at the demo, subscribe is called on the Observable created by map. As analyzed in Part 1, subscribe ends up calling the call method of the Observable's OnSubscribe.
So let's go straight to OnSubscribeMap.call.
    @Override
    public void call(final Subscriber<? super R> o) {
        // o is the downstream Subscriber: our own Subscriber, already wrapped in a SafeSubscriber by subscribe()
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        // source is the Observable we created ourselves
        // unlike subscribe, unsafeSubscribe does not validate its arguments
        // and does not wrap the subscriber in a SafeSubscriber
        source.unsafeSubscribe(parent);
    }
call creates a new MapSubscriber object:
    static final class MapSubscriber<T, R> extends Subscriber<T> {

        final Subscriber<? super R> actual;
        final Func1<? super T, ? extends R> mapper;

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual; // actual is the downstream Subscriber we defined
            this.mapper = mapper; // mapper is the Func1 transformation
        }
    }
It then subscribes that MapSubscriber to our own Observable.
When the source emits a value, execution therefore reaches MapSubscriber.onNext.
    @Override
    public void onNext(T t) {
        R result;

        try {
            // run the Func1 to convert T into R
            // in the demo this appends "hahaha" to "hi rxjava"
            result = mapper.call(t);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            unsubscribe();
            onError(OnErrorThrowable.addValueAsLastCause(ex, t));
            return;
        }
        // call onNext on the Subscriber we defined
        actual.onNext(result);
    }
That completes the walkthrough of the map operator. If anything is unclear, reading it again side by side with the source should help.
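The map flow just described (a new Observable whose OnSubscribe remembers the source, plus an intermediate subscriber that transforms and forwards) can be sketched in plain Java. This is an illustrative model only: Observable and OnSubscribe here are hypothetical minimal types, a subscriber is reduced to a java.util.function.Consumer that only has onNext, and error handling and unsubscription are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Simplified model of the map operator; not the rx.* classes.
class MiniMapSketch {
    // A subscriber is reduced to Consumer<T> (onNext only) for brevity.
    interface OnSubscribe<T> {
        void call(Consumer<? super T> subscriber);
    }

    static final class Observable<T> {
        final OnSubscribe<T> onSubscribe;

        Observable(OnSubscribe<T> f) { this.onSubscribe = f; }

        // Like map(): returns a NEW Observable whose OnSubscribe captures the source and the function.
        <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
            final Observable<T> source = this;
            return new Observable<R>(downstream -> {
                // plays the role of MapSubscriber: transform, then forward downstream
                Consumer<T> mapSubscriber = t -> downstream.accept(mapper.apply(t));
                // plays the role of source.unsafeSubscribe(parent)
                source.onSubscribe.call(mapSubscriber);
            });
        }

        void subscribe(Consumer<? super T> subscriber) {
            onSubscribe.call(subscriber);
        }
    }

    static List<String> run() {
        List<String> out = new ArrayList<String>();
        new Observable<String>(s -> s.accept("hi rxjava"))
                .map(s -> s + "hahaha")
                .subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [hi rxjavahahaha]
    }
}
```

Subscription travels upstream (the outer subscribe triggers the source's OnSubscribe), while values travel downstream through the intermediate subscriber. The same two-direction shape applies to subscribeOn and observeOn.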
(Figure: the official marble diagram for the map operator.)
This section looks at subscribeOn. First, the demo:
    Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("hi rxjava");
        }
    })
    .subscribeOn(Schedulers.io())
    .subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onNext(String s) {
            System.out.println(s);
        }
    });
First, let's see what Schedulers.io() does.
    public static Scheduler io() {
        return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
    }
It simply returns the ioScheduler held by the Schedulers singleton.
    private static Schedulers getInstance() {
        for (;;) {
            Schedulers current = INSTANCE.get();
            if (current != null) {
                return current;
            }
            current = new Schedulers();
            if (INSTANCE.compareAndSet(null, current)) {
                return current;
            } else {
                current.shutdownInstance();
            }
        }
    }

    private Schedulers() {
        @SuppressWarnings("deprecation")
        RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();

        Scheduler c = hook.getComputationScheduler();
        if (c != null) {
            computationScheduler = c;
        } else {
            computationScheduler = RxJavaSchedulersHook.createComputationScheduler();
        }

        Scheduler io = hook.getIOScheduler();
        if (io != null) {
            ioScheduler = io;
        } else {
            ioScheduler = RxJavaSchedulersHook.createIoScheduler();
        }

        Scheduler nt = hook.getNewThreadScheduler();
        if (nt != null) {
            newThreadScheduler = nt;
        } else {
            newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler();
        }
    }
As you can see, the default ioScheduler comes from RxJavaSchedulersHook.createIoScheduler().
    public static Scheduler createIoScheduler() {
        return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
    }

    public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory == null");
        }
        return new CachedThreadScheduler(threadFactory);
    }
By default this creates a new CachedThreadScheduler.
    public CachedThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }

    @Override
    public void start() {
        CachedWorkerPool update = new CachedWorkerPool(threadFactory, KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }
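The CachedWorkerPool constructor sets up the pool's bookkeeping, including a scheduled executor that periodically evicts idle workers. The worker threads themselves are created on demand and cached for reuse.
Now back to the code of subscribeOn: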
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        return subscribeOn(scheduler, !(this.onSubscribe instanceof OnSubscribeCreate));
    }

    public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
    }
The logic follows the same pattern: a new Observable is created, and its OnSubscribe is an OperatorSubscribeOn object.
    public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

        final Scheduler scheduler;
        final Observable<T> source;
        final boolean requestOn;

        public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
            this.scheduler = scheduler;
            this.source = source; // the real (upstream) Observable
            this.requestOn = requestOn;
        }
    }
As before, subscribe eventually calls OperatorSubscribeOn.call, so let's go straight there.
    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();

        // the downstream subscriber is passed in at construction
        SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
        subscriber.add(parent);
        subscriber.add(inner);

        inner.schedule(parent);
    }
scheduler.createWorker() can be understood as obtaining a worker that runs an action (Action0) on a worker thread. As noted earlier, the scheduler here is a CachedThreadScheduler, so createWorker creates an EventLoopWorker object.
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
Then inner.schedule is called, which ends up in the three-argument overload of EventLoopWorker.schedule:
    @Override
    public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
        if (innerSubscription.isUnsubscribed()) {
            // don't schedule, we are unsubscribed
            return Subscriptions.unsubscribed();
        }
        // threadWorker.scheduleActual hands this Action0 to the worker's executor
        ScheduledAction s = threadWorker.scheduleActual(new Action0() {
            @Override
            public void call() {
                if (isUnsubscribed()) {
                    return;
                }
                // action is the SubscribeOnSubscriber from above,
                // so SubscribeOnSubscriber.call runs on the worker thread
                action.call();
            }
        }, delayTime, unit);
        innerSubscription.add(s);
        s.addParent(innerSubscription);
        return s;
    }
Next, let's look at SubscribeOnSubscriber.call.
    @Override
    public void call() {
        Observable<T> src = source;
        source = null;
        t = Thread.currentThread();
        // subscribe this SubscribeOnSubscriber to the real (upstream) Observable
        src.unsafeSubscribe(this);
    }
Finally, when the call method of the real Observable's OnSubscribe runs (now on the worker thread), it invokes SubscribeOnSubscriber.onNext.
    @Override
    public void onNext(T t) {
        // actual is the Subscriber we defined ourselves
        actual.onNext(t);
    }
SubscribeOnSubscriber.onNext then passes the value on to the real Subscriber.
That is how subscribeOn switches threads.
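The essence of the mechanism above is that the act of subscribing to the source is itself scheduled on another thread. The following plain-Java sketch illustrates only that idea. It is not OperatorSubscribeOn: the OnSubscribe interface, the subscribeOn helper and the thread name "sketch-io-1" are hypothetical, a java.util.concurrent.ExecutorService stands in for the Scheduler.Worker, and a subscriber is reduced to a Consumer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Simplified model of subscribeOn; not the rx.* classes.
class MiniSubscribeOnSketch {
    interface OnSubscribe<T> {
        void call(Consumer<? super T> subscriber);
    }

    // Like subscribeOn(): wrap the source so that subscribing to it happens on the worker.
    static <T> OnSubscribe<T> subscribeOn(OnSubscribe<T> source, ExecutorService worker) {
        return downstream -> worker.execute(() -> source.call(downstream));
    }

    // Returns { thread that ran the source, thread that ran the subscriber }.
    static String[] run() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io-1"));
        final String[] threads = new String[2];
        final CountDownLatch done = new CountDownLatch(1);

        OnSubscribe<String> source = s -> {
            threads[0] = Thread.currentThread().getName();
            s.accept("hi rxjava");
        };

        subscribeOn(source, worker).call(value -> {
            threads[1] = Thread.currentThread().getName();
            done.countDown();
        });

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for onNext");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        String[] t = run();
        System.out.println(t[0] + " / " + t[1]); // prints sketch-io-1 / sketch-io-1
    }
}
```

Because the source emits synchronously inside its call method, the subscriber's onNext also runs on the worker thread. Moving only the delivery to a different thread is the job of observeOn.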
With subscribeOn covered, observeOn is much easier to follow. Again, let's start with a demo.
    Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("hi rxjava");
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onNext(String s) {
            System.out.println(s);
        }
    });
First, AndroidSchedulers.mainThread().
    public static Scheduler mainThread() {
        return getInstance().mainThreadScheduler;
    }
Like Schedulers.io(), it returns a field of a singleton, here AndroidSchedulers.mainThreadScheduler.
    private static AndroidSchedulers getInstance() {
        for (;;) {
            AndroidSchedulers current = INSTANCE.get();
            if (current != null) {
                return current;
            }
            current = new AndroidSchedulers();
            if (INSTANCE.compareAndSet(null, current)) {
                return current;
            }
        }
    }

    private AndroidSchedulers() {
        RxAndroidSchedulersHook hook = RxAndroidPlugins.getInstance().getSchedulersHook();

        Scheduler main = hook.getMainThreadScheduler();
        if (main != null) {
            mainThreadScheduler = main;
        } else {
            // pass in the main thread's Looper
            mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());
        }
    }
The LooperScheduler constructors:
    class LooperScheduler extends Scheduler {

        private final Handler handler;

        LooperScheduler(Looper looper) {
            handler = new Handler(looper);
        }

        LooperScheduler(Handler handler) {
            this.handler = handler;
        }
    }
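It creates a Handler bound to the main thread's Looper; work is then dispatched to the main thread by sending messages through that Handler.
Now let's look at the observeOn method.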
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, RxRingBuffer.SIZE);
    }

    public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
        return observeOn(scheduler, false, bufferSize);
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
    }
observeOn is built on lift. So what is lift?
    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }
lift also creates a new Observable, whose Observable.OnSubscribe is an OnSubscribeLift object. The pattern is the same as before; the only difference is that an extra Operator object is passed in. As shown above, that Operator is an OperatorObserveOn object.
Next, OnSubscribeLift.call:
    @Override
    public void call(Subscriber<? super R> o) {
        try {
            // run operator.call, which returns a Subscriber
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                // parent is the OnSubscribe of the upstream (original) Observable
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
The key line is RxJavaHooks.onObservableLift(operator).call(o). As you might guess, the default hook returns operator itself, so let's look at operator.call.
    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;
        }
    }
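From the earlier code we know that the scheduler here is the LooperScheduler returned by AndroidSchedulers.mainThread(), so an ObserveOnSubscriber is returned. As the OnSubscribeLift.call code shows, once the ObserveOnSubscriber has been returned, parent.call(st) is invoked. Here parent is the OnSubscribe of the original Observable, i.e. the one we wrote ourselves. When it emits, execution ends up in ObserveOnSubscriber.onNext.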
    static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {

        @Override
        public void onNext(final T t) {
            if (isUnsubscribed() || finished) {
                return;
            }
            if (!queue.offer(NotificationLite.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            schedule();
        }

        protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                // recursiveScheduler is the Worker created by the LooperScheduler
                // note that the argument passed to schedule is this!
                recursiveScheduler.schedule(this);
            }
        }
    }
As you can see, the work is handed to the Worker that the LooperScheduler created.
    @Override
    public Subscription schedule(final Action0 action) {
        // action is the ObserveOnSubscriber object
        return schedule(action, 0, TimeUnit.MILLISECONDS);
    }

    @Override
    public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
        if (unsubscribed) {
            return Subscriptions.unsubscribed();
        }

        action = hook.onSchedule(action);

        // ScheduledAction implements Runnable
        ScheduledAction scheduledAction = new ScheduledAction(action, handler);

        // the message is later dequeued on the main thread, which then runs the ScheduledAction
        Message message = Message.obtain(handler, scheduledAction);
        message.obj = this; // Used as token for unsubscription operation.

        handler.sendMessageDelayed(message, unit.toMillis(delayTime));

        if (unsubscribed) {
            handler.removeCallbacks(scheduledAction);
            return Subscriptions.unsubscribed();
        }

        return scheduledAction;
    }
The worker's schedule method mainly builds a Message and uses the Handler to post it to the main thread for execution. Execution therefore continues in ScheduledAction.run.
    static final class ScheduledAction implements Runnable, Subscription {

        @Override
        public void run() {
            try {
                // action is the ObserveOnSubscriber object,
                // so this calls ObserveOnSubscriber.call
                action.call();
            } catch (Throwable e) {
                // nothing to do but print a System error as this is fatal and there is nowhere else to throw this
                IllegalStateException ie;
                if (e instanceof OnErrorNotImplementedException) {
                    ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e);
                } else {
                    ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
                }
                RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
            }
        }
    }
ScheduledAction.run executes on the main thread and calls action.call(). The action is the same ObserveOnSubscriber object as before, so execution jumps back into ObserveOnSubscriber.call.
    // only execute this from schedule()
    @Override
    public void call() {
        long missed = 1L;
        long currentEmission = emitted;

        // these are accessed in a tight loop around atomics so
        // loading them into local variables avoids the mandatory re-reading
        // of the constant fields
        final Queue<Object> q = this.queue;
        // child is the Subscriber we defined ourselves
        final Subscriber<? super T> localChild = this.child;

        // requested and counter are not included to avoid JIT issues with register spilling
        // and their access is amortized because they are part of the outer loop which runs
        // less frequently (usually after each bufferSize elements)

        for (;;) {
            long requestAmount = requested.get();

            while (requestAmount != currentEmission) {
                boolean done = finished;
                Object v = q.poll();
                boolean empty = v == null;

                if (checkTerminated(done, empty, localChild, q)) {
                    return;
                }

                if (empty) {
                    break;
                }
                // the real Subscriber's onNext is called here, on the main thread
                localChild.onNext(NotificationLite.<T>getValue(v));

                currentEmission++;
                if (currentEmission == limit) {
                    requestAmount = BackpressureUtils.produced(requested, currentEmission);
                    request(currentEmission);
                    currentEmission = 0L;
                }
            }

            if (requestAmount == currentEmission) {
                if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                    return;
                }
            }

            emitted = currentEmission;
            missed = counter.addAndGet(-missed);
            if (missed == 0L) {
                break;
            }
        }
    }
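The queue-plus-counter pattern used by ObserveOnSubscriber (enqueue in onNext, schedule a drain only on the 0 to 1 transition of the counter, then loop until no work was missed) can be tried in isolation. The sketch below is a plain-Java approximation under stated assumptions: the class ObserveOn and the thread name "sketch-main" are hypothetical, a single-thread ExecutorService stands in for the main-thread Handler, and backpressure (requested, limit), termination and error handling are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Simplified model of the observeOn drain loop; not the rx.* classes.
class MiniObserveOnSketch {
    static final class ObserveOn<T> implements Consumer<T>, Runnable {
        private final Queue<T> queue = new ConcurrentLinkedQueue<T>();
        private final AtomicInteger counter = new AtomicInteger();
        private final ExecutorService target;
        private final Consumer<? super T> child;

        ObserveOn(ExecutorService target, Consumer<? super T> child) {
            this.target = target;
            this.child = child;
        }

        @Override
        public void accept(T t) {                     // plays the role of onNext
            queue.offer(t);
            if (counter.getAndIncrement() == 0) {     // only the 0 -> 1 transition schedules a drain
                target.execute(this);
            }
        }

        @Override
        public void run() {                           // plays the role of call()
            int missed = 1;
            for (;;) {
                T v;
                while ((v = queue.poll()) != null) {
                    child.accept(v);                  // delivered on the target thread
                }
                missed = counter.addAndGet(-missed);  // did more items arrive while draining?
                if (missed == 0) {
                    break;
                }
            }
        }
    }

    static List<String> run() {
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-main"));
        final List<String> seen = Collections.synchronizedList(new ArrayList<String>());
        final CountDownLatch done = new CountDownLatch(3);
        ObserveOn<String> observeOn = new ObserveOn<String>(main, v -> {
            seen.add(v + "@" + Thread.currentThread().getName());
            done.countDown();
        });
        for (String s : new String[] {"a", "b", "c"}) {
            observeOn.accept(s);                      // emitted from the calling thread
        }
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for delivery");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            main.shutdown();
        }
        return new ArrayList<String>(seen);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a@sketch-main, b@sketch-main, c@sketch-main]
    }
}
```

The counter guarantees that at most one drain is scheduled or running at a time, so items are delivered in order on the target thread no matter how many threads call accept concurrently.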
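That is how observeOn switches threads, and it completes this tour of the source behind the basic RxJava operations. Other operators will have to wait for a later article.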