转载

Rxjava源码解析(一):最简单的流式传递

Rxjava是什么

Rxjava在GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。</br>

通俗来说,Rxjava是一个采用了观察者模式设计处理异步的框架。链式调用设计让代码优雅易读。

举个例子:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {

           
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("a");
            }
        });


        observable.subscribe(new Observer<String>() {
           
            public void onSubscribe(Disposable d) {

            }

           
            public void onNext(String s) {

            }

           
            public void onError(Throwable e) {

            }

           
            public void onComplete() {

            }
        });

这是Rxjava2最简单的用法:

Observable
Observable

Rxjava源码解析

1. 创建Observable:

创建Observable用的是 Observable.create(ObservableOnSubscribe<T> source) 方法。

这个方法的参数是 ObservableOnSubscribe

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * e the safe emitter instance, never null
     * Exception on error
     */
    void subscribe ObservableEmitter<T> e) throws Exception;
}

查看源码可以知道,它是个接口,唯一的方法是 subscribe ,参数是 ObservableEmitter<T> eObservableEmitter 是一个继承了 Emitter 的接口,在 Emitter 里定义了onNext、onError、onComplete等方法。

public interface Emitter<T> {

    /**
     * Signal a normal value.
     * value the value to signal, not null
     */
    void onNext T value);

    /**
     * Signal a Throwable exception.
     * error the Throwable to signal, not null
     */
    void onError Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}

ObservableEmitter 对接口 Emitter 进行扩展,增加了setDisposable、setCancellable等方法

基本参数了解了,现在看看create方法里面做了什么,代码如下:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

调用了 RxJavaPluginsonAssembly 方法。又有一个新参数 ObservableCreate<T>(source) ,我们看看它是什么:

final class ObservableCreate<T> extends Observable<T> {

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

}

继承了 Observable ,所以也是个被观察对象,在构造函数中我们看到我们new的 ObservableOnSubscribe 对象,被存在了 ObservableCreatesource 里面

那我们继续看看onAssembly方法做什么:

public static <T> Observable<T> onAssembly Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

一个Hook方法。 onObservableAssembly 是一个静态变量,我们没有设置,默认为空,所以直接返回source对象。也就是说,Observable的create方法其实就是把我们 ObservableOnSubscribe 对象,存储在 ObservableCreate 对象的 source 里面,然后返回 ObservableCreate 对象。

我们知道 ObservableCreate 是继承 Observable 的,所以创建了 ObservableCreate 对象,我们的Observable也就创建完了。

2. 订阅事件(被观察者)

订阅被观察者的操作是 observable.subscribe(new Observer<String>()) 。这个操作符其实是个“被动”,就是事件被观察者观察。因为subscribe方法里的参数 Observer 才是观察者。我们也会在 Observer 里的各个会调方法里接收到事件相关的返回值。

我们看看 subscribe 方法的源码:

public final void subscribe(Observer<? super T> observer) {
        try {
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            RxJavaPlugins.onError(e);
        }
    }

看代码我们知道最主要调用的方法是: subscribeActual(observer); ,这个方法是Observable里的抽象方法,此时我们的Observable是一个 ObservableCreate 对象。所以我们去看一下 ObservableCreate 里面是如何重写这个方法的。代码如下:

protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

我们一看到这个方法主要做了三件事:

  1. 创建一个 CreateEmitter 对象 parent ,把 observer 传进去。
  2. parent 传给 observeronSubscribe 方法。
  3. parent 传给 sourcesubscribe 方法。上面我们知道 source 就是我们传进来的 ObservableOnSubscribe 对象, subscribe 也就是我们重写的方法:
public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("a");
            }

我们可以在这个方法里做被观察的事件,并可以通过 CreateEmitter 回调相应的方法。 CreateEmitter 是实现 ObservableEmitter 接口,我们看看它内部实现, onNext 源码如下:

public void onNext(T t) {
    observer.onNext(t);
}

也就是说,当我们在 ObservableOnSubscribe 的1 subscribe 方法里调用 ObservableEmitteronNext 方法的时候,它里面会调用observer的 onNext 。于是通过这样的传递,我们就能在observer里响应的回调方法里收到事件的相关状态。

至此一个简单Rxjava流式传递原理已经讲完了。

原文  https://xiaozhuanlan.com/topic/3124609857
正文到此结束
Loading...