List<Integer> list = new ArrayList<>(); list.add(0); list.add(1); list.add(2); list.add(3); mDisposables.add(Observable .fromIterable(list) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("接收----->"+integer); } })); 复制代码
输出日志:
接收----->0 接收----->1 接收----->2 接收----->3 复制代码
我们通过源码可以看到 fromIterable 通过 Iterable 构造了一个 ObservableFromIterable 然后返回。
public static <T> Observable<T> fromIterable(Iterable<? extends T> source) { ObjectHelper.requireNonNull(source, "source is null"); // 通过 Iterable 构造了一个 ObservableFromIterable 返回 return RxJavaPlugins.onAssembly(new ObservableFromIterable<T>(source)); } 复制代码
ObservableFromIterable 继承 Observable。
接下来我们回到订阅过程,其订阅过程前面的内容跟上一节分析的是一样的,就不重复了。还记得订阅过程中 Observable 类的 subscribeActual() 是个抽象方法吗?他的真正实现是在 ObservableFromIterable 中,所以我们来看下 subscribeActual 方法:
public final class ObservableFromIterable<T> extends Observable<T> { final Iterable<? extends T> source; public ObservableFromIterable(Iterable<? extends T> source) { this.source = source; } @Override public void subscribeActual(Observer<? super T> observer) { ...... // 把我们创建的 Observer 和 传的 Iterable 包装成 FromIterableDisposable FromIterableDisposable<T> d = new FromIterableDisposable<T>(observer, it); observer.onSubscribe(d); if (!d.fusionMode) { d.run(); } } ...... //内部静态类 FromIterableDisposable } 复制代码
同样也将我们自定义的 Observer 给包装成了一个新的 FromIterableDisposable 对象,然后调用 observer.onSubscribe(d) 设置了观察者的onSubscribe方法的回调。所以观察者onSubscribe()是在订阅时被调用,也就是在事件执行之前调用。最后执行 d.run()。
static final class FromIterableDisposable<T> extends BasicQueueDisposable<T> { //我们创建的 观察者 Observer final Observer<? super T> downstream; //我们传的参数 Iterator final Iterator<? extends T> it; volatile boolean disposed; boolean fusionMode; boolean done; boolean checkNext; FromIterableDisposable(Observer<? super T> actual, Iterator<? extends T> it) { this.downstream = actual; this.it = it; } void run() { boolean hasNext; //循环 Iterator do { // 消息断开后直接返回 if (isDisposed()) { return; } T v; try { v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value"); } catch (Throwable e) { Exceptions.throwIfFatal(e); downstream.onError(e); return; } // 执行 观察者 Observer 的回调 onNext() downstream.onNext(v); // 消息断开后直接返回 if (isDisposed()) { return; } try { hasNext = it.hasNext(); } catch (Throwable e) { Exceptions.throwIfFatal(e); downstream.onError(e); return; } } while (hasNext); // 循环结束执行onComplete() (循环结束isDisposed()都是false,否则在循环中就已经返回了) if (!isDisposed()) { downstream.onComplete(); } } ...... } 复制代码
遍历 数组,和 just() 方式一样直接触发 onNext(),然后返回每项数据
fromArray和多参数just一样,只不过 fromArray 可以传入多于10个的变量,并可传入一个数组
示例1
mDisposables.add(Observable //把 int 装箱成 Integer,所以返回的每个item .fromArray(0,1,2,3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("接收----->"+integer); } })); 复制代码
接收----->0 接收----->1 接收----->2 接收----->3 复制代码
示例2
Integer[] array = {1, 2, 3, 4}; mDisposables.add(Observable // 可传入一个数组 .fromArray(array) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("接收----->"+integer); } })); 复制代码
接收----->0 接收----->1 接收----->2 接收----->3 复制代码
示例3
//参数 int 类型 把整个array作为可变数据组的一个item,所以返回的是数组类型 int[] array = {1, 2, 3, 4}; mDisposables.add(Observable // 可传入一个数组 .fromArray(array) .subscribe(new Consumer<int[]>() { @Override public void accept(int[] ints) throws Exception { //接收到的是数组的地址 System.out.println("接收----->"+ints.length); } })); 复制代码
接收----->[I@6d6f6e28 复制代码
示例3和示例1、2的输入结果不一样,这个就涉及到java的 泛型T 以及 基本数据类型和其对应的包装类 相关的知识点,可自行查阅其他资料。
我的理解是 fromArray(T... items) 泛型T 理解成Object,当传int类型数组时,会把整个array当做一个item对象,但是当传入int 类型的多个变量时会自动装箱成Integer。其他八种基本数据类型一样的。
接下来我们看下fromArray的源码:
public static <T> Observable<T> fromArray(T... items) { ObjectHelper.requireNonNull(items, "items is null"); if (items.length == 0) { return empty(); } else if (items.length == 1) { return just(items[0]); } return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items)); } 复制代码
我们可以看到当只有一个参数时,就调用 just(T item),多个参数时调用的和多个参数的just()是一样的,具体的可以查看 just示例和源码解析
mDisposables.add(Observable .fromCallable(new Callable<Integer>() { @Override public Integer call() throws Exception { return 100; } }) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer i) throws Exception { System.out.println("接收----->"+i); } })); 复制代码
接收----->100 复制代码
我们先看下Callable是什么,原来是java.util.concurrent 包下的一个接口,里面只有一个带返回值的方法。 再看 fromCallable
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier) { ObjectHelper.requireNonNull(supplier, "supplier is null"); return RxJavaPlugins.onAssembly(new ObservableFromCallable<T>(supplier)); } 复制代码
同样的装配了一个 ObservableFromCallable 返回,作为我们的被观察者。我们知道正真订阅是实现 subscribeActual 方法的 Observable 的子类里面,所以我们直接 ObservableFromCallable 类:
public final class ObservableFromCallable<T> extends Observable<T> implements Callable<T> { //我们传进去的 Callable final Callable<? extends T> callable; public ObservableFromCallable(Callable<? extends T> callable) { this.callable = callable; } @Override public void subscribeActual(Observer<? super T> observer) { //通过我们创建的 observer 创建了一个DeferredScalarDisposable对象 DeferredScalarDisposable<T> d = new DeferredScalarDisposable<T>(observer); //执行订阅回调 observer.onSubscribe(d); //中断后直接返回 if (d.isDisposed()) { return; } T value; try { //callable.call()就是我们new的 Callable 中 call 方法的返回值 //这里做了非空判断,若如为空,直接抛出异常 value = ObjectHelper.requireNonNull(callable.call(), "Callable returned null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); if (!d.isDisposed()) { observer.onError(e); } else { RxJavaPlugins.onError(e); } return; } d.complete(value); } @Override public T call() throws Exception { return ObjectHelper.requireNonNull(callable.call(), "The callable returned a null value"); } } 复制代码
通过上面代码,我们看到,创建了一个 DeferredScalarDisposable 对象,最后执行了d.complete(value)。 接下来我们看 DeferredScalarDisposable 类的 complete 方法:
public final void complete(T value) { int state = get(); if ((state & (FUSED_READY | FUSED_CONSUMED | TERMINATED | DISPOSED)) != 0) { return; } Observer<? super T> a = downstream; if (state == FUSED_EMPTY) { this.value = value; lazySet(FUSED_READY); a.onNext(null); } else { lazySet(TERMINATED); // 执行回调 onNext(),并把new的 Callable 中 call 方法的返回值作为参数 a.onNext(value); } //假如没有执行dispose(),并执行完 onNext 方法后,接着执行onComplete if (get() != DISPOSED) { a.onComplete(); } } 复制代码
根据以上的分析,根据上面的分析,我们得出如下规则:
1、fromCallable 里的返回值就是 onNext 接收的参数。
2、通过 fromCallable() 方式 直接触发 onNext(),并执行 onComplete()。
FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(5000); return "返回值"; } }); Observable .fromFuture(futureTask) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); futureTask.run(); } @Override public void onNext(String s) { System.out.println("接收----->" + s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { System.out.println("onComplete"); } }); 复制代码
System.out: onSubscribe System.out: 接收----->返回值 System.out: onComplete 复制代码
我们先简单的看下Future 和 FutureTast 。Future类位于java.util.concurrent包下,它也是一个接口。
Future就是对于具体的 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
FutureTask实现了RunableFuture接口,同时RunableFuture又继承Future,Runable接口,也就是说FutureTask具备Runbale的run方法执行异步任务,也可以像Future一样能够控制任务的执行。事实上,FutureTask是Future接口的一个唯一实现类。
详细用法查看 Java并发编程:Callable、Future和FutureTask
接下来我们看下 fromFuture 的源码:
同样的装配了一个 ObservableFromFuture 返回,作为我们的被观察者。我们知道正真订阅是实现 subscribeActual 方法的 Observable 的子类里面,所以我们直接 ObservableFromFuture 类的:
public void subscribeActual(Observer<? super T> observer) { DeferredScalarDisposable<T> d = new DeferredScalarDisposable<T>(observer); observer.onSubscribe(d); if (!d.isDisposed()) { T v; try { v = ObjectHelper.requireNonNull(unit != null ? future.get(timeout, unit) : future.get(), "Future returned null"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); if (!d.isDisposed()) { observer.onError(ex); } return; } d.complete(v); } } 复制代码
这个方法也很好理解,DeferredScalarDisposable 类和我们上面说的 fromCallable 的一样的,这里注意这一行代码: v = ObjectHelper.requireNonNull(unit != null ? future.get(timeout, unit) : future.get(), "Future returned null");
假如我们不执行 futureTask.run(); 就会一直阻塞。
示例源码
个人Github主页如果对您有帮助,您可以 "Star" 支持一下哦, 谢谢! ^^