转载

四. RxJava之基本原理

不忘初心 砥砺前行, Tomorrow Is Another Day !

本文概要:

  1. Observable的创建
  2. subscribe订阅过程
  3. 发送事件
  4. 线程切换过程

1. Observable的创建

对应源码

//Observable.java
@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        //参数检查
        ObjectHelper.requireNonNull(source, "source is null");
        //装配Observable,返回一个ObservableCreate对象
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

//钩子方法
    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
    
     @NonNull
    static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {
        try {
            return f.apply(t);
        } catch (Throwable ex) {
            throw ExceptionHelper.wrapOrThrow(ex);
        }
    }

复制代码

当我们通过Create方法创建一个Observable对象时,

  • 参数检查,然后装配Observable,返回一个ObservableCreate对象,它持有一个ObservableOnSubscribe对象,ObservableCreate是Observable的子类
    • 装配流程: 执行钩子方法,对即将创建的Observale进行预处理,可以通过onObservableAssembly进行设置.

2. subscribe订阅过程

当我们创建完Observable对象时,会调用subscribe去绑定观察者Observer.所以直接看该方法.

对应源码

//Observable.java
@SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);//此方法是重中之重.
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
复制代码

同样的方式,对我们传入的Observer观察者对象进行检查以及预处理,最终调用subscribeActual方法,该方法是一个抽象方法.所以我们直接看它的实现类,也就是ObservableCreate对象下的subscribeActual方法.

对应源码

//ObservableCreate.java

public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        //1. 创建一个发射器对象
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //2. 回调onSubscribe,通知订阅成功
        observer.onSubscribe(parent);

        try {
            //3. 回调subscribe,开始发送事件.
            //source对象就是创建被观察者传入的.
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
复制代码

上面注释已经写得很详细了这里再重复一次.整个订阅过程就是.

  1. 创建一个发射器对象,绑定观察者Observer
  2. 回调onSubscribe,通知订阅成功
  3. 回调subscribe,开始发送事件

接着我们来看发射器对象CreateEmitter是如何将事件发送给订阅者的.

3. 发送事件

对应源码

//ObservableCreate#CreateEmitter.java
static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                //回调onNext
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    //回调onError
                    observer.onError(t);
                } finally {
                    //最后断开订阅
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    //回调onComplete.
                    observer.onComplete();
                } finally {
                    //最后断开订阅
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
    

    //DisposableHelper.java
    //断开订阅方法
    public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();
        Disposable d = DISPOSED;
        if (current != d) {
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }

    
复制代码

从上面注释可知,发射器CreateEmitter直接回调了观察者Observer的相关方法.当调用dispose断开dispose订阅时,此时和线程中断处理一样,仅仅只是作为一个标识,标识当前发射器已经被中断.

这里最后给出一张关系图对上面流程进行归纳.

四. RxJava之基本原理

Observable的创建-订阅-发送事件过程

4. subscribeOn过程

从上面可知,RxJava的整体流程框架还是挺清晰的,但有时我们需要多它进行一些附加的操作如切线程,map,filter进行转换等.这里以切线程为例进行分析看如何工作的.

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        //新建一个ObservableSubscribeOn.
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

复制代码

当我们切换下游线程时,也返回了一个新建的Observable-ObservableSubscribeOn.接着我们直接看ObservableSubscribeOn类.

对应源码

//ObservableSubscribeOn.java
//中间的Observable.
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);//上游的源Observable
        this.scheduler = scheduler;
    }

@Override
    public void subscribeActual(final Observer<? super T> s) {
        //创建一个中间的Observer
        //创建一个中间的Observer
        //创建一个中间的Observer
        //重要的话说三遍.
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //回调onSubscribe,通知订阅成功
        s.onSubscribe(parent);
        
        //切换线程
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    
//线程任务类SubscribeTask    
final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //回调subscribe,进行订阅
            //source是源Observable,parent则是中间Observer,不是最终目标的Observer.这里一定要清楚.
            source.subscribe(parent);
        }
    }    
复制代码

从上面注释可知,当我们调用subscribeOn切换上游线程时

  1. 创建一个中间的Observable,然后在这个Observable里面会创建一个中间的Observer
  2. 最后在线程任务的run方法里面通过将源Observable与中间的Observer进行绑定订阅,从而进行切换了上游线程 .
  3. 中间的Observer接到相应回调时则会继续往上回调源Observer.

这也就是解释了最开始文章所说的subscribeOn无论调用几次,为什么只有第一次是生效的. 因为每次都创建新的Observable与Observer ,线程调度器里将源Observable与中间的Observer进行绑定订阅时,源Observable仅仅是指上一个,已经不是第一个创建出来的.

通过上面流程分析可以总结出RxJava操作符一些通用的流程.对于Map等操作符都可以参考.如图

四. RxJava之基本原理

Observable附加操作通用流程

5. observeOn过程

最后我们根据上面总结出的通用流程,去分析下切换下游线程的过程.

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        //返回一个新建的Observable
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

复制代码

同样当调用observeOn过程切换下游线程时,果不其然也返回了一个中间的Observable-ObservableObserveOn.接着看ObservableObserveOn代码.

对应源码

//中间的Observable
//ObservableObserveOn.java
@Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //线程调度器
            Scheduler.Worker w = scheduler.createWorker();
            //回调subscribe进行订阅(中间的Observer-ObserveOnObserver)
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
复制代码

同样,果不其然

  1. 创建了一个中间的Observer-ObserveOnObserver,这个中间的Observer绑定了线程调度器Scheduler.
  2. 接着将源Observable与它绑定订阅.
  3. 最后接收到事件时,则通过中间Observer的线程调度器去回调目标Observer.

之前我们知道每次执行observeOn都会切换一次下游线程,从上面源码可知, 每次都会新建一个中间的Observer绑定新指定的线程调度器,所以接收事件都是在新的线程中执行啦 .

至此,RxJava基本原理就差不多分析完成,最重要的是记住两张流程图,都遵循这个规律.

由于本人技术有限,如有错误的地方,麻烦大家给我提出来,本人不胜感激,大家一起学习进步.

原文  https://juejin.im/post/5c72b71c6fb9a049b07dfbdb
正文到此结束
Loading...