不忘初心 砥砺前行, Tomorrow Is Another Day !
本文概要:
对应源码
//Observable.java @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { //参数检查 ObjectHelper.requireNonNull(source, "source is null"); //装配Observable,返回一个ObservableCreate对象 return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } //钩子方法 @NonNull public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; } @NonNull static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) { try { return f.apply(t); } catch (Throwable ex) { throw ExceptionHelper.wrapOrThrow(ex); } } 复制代码
当我们通过Create方法创建一个Observable对象时,
当我们创建完Observable对象时,会调用subscribe去绑定观察者Observer.所以直接看该方法.
对应源码
//Observable.java @SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer);//此方法是重中之重. } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } } 复制代码
同样的方式,对我们传入的Observer观察者对象进行检查以及预处理,最终调用subscribeActual方法,该方法是一个抽象方法.所以我们直接看它的实现类,也就是ObservableCreate对象下的subscribeActual方法.
对应源码
//ObservableCreate.java public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { //1. 创建一个发射器对象 CreateEmitter<T> parent = new CreateEmitter<T>(observer); //2. 回调onSubscribe,通知订阅成功 observer.onSubscribe(parent); try { //3. 回调subscribe,开始发送事件. //source对象就是创建被观察者传入的. source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } 复制代码
上面注释已经写得很详细了这里再重复一次.整个订阅过程就是.
接着我们来看发射器对象CreateEmitter是如何将事件发送给订阅者的.
对应源码
//ObservableCreate#CreateEmitter.java static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { private static final long serialVersionUID = -3434801548987643227L; final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { //回调onNext observer.onNext(t); } } @Override public void onError(Throwable t) { if (!tryOnError(t)) { RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (!isDisposed()) { try { //回调onError observer.onError(t); } finally { //最后断开订阅 dispose(); } return true; } return false; } @Override public void onComplete() { if (!isDisposed()) { try { //回调onComplete. observer.onComplete(); } finally { //最后断开订阅 dispose(); } } } @Override public void setDisposable(Disposable d) { DisposableHelper.set(this, d); } @Override public void setCancellable(Cancellable c) { setDisposable(new CancellableDisposable(c)); } @Override public ObservableEmitter<T> serialize() { return new SerializedEmitter<T>(this); } @Override public void dispose() { DisposableHelper.dispose(this); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } } //DisposableHelper.java //断开订阅方法 public static boolean dispose(AtomicReference<Disposable> field) { Disposable current = field.get(); Disposable d = DISPOSED; if (current != d) { current = field.getAndSet(d); if (current != d) { if (current != null) { current.dispose(); } return true; } } return false; } 复制代码
从上面注释可知,发射器CreateEmitter直接回调了观察者Observer的相关方法.当调用dispose断开dispose订阅时,此时和线程中断处理一样,仅仅只是作为一个标识,标识当前发射器已经被中断.
这里最后给出一张关系图对上面流程进行归纳.
Observable的创建-订阅-发送事件过程
从上面可知,RxJava的整体流程框架还是挺清晰的,但有时我们需要多它进行一些附加的操作如切线程,map,filter进行转换等.这里以切线程为例进行分析看如何工作的.
@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); //新建一个ObservableSubscribeOn. return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); } 复制代码
当我们切换下游线程时,也返回了一个新建的Observable-ObservableSubscribeOn.接着我们直接看ObservableSubscribeOn类.
对应源码
//ObservableSubscribeOn.java //中间的Observable. public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source);//上游的源Observable this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) { //创建一个中间的Observer //创建一个中间的Observer //创建一个中间的Observer //重要的话说三遍. final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); //回调onSubscribe,通知订阅成功 s.onSubscribe(parent); //切换线程 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } //线程任务类SubscribeTask final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { //回调subscribe,进行订阅 //source是源Observable,parent则是中间Observer,不是最终目标的Observer.这里一定要清楚. source.subscribe(parent); } } 复制代码
从上面注释可知,当我们调用subscribeOn切换上游线程时
这也就是解释了最开始文章所说的subscribeOn无论调用几次,为什么只有第一次是生效的. 因为每次都创建新的Observable与Observer ,线程调度器里将源Observable与中间的Observer进行绑定订阅时,源Observable仅仅是指上一个,已经不是第一个创建出来的.
通过上面流程分析可以总结出RxJava操作符一些通用的流程.对于Map等操作符都可以参考.如图
Observable附加操作通用流程
最后我们根据上面总结出的通用流程,去分析下切换下游线程的过程.
@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); //返回一个新建的Observable return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); } 复制代码
同样当调用observeOn过程切换下游线程时,果不其然也返回了一个中间的Observable-ObservableObserveOn.接着看ObservableObserveOn代码.
对应源码
//中间的Observable //ObservableObserveOn.java @Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { //线程调度器 Scheduler.Worker w = scheduler.createWorker(); //回调subscribe进行订阅(中间的Observer-ObserveOnObserver) source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } 复制代码
同样,果不其然
之前我们知道每次执行observeOn都会切换一次下游线程,从上面源码可知, 每次都会新建一个中间的Observer绑定新指定的线程调度器,所以接收事件都是在新的线程中执行啦 .
至此,RxJava基本原理就差不多分析完成,最重要的是记住两张流程图,都遵循这个规律.
由于本人技术有限,如有错误的地方,麻烦大家给我提出来,本人不胜感激,大家一起学习进步.