转载

RxJava源码解析一

本文基于RxJava 3.0.0

implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'

我们从最简单的开始,挑最主要的讲:

Disposable disposable = Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(ObservableEmitter<Object> e) throws Exception {
        e.onNext(1);
    }
}).subscribe(new Consumer<Object>() {
    @Override
    public void accept(Object o) throws Exception {
        Log.d("A", String.valueOf(o));
    }
});

我们先看create方法:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

该方法返回的是Observable对象,onAssembly方法可以占时不用理会,我们看

new ObservableCreate<T>(source):

public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
}

也就是我们最终返回的是ObservableCreate对象,他继承Observable,持有ObservableOnSubscribe的引用。

接下来看subscribe方法:

public final Disposable subscribe(Consumer<? super T> onNext) {
    return subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

接着:

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe) {
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    subscribe(ls);
    return ls;
}

可以看出,我们传进来的Consumer对象被放进了LambdaObserver中。接下来重点是 subscribe(ls) ;

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

subscribeActual(observer);是一个抽象方法,实现它的方法有很多,从上面的分析中可以知道,调用下面方法的是ObservableCreate对象。

.subscribe(new Consumer<Object>() {
    @Override
    public void accept(Object o) throws Exception {
        Log.d("A", String.valueOf(o));
    }
});

所以subscribeActual(observer);调用的是ObservableCreate里面的。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

这个方法里面构造了一个发射器CreateEmitter,传进去的observer就是上面所说的LambdaObserver,里面持有我们传进去的Consumer对象。

observer.onSubscribe(parent);调用的是LambdaObserver里面的onSubscribe;

@Override
public void onSubscribe(Disposable s) {
    if (DisposableHelper.setOnce(this, s)) {
        try {
            onSubscribe.accept(this);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            onError(ex);
        }
    }

这个方法里面的onSubscribe.accept(this);中的onSubscribe,是我们上面的方法中的第四个参数,是一个空的Consumer,所以不用理他:

public final Disposable subscribe(Consumer<? super T> onNext) {
    return subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

我们回过头看subscribeActual方法中的source.subscribe(parent);

source是我们之前传进去的ObservableOnSubscribe对象,source.subscribe便是调用ObservableOnSubscribe的subscribe方法,传进去的是一个发射器。

到此我们总算调用到了外面这个方法了。

(也就是说必须调用subscribe方法,才能发射事件)

Disposable disposable = Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(ObservableEmitter<Object> e) throws Exception {
        e.onNext(1);
    }
})

我们再看e.onNext(1),在ObservableCreate的内部类CreateEmitter里面:

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}

这里的observer我们上面已经分析过,是构造发射器的时候传进去的LambdaObserver,LambdaObserver里面持有我们传进去的Consumer对象。

于是observer.onNext(t)便是调用LambdaObserver的onNext(t)

@Override
public void onNext(T t) {
    if (!isDisposed()) {
        try {
            onNext.accept(t);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            onError(e);
        }
    }
}

onNext便是它持有的的Consumer对象,于是onNext.accept(t)便回调到了外面这一层的accept:

Disposable disposable = Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(ObservableEmitter<Object> e) throws Exception {
        e.onNext(1);
    }
}).subscribe(new Consumer<Object>() {
    @Override
    public void accept(Object o) throws Exception {
        Log.d("A", String.valueOf(o));
    }
});

到此结束。

原文  http://77blogs.com/?p=512
正文到此结束
Loading...