转载

Android进阶知识:RxJava相关

RxJava 核心功能是一个用来完成异步操作的库,相对于其它异步操作的方法, RxJavaAPI 使用更加的简洁。并且 RxJava 中还提供了很多功能强大的操作符,帮助我们解决很多原本复杂繁琐的代码逻辑,提高了代码质量。 RxJava 的实现是基于观察者模式,观察者模式中以下有三个比较重要的概念:

  1. 被观察者(Observable)
  2. 观察者(Observer)
  3. 订阅(subscribe)

被观察者是事件的发起者,被观察者与观察者建立订阅关系后,被观察者发送事件,观察者才能接收到事件。

2. 基础使用

RxJava 的基础使用也很简单,分为三个步骤,分别是创建被观察者,创建观察者和建立订阅关系,具体代码如下。

// 1. 创建被观察者
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe");
                emitter.onNext("string1");
                emitter.onNext("string2");
                emitter.onNext("string3");
                emitter.onComplete();
            }
        });
        // 2. 创建观察者
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onNext "+s);

            }

            @Override
            public void onError(Throwable e) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onError");

            }

            @Override
            public void onComplete() {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onComplete");
            }
        };
        Log.d(getClass().getName(), Thread.currentThread().getName() + " observable:"+observable.getClass().getName());
        // 3. 建立订阅关系
        observable.subscribe(observer);
复制代码

运行日志:

Android进阶知识:RxJava相关

3. 订阅源码流程

本文中所有源码基于 RxJava22.2.11 版本。首先来看看这个基本的订阅流程源码是怎么实现的。

3.1 创建被观察者

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
复制代码

使用 RxJava 可以通过 Observablecreate 方法创建一个被观察者对象。 create 方法从参数中传入一个 ObservableOnSubscribe 类型的 source ,然后方法中先校验了 source 是否为空,接着将传入的 source 封装成一个 ObservableCreate 对象,然后调用了 RxJavaPlugins.onAssembly 方法返回创建的好的 Observable 。接着进入 onAssembly 方法查看。

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}
复制代码

onAssembly 方法中首先是一个 Hook 实现,这里可以理解为一个代理。可以看到这里先判断 onObservableAssembly 是否为空,为空就直接返回传入的 source ,否则再调用 apply 方法。这里可以继续跟踪一下 onObservableAssembly

@SuppressWarnings("rawtypes")
@Nullable
static volatile Function<? super Observable, ? extends Observable> onObservableAssembly;

/**
  * Sets the specific hook function.
  * @param onObservableAssembly the hook function to set, null allowed
  */
@SuppressWarnings("rawtypes")
public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
    if (lockdown) {
        throw new IllegalStateException("Plugins can't be changed anymore");
    }
    RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}
复制代码

它是 RxJavaPlugins 中的成员变量,默认为空,并且提供了一个 set 方法来设置它。因为默认为空,所以默认返回的就是传入的 source 。这里的代理默认是不会对 Observable 做什么操作,如果需要有特殊的需求可以调用 set 方法实现自己的代理。而默认返回的 source 类型为 ObservableCreate 对象也实现了 Observable 接口。

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    ......
}
复制代码

3.2 创建观察者

public interface Observer<T> {

    void onSubscribe(@NonNull Disposable d);

    void onNext(@NonNull T t);

    void onError(@NonNull Throwable e);

    void onComplete();
}
复制代码

观察者 Observer 是一个接口,其中提供了一些方法,使用时创建接口的实现,并根据需求在方法中做自己的实现。

3.3 建立订阅关系

建立订阅关系调用了 Observablesubscribe 方法。

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
           ......
        } catch (Throwable e) {
           ......
        }
    }
复制代码

方法中还是先判断了传入参数 observer 是否为空,接着还是一个 Hook 实现,这里就不细究了,获得 Hook 返回的 observer 后再次判断是否为空,之后调用了 subscribeActual 方法。

protected abstract void subscribeActual(Observer<? super T> observer);
复制代码

ObservablesubscribeActual 方法是个抽象方法,之前看过这里的 Observable 实际实现是个 ObservableCreate 对象,所以再进入 ObservableCreate 类查看对应方法。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
复制代码

ObservableCreate 中的 subscribeActual 方法中先创建了一个 CreateEmitter 发射器对象,并将 observer 对象传入。接着调用了 observeronSubscribe 方法,此时观察者的 onSubscribe 方法执行。最后调用了 sourcesubscribe 方法。

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe");
                emitter.onNext("string1");
                emitter.onNext("string2");
                emitter.onNext("string3");
                emitter.onComplete();
            }
        });
复制代码

这个 source 就是在 create 方法中传入的 ObservableOnSubscribe 。它的 subscribe 方法中通过调用 ObservableEmitter 的方法发送事件,这里的 ObservableEmitter 就是之前创建的 CreateEmitter 对象,所以再来进一步看看它其中的方法。

CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
复制代码

CreateEmitter 的构造函数接收了观察者对象,然后在调用 onNext 方法时先做了空判断,再对 isDisposed 进行取消订阅的判断,之后调用了 observeronNext 方法,也就是观察者的 onNext 方法。同样的 onComplete 中最终也是调用了 observeronComplete 方法。至此 RxJava 中的基本订阅流程的源码就梳理完了。

Android进阶知识:RxJava相关

4. 线程切换

RxJava 中有个很重要的功能,就是能方便的切换线程,来看下它的使用,还是之前基础使用中的例子进行修改。

Observable<String> observable0 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe");
                emitter.onNext("string1");
                emitter.onNext("string2");
                emitter.onNext("string3");
                emitter.onComplete();
            }
        });
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onSubscribe");
            }
            @Override
            public void onNext(String s) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onNext "+s);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onError");
            }
            @Override
            public void onComplete() {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onComplete");
            }
        };
        Observable<String> observable1 = observable0.subscribeOn(Schedulers.newThread());
        Log.d(getClass().getName(), Thread.currentThread().getName() + " observable1:"+observable1.getClass().getName());
        Observable<String> observable2 = observable1.observeOn(AndroidSchedulers.mainThread());
        Log.d(getClass().getName(), Thread.currentThread().getName() + " observable2:"+observable2.getClass().getName());
        observable2.subscribe(observer);
复制代码

被观察者和观察者的创建和之前一样,在建立订阅关系时调用 subscribeOnobserveOn 方法进行线程的切换。这里每个方法返回的都是 Observable 类型,所以可以采用链式调用,这也是 RxJava 的一个特点,但是这里没有采用这种写法,而是将其拆分开来写并且日志打印出每个 Observable 的具体类型,这是为了方便之后源码理解。 运行结果日志:

Android进阶知识:RxJava相关

4.1 subscribeOn

Observable<String> observable1 = observable0.subscribeOn(Schedulers.newThread());
Log.d(getClass().getName(), Thread.currentThread().getName() + " observable1:"+observable1.getClass().getName());
observable1.subscribe(observer);
复制代码

运行结果:

Android进阶知识:RxJava相关
先只调用 subscribeOn 方法运行查看结果,发现不仅被观察者发射事件运行在了子线程,观察者接收事件也运行在子线程,那么进入 subscribeOn

方法查看它的实现。

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
复制代码

可以看到 subscribeOn 方法和 subscribe 方法有些类似。首先是判断传入的 scheduler 是否为空,然后同样调用 RxJavaPlugins.onAssembly 方法,这次构建了一个 ObservableSubscribeOn 对象返回。而 subscribeOn 方法之后还是调用了 subscribe 方法,根据之前的分析, subscribe 方法最终会调用到 subscribeActual 方法,不过此时的 subscribeActual 方法不再是 ObservableCreate 中的而是 ObservableSubscribeOn 中的 subscribeActual 方法。

@Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        observer.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
复制代码

ObservableSubscribeOnsubscribeActual 方法中流程和之前的也很类似,这次是先创建了一个 SubscribeOnObserver 对象,将观察者对象传入,接着同样先调用了 observer.onSubscribe 方法,然后将传入的 SubscribeOnObserver 封装入了一个 SubscribeTask 对象中,接着调用了 scheduler.scheduleDirect 方法再将返回结果得到的 Disposable 设置到 SubscribeOnObserver 中。下面一个方法一个方法看。首先是创建 SubscribeTask

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
复制代码

SubscribeTaskObservableSubscribeOn 的内部类,其实现很简单就是实现了一个 Runnable 接口,构造方法中传入了 SubscribeOnObserver 对象,在其 run 方法中调用了 ObservableSubscribeOn 中的成员变量 sourcesubscribe 方法。这个 source 是在创建 ObservableSubscribeOn 时传入的,根据前面的代码可以找到是在 subscribeOn 方法中创建的对象并且这个 source 对应传入的是当前这个 Observable 对象即通过 Observable.create 获得的被观察者对象,其实现之前看过是一个 ObservableCreate 所以这里就和之前一样又会走到了其父类 Observablesubscribe 方法中,继而调用 ObservableCreatesubscribeActual 方法,之后最终会调用到观察者的对应 onNext 等方法,不过此时的观察者不直接是在使用时创建传入的 Observer ,而是之前看到的 SubscribeOnObserver 类型,不过其中的 onNext 等方法还是调用了在使用时创建传入的 Observer 的对应方法。

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> downstream;
        final AtomicReference<Disposable> upstream;
        SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream;
            this.upstream = new AtomicReference<Disposable>();
        }
        @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }
        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }
        @Override
        public void onComplete() {
            downstream.onComplete();
        }
        ......
    }
复制代码

下面接着看到 scheduleDirect 这个方法,在创建好 SubscribeTask 之后调用了 scheduleDirect 方法。这里的 scheduler 就是 subscribeOn 中传入的,对应开始例子中的 Schedulers.newThread

public static Scheduler newThread() {
    return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
// 静态成员变量NEW_THREAD
static final Scheduler NEW_THREAD;

NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
复制代码

进入 Schedulers.newThread 一步步跟踪,看到 newThread 方法返回静态成员变量中的 NEW_THREAD ,而 NEW_THREAD 又是通过 NewThreadTask 创建。

static final class NewThreadTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return NewThreadHolder.DEFAULT;
    }
}
static final Scheduler DEFAULT = new NewThreadScheduler();
复制代码

继续跟踪查看发现 NewThreadTask 实际是实现了 Callable 接口,其 call 方法中返回了静态内部类中的 NewThreadHolder.DEFAULT 。这个 DEFAULT 的实现类型为 NewThreadScheduler 。至此终于找到了我们传入的 Scheduler 的真正实现类。于是继续看其 scheduleDirect 方法。

public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}
复制代码

scheduleDirect 方法是在其父类中实现的,看到其中进而调用了同名重载方法,方法中首先是调用 createWorker 方法创建一个 Worker 。这个方法的实现就是在 NewThreadScheduler 中了。

public Worker createWorker() {
    return new NewThreadWorker(threadFactory);
}
复制代码

createWorker 方法中只做了一件事就是创建返回了一个 NewThreadWorker

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    ......
}
复制代码

NewThreadWorker 中看到创建了一个线程池,再回到 scheduleDirect 方法,创建完 Worker 后将传入的 RunnableSubscribeTask 进行一个装饰得到新的 Runnable 对象。接着将 Worker 和新的 Runnable 封装到一个 DisposeTask 对象中。

static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
        @NonNull
        final Runnable decoratedRun;
        @NonNull
        final Worker w;
        @Nullable
        Thread runner;
        DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
            this.decoratedRun = decoratedRun;
            this.w = w;
        }
        @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                decoratedRun.run();
            } finally {
                dispose();
                runner = null;
            }
        }
    ......
}
复制代码

DisposeTask 同样实现了 Runnable 接口,在 run 方法中调用了从构造传入的 decoratedRunrun 方法执行任务。回到最后一步,调用 Workerschedule 方法,这里就对应的 NewThreadWorkerschedule 方法。

public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }
复制代码

schedule 方法中又进一步调用了其 scheduleActual 方法。

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
        return sr;
    }
复制代码

scheduleActual 方法里看到又将 decoratedRunDisposableContainer 封装成 ScheduledRunnable 最后将这个 ScheduledRunnable 交给构造函数中创建的线程池去运行,最终就会执行到前面看过的 SubscribeTask 中的 run 方法完成订阅逻辑,调用观察者的 onNext 等方法。到这里就看出最终的 source.subscribe 是会通过线程池切换到子线程中去执行了。

通过查看 subscribeOn 方法源码可以发现,方法里实际上是在前一个创建的 ObservableCreate 外面包了一层,把它包成一个 ObservableSubscribeOn 对象,同样的原先的 Observer 也被包了一层包成一个 SubscribeOnObserver 对象,而线程切换的工作是由 Scheduler 完成的。

Android进阶知识:RxJava相关

4.2 observeOn

接着再来看看切换回主线程的方法 observeOn ,还是先修改使用代码,查看运行日志。

Observable<String> observable2 = observable0.observeOn(AndroidSchedulers.mainThread());
Log.d(getClass().getName(), Thread.currentThread().getName() + " observable2:"+observable2.getClass().getName());
observable2.subscribe(observer);
复制代码

运行日志:

Android进阶知识:RxJava相关

接着还是进入来看源码。

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

 public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
复制代码

这里看到 observeOn 方法里调用了重载方法,方法中还是同一个套路,不过这里创建的又是另一个对象 ObservableObserveOn 了。根据前面的经验这里就又是将前一个 Observable 传递到 ObservableObserveOn 中的成员变量 source 上,这里看到就是构造函数中的第一个参数。接着还是会调用 subscribe 与观察者建立订阅关系进而会执行到 ObservableObserveOn 对象的 subscribeActual 方法。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}
复制代码

subscribeActual 方法中判断了 scheduler 的类型,这里的 scheduler 就是由 AndroidSchedulers.mainThread() 传入的,于是先来看一下这个方法。

public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
    @Override public Scheduler call() throws Exception {
        return MainHolder.DEFAULT;
    }
});

private static final class MainHolder {
    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
复制代码

mainThread 开始看,发现代码调用逻辑和之前的 Schedulers.newThread 方法类似,最终会返回一个 HandlerScheduler 而这个 Scheduler 中的 Handler 则是主线程的 Handler ,看到这里就能猜想到了,后面观察者的对应方法一定是由这个 Handler 来切换到主线程执行的。回到 subscribeActual 方法。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}
复制代码

这里判断完类型会走 else 中的方法首先还是会调用 HandlerSchedulercreateWorker 方法创建一个 Worker

@Override
public Worker createWorker() {
    return new HandlerWorker(handler, async);
}
复制代码

这里是个 HandlerWorker 其中具体方法后面再看。接着上面创建完 Worker 后同样还是一样调用 source.subscribe 创建了一个 ObserveOnObserver 对象传入。这里的 source 就还是之前的 ObservableCreate ,所以这里还是会调用 ObservableCreate 中的 subscribeActual 方法。

protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
复制代码

ObservableCreate 中的 subscribeActual 方法中的逻辑之前看过,不过此时传入的 observer 仍然不再是在使用时创建的观察者对象了,而是传过来的 ObserveOnObserver 对象,此时创建的 CreateEmitter 中的 observer 也就是这个 ObserveOnObserver 对象。和之前逻辑一样,接着就会调用 observeronNext 等方法,此时调用的即是 ObserveOnObserver 中的 onNext 等方法。所以进入 ObserveOnObserver 查看。

@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
        schedule();
}

@Override
public void onComplete() {
    if (done) {
        return;
    }
    done = true;
    schedule();
}

void schedule() {
     if (getAndIncrement() == 0) {
         worker.schedule(this);
     }
}
复制代码

查看 ObserveOnObserver 中的代码会发现 onNext 方法中先将传入的参数放入了一个队列,然后无论是 onNext 还是 onComplete 方法最后都调用了 schedule 方法,进而再进入查看,发现 schedule 方法中又调用了 worker.schedule 方法。这里的 worker 就是之前创建的 HandlerWorker ,这时再来看它的 schedule 方法。

public Disposable schedule(@NonNull Runnable run) {
   return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
复制代码

单个参数 schedule 方法是在其父类中的,而这个方法中又调用另一个三个参数的 schedule 方法,这个方法父类中是抽象方法所以实现就在子类 HandlerWorker 里了。

@Override
        @SuppressLint("NewApi") // Async will only be true when the API is available to call.
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");
	    // 判断是否取消订阅
            if (disposed) {
                return Disposables.disposed();
            }
            run = RxJavaPlugins.onSchedule(run);
	    // 创建ScheduledRunnable
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
	    // 创建消息,并将主线程Handler和ScheduledRunnable
            Message message = Message.obtain(handler, scheduled);
            message.obj = this;
	    // 判断设置异步消息
            if (async) {
                message.setAsynchronous(true);
            }
	    // 发送消息执行callback
            handler.sendMessageDelayed(message, unit.toMillis(delay));
            // 检查是否取消订阅
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }
            return scheduled;
        }
复制代码

在子类的这个方法里在做了取消订阅的判断后将方法传入的 RunnableHandler 又封装到一个 ScheduledRunnable 对象中。接着创建了一个 Message 并将 ScheduledRunnable 放入 Message ,最后调用 handler.sendMessageDelayed 方法通过这个主线程的 Handler 执行这个 ScheduledRunnable

最后来追溯下 ScheduledRunnable 到底执行了什么,不过猜也知道最后一定调用到观察者中的对应方法。

private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed; // Tracked solely for isDisposed().

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }
    ......
    }
复制代码

ScheduledRunnable 中的 run 方法很简单就是调用了构造中传入的 Runnablerun 方法。而根据之前看过得创建 ScheduledRunnable 时传入的 Runnable 又是从 scheduleDirect 方法中传入的,而 scheduleDirect 方法中的 Runnable 又是从 worker.schedule(this) 方法时传入的,根据上下文代码发现这个 this 指代的是 ObserveOnObserver 对象,于是进一步进入它的 run 方法查看。

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        ......
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
        ......
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
    ......    
    }
复制代码

可以看到 run 方法中判断了 outputFused 的真假,然后分别调用了 drainFuseddrainNormal 方法。这里的 outputFused 是与 RxJava2 中的背压处理相关暂时先不管,根据方法名也能知道正常调用会执行 drainNormal 方法,于是直接来看 drainNormal 方法。

void drainNormal() {
            int missed = 1;
            // 存放onNext传入的事件对象队列
            final SimpleQueue<T> q = queue;
            // 传入的观察者对象
            final Observer<? super T> a = downstream;
            // 循环check事件是否完成或者发生错误
            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }
                for (;;) {
                    boolean d = done;
                    T v;
                    try {
                        // 从队列中取出发送事件传入的对象
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;
                    // 再次判断是否完成或者发生错误
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
                    // 判断队列中取出的发送事件传入的对象v是否为空
                    if (empty) {
                        break;
                    }
                    // 执行观察者对象的onNext方法
                    a.onNext(v);
                }
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
复制代码

drainNormal 方法中先通过 checkTerminated 方法校验发送事件是否完成或者发生异常,接着从队列中取出事件对象,再次判断是否完成或者发生错误和取出的对象是否为空,没有问题的话就会执行观察者的 onNext 方法。而发送完成和出现异常的方法则是在 checkTerminated 方法处理。

boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            if (disposed) {
                queue.clear();
                return true;
            }
            if (d) {
                Throwable e = error;
                if (delayError) {
                    if (empty) {
                        disposed = true;
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        worker.dispose();
                        return true;
                    }
                } else {
                    if (e != null) {
                        disposed = true;
                        queue.clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    if (empty) {
                        disposed = true;
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }
复制代码

checkTerminated 方法里根据 delayError 判断是否设置了超时的错误,接着再根据获得的错误 e 是否为空再决定调用的是观察者的 onError() 方法还是 onComplete 方法。至此 observeOn 切换线程的流程也梳理结束。

Android进阶知识:RxJava相关

5. map操作符

RxJava 中有很多功能强大的操作符,通过使用这些操作符,可以很容易的解决代码编写时遇到的一些复杂繁琐的问题。这里就用 map 操作符来作为一个例子,来看看操作符是怎样工作的。首先还是来了解 map 操作符的使用方法和作用。

final Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe");
                emitter.onNext("5");
                emitter.onComplete();
            }
        });
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onSubscribe");
            }

            @Override
            public void onNext(Integer i) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onNext "+i);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onError");
            }
            @Override
            public void onComplete() {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onComplete");
            }
        };
        Observable<Integer> mapObservable = observable.map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return Integer.parseInt(s);
            }
        });
        Log.d(getClass().getName(), Thread.currentThread().getName() + " mapObservable:"+mapObservable.getClass().getName());
        mapObservable.subscribe(observer);
复制代码

运行日志:

Android进阶知识:RxJava相关

map 操作符作用是可以将被观察者发送事件的数据类型转换成其他的数据类型。它的使用方法很简单,例如上面这个例子就将一开始发送的 String 类型转换成观察者接收到的 Integer 类型。下面开始看 map 方法的源码。

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
复制代码

看到 map 方法中依旧还是同样的套路,通过 RxJavaPlugins.onAssembly 方法返回一个被观察者对象,只不过这次构建传入的类型又是另一个 ObservableMap 类型的对象。订阅的流程前面已经看过了,这里和之前的一样最终会走到 ObservableMapsubscribeActual 方法,所以直接来看这个方法。

@Override
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}
复制代码

ObservableMapsubscribeActual 方法里看到很熟悉还是会调用 source.subscribe 方法,只是这里传入的 Observer 对象是一个 MapObserver 对象。接下来的逻辑又和之前一样,根据之前的经验 source.subscribe 方法最终会调用 ObserveronNext 方法,所以接下来直接来看 MapObserveronNext 方法。

public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }
            U v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }
复制代码

MapObserveronNext 方法里的逻辑很简单,在做了一些的判断后调用 mapper.apply(t) 方法获得类型转换后的事件传递对象,最后就会调用观察者的 downstream.onNext 方法,这里的 downstream 就是订阅方法传入的观察者对象。跟踪 mapper 可以找到,它是从 MapObserver 构造时传入的一个 Function 类型,即是在使用 map 操作符时传入的那个 Function 对象,又因为在使用时实现了 Functionapply 方法完成了数据的类型转换逻辑,所以这里调用 mapper.apply(t) 方法就可以获得到转换后的数据。

Android进阶知识:RxJava相关

6. 总结

以上就是关于 RxJava 源码工作流程的相关总结,总而言之,观察者模式还是其核心设计思想。除此之外,通过源码阅读还发现,无论在线程切换方面还是其它功能的操作符的实现,根本上来说都是在其原有的被观察者或观察者基础上包装成一个新的对象,功能逻辑由新对象中的方法来实现完成。

原文  https://juejin.im/post/5d594514e51d4561af16dd13
正文到此结束
Loading...