    implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
    implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
Let's start with the simplest possible case and cover only the essentials:
    Disposable disposable = Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(ObservableEmitter<Object> e) throws Exception {
            e.onNext(1);
        }
    }).subscribe(new Consumer<Object>() {
        @Override
        public void accept(Object o) throws Exception {
            Log.d("A", String.valueOf(o));
        }
    });
First, the source of Observable.create:
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
This method returns an Observable. The onAssembly call can be ignored for now; what matters is new ObservableCreate<T>(source):
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
In other words, what create finally returns is an ObservableCreate object. It extends Observable and holds a reference to our ObservableOnSubscribe.
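To make the point concrete that creation only stores the source and runs nothing, here is a minimal sketch. LazySource and LazyObservable are hypothetical stand-ins written for this illustration, not RxJava classes, and the subscribe method is deliberately simplified.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class LazyCreateSketch {

    /** Hypothetical stand-in for ObservableOnSubscribe: the code that emits. */
    interface LazySource {
        void subscribe(Consumer<Integer> emitter);
    }

    /** Hypothetical stand-in for ObservableCreate: it only stores the source. */
    static final class LazyObservable {
        private final LazySource source;

        LazyObservable(LazySource source) {
            this.source = source; // nothing is executed here
        }

        /** Only at this point is the stored source invoked. */
        void subscribe(Consumer<Integer> downstream) {
            source.subscribe(downstream);
        }
    }

    /** Returns how many times the source had run before and after subscribing. */
    static int[] run() {
        AtomicInteger calls = new AtomicInteger();
        LazyObservable observable = new LazyObservable(emitter -> {
            calls.incrementAndGet();
            emitter.accept(1);
        });
        int before = calls.get();       // the source has not run yet
        observable.subscribe(value -> { });
        int after = calls.get();        // the source ran once, during subscribe
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("before=" + counts[0] + ", after=" + counts[1]);
    }
}
```

The counter stays at 0 until subscribe is called, which is the same reason the walkthrough has to follow subscribe before any value is emitted.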
Next, the subscribe overload we called on it:
    public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
It delegates to the four-argument overload:
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }
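The overload above bundles its four callbacks into a single observer object. As a rough illustration of that adapter idea, the sketch below uses hypothetical names (SimpleObserver, CallbackObserver, wrap) and plain java.util.function types. One assumption to flag: the default error handler here silently ignores errors, which is a simplification and not what RxJava's default does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LambdaAdapterSketch {

    /** Hypothetical four-callback observer, loosely mirroring the shape of Observer. */
    interface SimpleObserver<T> {
        void onSubscribe();
        void onNext(T value);
        void onError(Throwable error);
        void onComplete();
    }

    /** Hypothetical adapter playing the role that LambdaObserver plays in the text. */
    static final class CallbackObserver<T> implements SimpleObserver<T> {
        private final Consumer<? super T> onNext;
        private final Consumer<? super Throwable> onError;
        private final Runnable onComplete;
        private final Runnable onSubscribe;

        CallbackObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                Runnable onComplete, Runnable onSubscribe) {
            this.onNext = onNext;
            this.onError = onError;
            this.onComplete = onComplete;
            this.onSubscribe = onSubscribe;
        }

        @Override public void onSubscribe() { onSubscribe.run(); }

        @Override public void onNext(T value) {
            try {
                onNext.accept(value);   // forward to the user's callback
            } catch (Throwable t) {
                onError(t);             // a failing callback is routed to onError
            }
        }

        @Override public void onError(Throwable error) { onError.accept(error); }

        @Override public void onComplete() { onComplete.run(); }
    }

    /** Single-callback entry point: fills in no-op defaults for the other three. */
    static <T> CallbackObserver<T> wrap(Consumer<? super T> onNext) {
        return new CallbackObserver<T>(onNext, error -> { }, () -> { }, () -> { });
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        CallbackObserver<Integer> observer =
                LambdaAdapterSketch.<Integer>wrap(value -> log.add("next:" + value));
        observer.onSubscribe();  // no-op default
        observer.onNext(1);      // reaches the user's callback
        observer.onComplete();   // no-op default
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Only the onNext callback leaves a trace in the log, because the other three slots were filled with do-nothing defaults.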
As you can see, the Consumer we passed in is wrapped in a LambdaObserver. The key call that follows is subscribe(ls):
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
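subscribeActual is abstract in Observable, so which implementation runs depends on the object we subscribed to. Recall that this call was made on the ObservableCreate returned by create():
    .subscribe(new Consumer<Object>() {
        @Override
        public void accept(Object o) throws Exception {
            Log.d("A", String.valueOf(o));
        }
    });
So the call lands in ObservableCreate's subscribeActual: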
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
This method constructs an emitter, CreateEmitter. The observer passed in is the LambdaObserver described above, which holds the Consumer we supplied.
observer.onSubscribe(parent) calls onSubscribe in LambdaObserver:
    @Override
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.setOnce(this, s)) {
            try {
                onSubscribe.accept(this);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                onError(ex);
            }
        }
    }
The onSubscribe in onSubscribe.accept(this) is the fourth argument passed by the single-argument overload shown earlier. It is an empty Consumer, so we can ignore it:
    public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
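The setOnce guard in LambdaObserver's onSubscribe is worth a brief aside. The sketch below shows the general set-once idea with an AtomicReference; Handle and this setOnce are hypothetical and simplified, and the real DisposableHelper handles more cases than shown here.

```java
import java.util.concurrent.atomic.AtomicReference;

public class SetOnceSketch {

    /** Hypothetical minimal handle; not RxJava's Disposable. */
    interface Handle {
        void dispose();
    }

    /** Stores the handle only if none was stored before; a latecomer is disposed. */
    static boolean setOnce(AtomicReference<Handle> field, Handle handle) {
        if (field.compareAndSet(null, handle)) {
            return true;        // first handle wins
        }
        handle.dispose();       // a second handle is rejected and cleaned up
        return false;
    }

    /** Returns {firstAccepted, secondAccepted, secondWasDisposed}. */
    static boolean[] run() {
        AtomicReference<Handle> field = new AtomicReference<>();
        boolean[] secondDisposed = {false};
        boolean first = setOnce(field, () -> { });
        boolean second = setOnce(field, () -> secondDisposed[0] = true);
        return new boolean[] {first, second, secondDisposed[0]};
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

Under these assumptions the callback body inside the if block runs at most once per observer, even if onSubscribe were called a second time.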
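Now back to source.subscribe(parent) in subscribeActual.
source is the ObservableOnSubscribe we passed in earlier, so source.subscribe invokes ObservableOnSubscribe's subscribe method, with the emitter as its argument.
With that, we have finally reached the method we wrote ourselves on the outside: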
    Disposable disposable = Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(ObservableEmitter<Object> e) throws Exception {
            e.onNext(1);
        }
    })
Next, look at e.onNext(1), which is implemented in CreateEmitter, an inner class of ObservableCreate:
    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }
As analyzed above, the observer here is the LambdaObserver that was passed in when the emitter was constructed, and it holds the Consumer we supplied. Its onNext looks like this:
    @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            try {
                onNext.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(e);
            }
        }
    }
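Both onNext methods above check isDisposed() before forwarding a value. A minimal sketch of that gate, using a hypothetical GatedEmitter class and an AtomicBoolean flag (an assumption made for brevity, not how RxJava stores the state), might look like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class DisposedGateSketch {

    /** Hypothetical emitter: forwards values until dispose() is called. */
    static final class GatedEmitter<T> {
        private final AtomicBoolean disposed = new AtomicBoolean();
        private final Consumer<? super T> downstream;

        GatedEmitter(Consumer<? super T> downstream) {
            this.downstream = downstream;
        }

        void onNext(T value) {
            if (!disposed.get()) {       // same shape as the isDisposed() check
                downstream.accept(value);
            }
        }

        void dispose() {
            disposed.set(true);
        }
    }

    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        GatedEmitter<Integer> emitter =
                new GatedEmitter<Integer>(value -> received.add(value));
        emitter.onNext(1);   // delivered
        emitter.dispose();
        emitter.onNext(2);   // dropped, because the emitter is disposed
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

This is why the Disposable returned by subscribe matters: once it is disposed, later emissions never reach the Consumer.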
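Here onNext is the Consumer we passed to subscribe, so onNext.accept(t) runs our own accept method and logs the value 1. That completes the round trip through the original code:
    Disposable disposable = Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(ObservableEmitter<Object> e) throws Exception {
            e.onNext(1);
        }
    }).subscribe(new Consumer<Object>() {
        @Override
        public void accept(Object o) throws Exception {
            Log.d("A", String.valueOf(o));
        }
    });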
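The whole call chain traced in this walkthrough can be condensed into a small model that records the order of calls. Everything here (MiniObservable, MiniObserver, MiniEmitter, MiniOnSubscribe, and the trace list) is hypothetical and stripped of error handling, plugins, and disposal; it is a sketch of the control flow only, not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class MiniCreateChain {

    /** Hypothetical counterpart of ObservableEmitter. */
    interface MiniEmitter<T> { void onNext(T value); }

    /** Hypothetical counterpart of ObservableOnSubscribe. */
    interface MiniOnSubscribe<T> { void subscribe(MiniEmitter<T> emitter); }

    /** Hypothetical counterpart of Observer, reduced to two callbacks. */
    interface MiniObserver<T> { void onSubscribe(); void onNext(T value); }

    static final class MiniObservable<T> {
        private final MiniOnSubscribe<T> source;
        private final List<String> trace;

        private MiniObservable(MiniOnSubscribe<T> source, List<String> trace) {
            this.source = source;
            this.trace = trace;
        }

        /** Like create(): only stores the source. */
        static <T> MiniObservable<T> create(MiniOnSubscribe<T> source, List<String> trace) {
            trace.add("create");
            return new MiniObservable<T>(source, trace);
        }

        /** Like subscribe(Consumer): wraps the callback in an observer. */
        void subscribe(Consumer<? super T> consumer) {
            trace.add("subscribe(Consumer)");
            MiniObserver<T> observer = new MiniObserver<T>() {
                @Override public void onSubscribe() { trace.add("observer.onSubscribe"); }
                @Override public void onNext(T value) {
                    trace.add("observer.onNext");
                    consumer.accept(value);
                }
            };
            subscribeActual(observer);
        }

        /** Like subscribeActual: builds the emitter, then runs the source. */
        private void subscribeActual(MiniObserver<T> observer) {
            trace.add("subscribeActual");
            MiniEmitter<T> emitter = value -> {
                trace.add("emitter.onNext");
                observer.onNext(value);
            };
            observer.onSubscribe();
            source.subscribe(emitter);
        }
    }

    static List<String> run() {
        List<String> trace = new ArrayList<>();
        MiniObservable<Integer> observable = MiniObservable.<Integer>create(emitter -> {
            trace.add("source.subscribe");
            emitter.onNext(1);
        }, trace);
        observable.subscribe(value -> trace.add("consumer.accept(" + value + ")"));
        return trace;
    }

    public static void main(String[] args) {
        for (String step : run()) {
            System.out.println(step);
        }
    }
}
```

Running main prints the steps in the same order the walkthrough followed: create, subscribe(Consumer), subscribeActual, observer.onSubscribe, source.subscribe, emitter.onNext, observer.onNext, and finally consumer.accept(1).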
That concludes the walkthrough.