just()示例
Observable.just("文章1", "文章2") .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, " onSubscribe : " + d.isDisposed()); } @Override public void onNext(String value) { Log.d(TAG, " onNext : " + value); } @Override public void onError(Throwable e) { Log.d(TAG, " onError : " + e.getMessage()); } @Override public void onComplete() { Log.d(TAG, " onComplete"); } }) 复制代码
Observable 的just()有10个重载方法,参数1~10个
public static <T> Observable<T> just(T item) ...... public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10) 复制代码
同样的我们从以下几个方面分析源码:
public static <T> Observable<T> just(T item) { ObjectHelper.requireNonNull(item, "The item is null"); //创建ObservableJust对象,封装成被观察者Observable return RxJavaPlugins.onAssembly(new ObservableJust<T>(item)); } 复制代码
创建了 ObservableJust 对象,调用 RxJavaPlugins.onAssembly 返回了被观察者 Observable.
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; } 复制代码
我们看下 ObservableJust 类,同样的也是继承 Observable。
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> { private final T value; public ObservableJust(final T value) { this.value = value; } //这个方法很重要, 等会会说到 @Override protected void subscribeActual(Observer<? super T> observer) { ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value); observer.onSubscribe(sd); sd.run(); } @Override public T call() { return value; } } 复制代码
这时候被观察者已经创建完成了,它就是 Observable 的子类 ObservableJust,我们在用 Observable.just() 的时候其实被观察者是 ObservableJust。
public final void subscribe(Observer<? super T> observer) { try { observer = RxJavaPlugins.onSubscribe(this, observer); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); npe.initCause(e); throw npe; } } 复制代码
这里的 Subscribe() 方法和 Observable.Create() 调的 Subscribe() 一样,我们知道 Subscribeactual() 这个方法是抽象方法,那它具体实现是不是和 Observable.Create() 一样也是 Observablecreate 类里呢?不是,不是,不是,重要的话说三遍哦,它的具体实现在我们看到 Observablejust 中。
那么我们回到 ObservableJust 中看下subscribeActual(Observer<? super T> observer)
protected void subscribeActual(Observer<? super T> observer) { //创建了一个线程 ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value); //设置observer的回调方法onSubscribe observer.onSubscribe(sd); //执行线程 sd.run(); } 复制代码
这里面创建了一个线程(Runnable),他就是 ScalarDisposable ,ScalarDisposable实现Runnable,把我们创建的观察者Observer 和参数value (Observable.just("文章1") 这里的‘文章1’) 作为构造方法的参数传进去了,
同时他也是Disposable的子类,所以 observer.onSubscribe(sd);
这行就很好理解了,就是设置了观察者的onSubscribe方法的回调,所以观察者onSubscribe()是在订阅时被调用,也就是在事件执行之前调用。
Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } ...... 复制代码
接下来看下 ScalarDisposable 类
public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable { private static final long serialVersionUID = 3880992722410194083L; //我们创建的观察者 final Observer<? super T> observer; //我们在just中传递的参数(文章1) final T value; static final int START = 0; static final int FUSED = 1; static final int ON_NEXT = 2; static final int ON_COMPLETE = 3; public ScalarDisposable(Observer<? super T> observer, T value) { this.observer = observer; this.value = value; } @Override public void run() { if (get() == START && compareAndSet(START, ON_NEXT)) { observer.onNext(value); if (get() == ON_NEXT) { lazySet(ON_COMPLETE); observer.onComplete(); } } } } 复制代码
我们在subscribeActual()方法中看到最后执行了 sd.run(); 所以我们只需看 public void run() 这个方法, run()方法首先执行了 observer.onNext(value), 也就是说我们在创建Observable时传的参数此时发送给observer, 然后在执行observer.onComplete()。
这样,Observable.just() 一个参数的方法就结束了
同样的我们从以下几个方面分析源码:
我们下看源码:
public static <T> Observable<T> just(T item1, T item2) { ObjectHelper.requireNonNull(item1, "The first item is null"); ObjectHelper.requireNonNull(item2, "The second item is null"); return fromArray(item1, item2); } 复制代码
很简单,在往下看 fromArray()
public static <T> Observable<T> fromArray(T... items) { ObjectHelper.requireNonNull(items, "items is null"); if (items.length == 0) { return empty(); } else if (items.length == 1) { return just(items[0]); } return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items)); } 复制代码
我们看到fromArray()参数是 可变长度参数,也就是说参数可以为1个,当然了1个时直接调用Observable.just("文章1") 一个参数的just()。最后返回一个可变长度参数 items 构造的 ObservableFromArray 对象,他也继承了Observable,也就是说我们创建的被观察着就是 ObservableFromArray 对象。
public final class ObservableFromArray<T> extends Observable<T> { final T[] array; public ObservableFromArray(T[] array) { this.array = array; } @Override public void subscribeActual(Observer<? super T> observer) { FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array); observer.onSubscribe(d); if (d.fusionMode) { return; } d.run(); } ...... 复制代码
同样是
public final void subscribe(Observer<? super T> observer) { try { observer = RxJavaPlugins.onSubscribe(this, observer); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); npe.initCause(e); throw npe; } } 复制代码
但是抽象方法subscribeActual()在ObservableFromArray中执行在看ObservableFromArray的subscribeActual()这个方法
@Override public void subscribeActual(Observer<? super T> observer) { FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array); observer.onSubscribe(d); if (d.fusionMode) { return; } d.run(); } 复制代码
此时创建了FromArrayDisposable对象,参数是我们创建的观察者observer,和传递的可变长度的参数array,然后执行Observer中的onSubscribe()方法。最后执行了FromArrayDisposable的run()方法(注意他不是线程的run()方法)。
我们在看FromArrayDisposable的run()方法
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> { final Observer<? super T> downstream; final T[] array; FromArrayDisposable(Observer<? super T> actual, T[] array) { this.downstream = actual; this.array = array; } void run() { T[] a = array; int n = a.length; for (int i = 0; i < n && !isDisposed(); i++) { T value = a[i]; if (value == null) { downstream.onError(new NullPointerException("The " + i + "th element is null")); return; } downstream.onNext(value); } if (!isDisposed()) { downstream.onComplete(); } } } 复制代码
变量downstream 就是我们创建的观察者 Observer ,array 就是我们传递的可变长度的那一串数组
run()方法中遍历array,然后执行回调 downstream.onNext(value) ,最后在执行回调 downstream.onComplete()。
根据上面的分析,我们得出如下规则:
1、通过 just() 方式 直接触发 onNext()
2、just 传进去什么,在onNext() 接收什么,如果我们传入 List,同样的在 onNext() 接收的也是 List,而不是 List 的 Item
3、onNext() 中接收数据的顺序是根据 just 传入的顺序确认的,使用 just 不允许传递 null,否则会出现异常
示例源码
个人Github主页如果对您有帮助,您可以 "Star" 支持一下哦, 谢谢! ^^