转载

RxJava2-from操作符使用和分析

List<Integer> list = new ArrayList<>();
list.add(0);
list.add(1);
list.add(2);
list.add(3);
mDisposables.add(Observable
        .fromIterable(list)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println("接收----->"+integer);
            }
        }));
复制代码

输出日志:

接收----->0
接收----->1
接收----->2
接收----->3
复制代码

分析

我们通过源码可以看到 fromIterable 通过 Iterable 构造了一个 ObservableFromIterable 然后返回。

public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
	// 通过 Iterable 构造了一个 ObservableFromIterable 返回
    return RxJavaPlugins.onAssembly(new ObservableFromIterable<T>(source));
}
复制代码

ObservableFromIterable 继承 Observable。

接下来我们回到订阅过程,其订阅过程前面的内容跟上一节分析的是一样的,就不重复了。还记得订阅过程中 Observable 类的 subscribeActual() 是个抽象方法吗?他的真正实现是在 ObservableFromIterable 中,所以我们来看下 subscribeActual 方法:

public final class ObservableFromIterable<T> extends Observable<T> {
    final Iterable<? extends T> source;
    public ObservableFromIterable(Iterable<? extends T> source) {
        this.source = source;
    }

    @Override
    public void subscribeActual(Observer<? super T> observer) {

        ......
		// 把我们创建的 Observer 和 传的 Iterable 包装成 FromIterableDisposable
        FromIterableDisposable<T> d = new FromIterableDisposable<T>(observer, it);
        observer.onSubscribe(d);

        if (!d.fusionMode) {
            d.run();
        }
    }
	......
	//内部静态类 FromIterableDisposable
}
复制代码

同样也将我们自定义的 Observer 给包装成了一个新的 FromIterableDisposable 对象,然后调用 observer.onSubscribe(d) 设置了观察者的onSubscribe方法的回调。所以观察者onSubscribe()是在订阅时被调用,也就是在事件执行之前调用。最后执行 d.run()。

static final class FromIterableDisposable<T> extends BasicQueueDisposable<T> {
	//我们创建的 观察者 Observer
    final Observer<? super T> downstream;
	//我们传的参数 Iterator
    final Iterator<? extends T> it;

    volatile boolean disposed;

    boolean fusionMode;

    boolean done;

    boolean checkNext;

    FromIterableDisposable(Observer<? super T> actual, Iterator<? extends T> it) {
        this.downstream = actual;
        this.it = it;
    }

    void run() {
        boolean hasNext;
		//循环 Iterator
        do {
			// 消息断开后直接返回
            if (isDisposed()) {
                return;
            }
            T v;

            try {
                v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                downstream.onError(e);
                return;
            }
			// 执行 观察者 Observer 的回调 onNext()
            downstream.onNext(v);
			// 消息断开后直接返回
            if (isDisposed()) {
                return;
            }
            try {
                hasNext = it.hasNext();
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                downstream.onError(e);
                return;
            }
        } while (hasNext);
		// 循环结束执行onComplete() (循环结束isDisposed()都是false,否则在循环中就已经返回了)
        if (!isDisposed()) {
            downstream.onComplete();
        }
    }

 	......
}
复制代码

fromArray():

遍历 数组,和 just() 方式一样直接触发 onNext(),然后返回每项数据

fromArray和多参数just一样,只不过 fromArray 可以传入多于10个的变量,并可传入一个数组

示例

示例1

mDisposables.add(Observable
        //把 int 装箱成 Integer,所以返回的每个item
        .fromArray(0,1,2,3)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println("接收----->"+integer);
            }
        }));
复制代码
接收----->0
接收----->1
接收----->2
接收----->3
复制代码

示例2

Integer[] array = {1, 2, 3, 4};
mDisposables.add(Observable
        // 可传入一个数组
        .fromArray(array)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println("接收----->"+integer);
            }
        }));
复制代码
接收----->0
接收----->1
接收----->2
接收----->3
复制代码

示例3

//参数 int 类型 把整个array作为可变数据组的一个item,所以返回的是数组类型
int[] array = {1, 2, 3, 4};
mDisposables.add(Observable
        // 可传入一个数组
        .fromArray(array)
        .subscribe(new Consumer<int[]>() {
            @Override
            public void accept(int[] ints) throws Exception {
                //接收到的是数组的地址
                System.out.println("接收----->"+ints.length);
            }
        }));
复制代码
接收----->[I@6d6f6e28
复制代码

分析

示例3和示例1、2的输入结果不一样,这个就涉及到java的 泛型T 以及 基本数据类型和其对应的包装类 相关的知识点,可自行查阅其他资料。

我的理解是 fromArray(T... items) 泛型T 理解成Object,当传int类型数组时,会把整个array当做一个item对象,但是当传入int 类型的多个变量时会自动装箱成Integer。其他八种基本数据类型一样的。

接下来我们看下fromArray的源码:

public static <T> Observable<T> fromArray(T... items) {
    ObjectHelper.requireNonNull(items, "items is null");
    if (items.length == 0) {
        return empty();
    } else
    if (items.length == 1) {
        return just(items[0]);
    }
    return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
复制代码

我们可以看到当只有一个参数时,就调用 just(T item),多个参数时调用的和多个参数的just()是一样的,具体的可以查看 just示例和源码解析

fromCallable():

示例

mDisposables.add(Observable
        .fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return 100;
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer i) throws Exception {
                System.out.println("接收----->"+i);
            }
        }));
复制代码
接收----->100
复制代码

分析

我们先看下Callable是什么,原来是java.util.concurrent 包下的一个接口,里面只有一个带返回值的方法。 再看 fromCallable

public static <T> Observable<T> fromCallable(Callable<? extends T> supplier) {
    ObjectHelper.requireNonNull(supplier, "supplier is null");
    return RxJavaPlugins.onAssembly(new ObservableFromCallable<T>(supplier));
}
复制代码

同样的装配了一个 ObservableFromCallable 返回,作为我们的被观察者。我们知道正真订阅是实现 subscribeActual 方法的 Observable 的子类里面,所以我们直接 ObservableFromCallable 类:

public final class ObservableFromCallable<T> extends Observable<T> implements Callable<T> {
	//我们传进去的 Callable
    final Callable<? extends T> callable;
    public ObservableFromCallable(Callable<? extends T> callable) {
        this.callable = callable;
    }

    @Override
    public void subscribeActual(Observer<? super T> observer) {
		//通过我们创建的 observer 创建了一个DeferredScalarDisposable对象
        DeferredScalarDisposable<T> d = new DeferredScalarDisposable<T>(observer);
		//执行订阅回调
        observer.onSubscribe(d);
		//中断后直接返回
        if (d.isDisposed()) {
            return;
        }
        T value;
        try {
			//callable.call()就是我们new的 Callable 中 call 方法的返回值
			//这里做了非空判断,若如为空,直接抛出异常
            value = ObjectHelper.requireNonNull(callable.call(), "Callable returned null");
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            if (!d.isDisposed()) {
				observer.onError(e);
            } else {
                RxJavaPlugins.onError(e);
            }
            return;
        }
        d.complete(value);
    }

    @Override
    public T call() throws Exception {
        return ObjectHelper.requireNonNull(callable.call(), "The callable returned a null value");
    }
}
复制代码

通过上面代码,我们看到,创建了一个 DeferredScalarDisposable 对象,最后执行了d.complete(value)。 接下来我们看 DeferredScalarDisposable 类的 complete 方法:

public final void complete(T value) {
    int state = get();
    if ((state & (FUSED_READY | FUSED_CONSUMED | TERMINATED | DISPOSED)) != 0) {
        return;
    }
    Observer<? super T> a = downstream;
    if (state == FUSED_EMPTY) {
        this.value = value;
        lazySet(FUSED_READY);
        a.onNext(null);
    } else {
        lazySet(TERMINATED);
		// 执行回调 onNext(),并把new的 Callable 中 call 方法的返回值作为参数
        a.onNext(value);
    }
	//假如没有执行dispose(),并执行完 onNext 方法后,接着执行onComplete
    if (get() != DISPOSED) {
        a.onComplete();
    }
}
复制代码

根据以上的分析,根据上面的分析,我们得出如下规则:

1、fromCallable 里的返回值就是 onNext 接收的参数。

2、通过 fromCallable() 方式 直接触发 onNext(),并执行 onComplete()。

fromFuture():

示例

FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
    @Override
    public String call() throws Exception {
        Thread.sleep(5000);
        return "返回值";
    }
});

Observable
        .fromFuture(futureTask)
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
                futureTask.run();
            }

            @Override
            public void onNext(String s) {
                System.out.println("接收----->" + s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
复制代码
System.out: onSubscribe
System.out: 接收----->返回值
System.out: onComplete
复制代码

分析

我们先简单的看下Future 和 FutureTast 。Future类位于java.util.concurrent包下,它也是一个接口。

Future就是对于具体的 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

FutureTask实现了RunableFuture接口,同时RunableFuture又继承Future,Runable接口,也就是说FutureTask具备Runbale的run方法执行异步任务,也可以像Future一样能够控制任务的执行。事实上,FutureTask是Future接口的一个唯一实现类。

详细用法查看 Java并发编程:Callable、Future和FutureTask

接下来我们看下 fromFuture 的源码:

同样的装配了一个 ObservableFromFuture 返回,作为我们的被观察者。我们知道正真订阅是实现 subscribeActual 方法的 Observable 的子类里面,所以我们直接 ObservableFromFuture 类的:

public void subscribeActual(Observer<? super T> observer) {
    DeferredScalarDisposable<T> d = new DeferredScalarDisposable<T>(observer);
    observer.onSubscribe(d);
    if (!d.isDisposed()) {
        T v;
        try {
            v = ObjectHelper.requireNonNull(unit != null ? future.get(timeout, unit) : future.get(), "Future returned null");
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            if (!d.isDisposed()) {
                observer.onError(ex);
            }
            return;
        }
        d.complete(v);
    }
}
复制代码

这个方法也很好理解,DeferredScalarDisposable 类和我们上面说的 fromCallable 的一样的,这里注意这一行代码: v = ObjectHelper.requireNonNull(unit != null ? future.get(timeout, unit) : future.get(), "Future returned null");

假如我们不执行 futureTask.run(); 就会一直阻塞。

欢迎转载,但是请注明出处

示例源码

个人Github主页

如果对您有帮助,您可以 "Star" 支持一下哦, 谢谢! ^^

原文  https://juejin.im/post/5d63492351882554a13f8e5e
正文到此结束
Loading...