原文首发于微信公众号:jzman-blog,欢迎关注交流!
RxJava 是 ReactiveX 在 Java 上的开源的实现,一个用于通过使用可观察序列来进行异步编程和基于事件的程序的库,这是官网的介绍,主要关注点是异步编程和链式调用以及事件序列。
implementation "io.reactivex.rxjava2:rxjava:2.2.3" implementation 'io.reactivex.rxjava2:rxandroid:2.1.0' 复制代码
RxJava 中的几个重要概念是:观察者(Observer) 、被观察者(Observable)和事件序列,事件序列完全由被观察者者自己控制,那么被观察者如果在需要时通知观察者呢,这就需要被观察者与观察者之间建立订阅关系。建立订阅关系后,当被观察者发生变化,观察者就能在第一时间接收被观察者的变化。
在 RxJava2 中观察者(Observer) 的事件回调方法有四个:
//观察者 Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { //解除订阅 Log.i(TAG, "onSubscribe--->"); } @Override public void onNext(String s) { //发送事件时观察者回调 Log.i(TAG, "onNext--->"+s); } @Override public void onError(Throwable e) { //发送事件时观察者回调(事件序列发生异常) Log.i(TAG, "onError--->"); } @Override public void onComplete() { //发送事件时观察者回调(事件序列发送完毕) Log.i(TAG, "onComplete--->"); } }; 复制代码
//被观察者 Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("Event1"); emitter.onNext("Event2"); emitter.onComplete(); emitter.onNext("Event3"); } }); 复制代码
//建立观察者与被观察者之间的订阅关系 observable.subscribe(observer); 复制代码
上述代码的输出结果参考如下:
onSubscribe---> onNext--->Event1 onNext--->Event2 onComplete---> 复制代码
显然,由于在 发送完 Event2 之后就调用了 onComplete 方法,之后发送的事件 Event3 将不会被观察者收到。
上面代码还可以这样写,结果是一样的,具体参考如下:
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("Event1"); emitter.onNext("Event2"); emitter.onComplete(); emitter.onNext("Event3"); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe--->"); } @Override public void onNext(String s) { Log.i(TAG, "onNext--->"+s); } @Override public void onError(Throwable e) { Log.i(TAG, "onError--->"); } @Override public void onComplete() { Log.i(TAG, "onComplete--->"); } }); 复制代码
上面代码中使用了 Observable 的 create 方法来创建 Observable,并以此来进行相关事件的发送,为帮助理解来看一下官方的关于 create 操作符的示意图:
Observable 中还提供了很多的静态方法来创建 Observable,下文将会介绍这些常用方法。
使用 just 可以创建一个发送指定事件的 Observable,just 发送事件的上限 10,即最多发送 10 个事件,相较 create 在一定程度上简化了处理流程,just 重载的方法如下:
public static <T> Observable<T> just(T item) public static <T> Observable<T> just(T item1, T item2) public static <T> Observable<T> just(T item1, T item2, T item3) public static <T> Observable<T> just(T item1, T item2, T item3, T item4) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10) 复制代码
下面是 just 操作符的简单使用:
//just操作符的简单使用 Observable.just("Event1", "Event2", "Event3") .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe--->"); } @Override public void onNext(String s) { Log.i(TAG, "onNext--->" + s); } @Override public void onError(Throwable e) { Log.i(TAG, "onError--->"); } @Override public void onComplete() { Log.i(TAG, "onComplete--->"); } }); 复制代码
上述代码的输出结果如下:
onSubscribe---> onNext--->Event1 onNext--->Event2 onNext--->Event3 onComplete---> 复制代码
来看一下官方的关于 just 操作符的示意图,下面是 just 发送四个事件的示意图,具体如下:
使用 from 相关的操作符可以创建发送数组(array)、集合(Iterable) 以及异步任务(future)的 Observable,可将 from 相关的操作符分为如下几类:
//数组 public static <T> Observable<T> fromArray(T... items) //集合 public static <T> Observable<T> fromIterable(Iterable<? extends T> source) //异步任务 public static <T> Observable<T> fromFuture(Future<? extends T> future) //异步任务+超时时间 public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit) //异步任务+超时时间+线程调度器 public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler) //异步任务+线程调度器 public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler) //Reactive Streams中的发布者,使用方式类似create操作符,事件的发送由发布者(被观察者)自行决定 public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher) 复制代码
下面是 fromArray 的使用方式,具体如下:
//fromArray操作符的简单使用 String[] events = {"Event1", "Event2", "Event3"}; Observable.fromArray(events).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe--->"); } @Override public void onNext(String s) { Log.i(TAG, "onNext--->" + s); } @Override public void onError(Throwable e) { Log.i(TAG, "onError--->"); } @Override public void onComplete() { Log.i(TAG, "onComplete--->"); } }); 复制代码
看一下 fromArray 的官方示意图,具体如下:
下面是 fromIterable 的使用方式,具体如下:
//fromIterable操作符的简单使用 List<String> list = new ArrayList<>(); list.add("Event1"); list.add("Event2"); list.add("Event3"); Observable.fromIterable(list).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe--->"); } @Override public void onNext(String s) { Log.i(TAG, "onNext--->" + s); } @Override public void onError(Throwable e) { Log.i(TAG, "onError--->" + e); } @Override public void onComplete() { Log.i(TAG, "onComplete--->"); } }); 复制代码
看一下 fromIterable 的官方示意图,具体如下:
上述代码的输出参考如下:
onSubscribe---> onNext--->Event1 onNext--->Event2 onNext--->Event3 onComplete---> 复制代码
Callable 位于 java.util.concurrent 包下,和 Runnable 类似,但是带有返回值,使用 fromCallable 发出的事件是从主线程发出的,如果不订阅则不会执行 call 里面的操作,使用 fromCallable 要注意以下几点:
下面是 fromCallable 的简单使用,参考如下:
//fromCallable操作符的简单使用 Observable.fromCallable(new Callable<String>() { @Override public String call() throws Exception { //其他操作... return "call"; } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe--->"); } @Override public void onNext(String s) { Log.i(TAG, "onNext--->" + s+Thread.currentThread()); } @Override public void onError(Throwable e) { Log.i(TAG, "onError--->" + e); } @Override public void onComplete() { Log.i(TAG, "onComplete--->"); } }); 复制代码
上述到执行结果如下:
onSubscribe---> onNext--->call onComplete---> 复制代码
看一下 fromCallable 的官方示意图,具体如下:
从上面可知 fromFuture 有四个重载方法,参数中可以指定异步任务、任务超时时间、线程调度器等,先来认识一下 Future 接口,Future 接口位于 java.util.concurrent 包下,主要作用是对 Runnable 和 Callable 的异步任务执行进行任务是否执行的判断、任务结果的获取、具体任务的取消等,而 Runnable 和 Callable 伴随着线程的执行,这就意味着使用 fromFuture 发出的事件是从非 Main 线程发出,如果执行耗时任务要记得使用 subscribeOn 切换订阅线程,下面以 FutureTask 为例来说明 fromFuture 的使用方式。
创建一个 Callable 用来执行异步任务,参考如下:
//异步任务 private class MCallable implements Callable<String> { @Override public String call() throws Exception { Log.i(TAG, "任务执行开始--->"); Thread.sleep(5000); Log.i(TAG, "任务执行结束--->"); return "MCallable"; } } 复制代码
然后,创建一个 FutureTask ,参考如下:
//创建FutureTask MCallable mCallable = new MCallable(); FutureTask<String> mFutureTask = new FutureTask<>(mCallable); 复制代码
然后,使用 Thread 执行上面创建的 Future,参考如下:
//执行FutureTask new Thread(mFutureTask).start(); 复制代码
最后,使用 fromFuture 创建与之对应的 Observeable 并订阅,参考如下:
//fromFuture Observable.fromFuture(mFutureTask) .subscribeOn(Schedulers.io()) //切换订阅线程 .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe--->"); } @Override public void onNext(String s) { Log.i(TAG, "onNext--->" + s+Thread.currentThread()); } @Override public void onError(Throwable e) { Log.i(TAG, "onError--->" + e); } @Override public void onComplete() { Log.i(TAG, "onComplete--->"); } }); 复制代码
上述代码的只想结果如下:
任务执行开始---> onSubscribe---> 任务执行结束---> onNext--->MCallable onComplete---> 复制代码
看一下 fromFuture 的官方示意图,下面的示意图是 fromFuture 方法携带一个参数 Future 的示意图,具体如下:
上面的异步任务延时 5 秒,如果使用 fromFuture 的重载方法指定超时时间为 4 秒,参考如下:
//指定超时时间为4s Observable.fromFuture(mFutureTask,4, TimeUnit.SECONDS,Schedulers.io()) //... 复制代码
此时,由于异步任务不能在 4 秒内完成,Observer 会相应的被触发 onError 方法,执行结果参考如下:
任务执行开始---> onSubscribe---> onError--->java.util.concurrent.TimeoutException 任务执行结束---> 复制代码
那么如何取消这个异步任务呢,这也正是 Future 的优点所在,可以随意的取消这个任务,具体参考如下:
//异步任务的取消 public void cancelTask(View view) { if (mFutureTask.isDone()) { Log.i(TAG, "任务已经完成--->"); } else { Log.i(TAG, "任务正在执行--->"); boolean cancel = mFutureTask.cancel(true); Log.i(TAG, "任务取消是否成功--cancel->" + cancel); Log.i(TAG, "任务取消是否成功--isCancelled->" + mFutureTask.isCancelled()); } } 复制代码
下面是在任务执行过程中取消任务的执行结果,参考如下:
任务执行开始---> onSubscribe---> 任务正在执行---> 任务取消是否成功--cancel->true 任务取消是否成功--isCancelled->true onError--->java.util.concurrent.CancellationException 复制代码
这样就取消了正在执行的异步任务,这部分内容更多的是关于 Java Future 相关的知识。
使用 defer 创建 Observable 时,只有在订阅时去才会创建 Observable 并发送相关的事件,下面是 defer 操作符的使用,参考如下:
//defer defer = "old"; Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() { @Override public ObservableSource<String> call() throws Exception { return Observable.just(defer); } }); defer = "new"; observable.subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe--->"); } @Override public void onNext(String s) { Log.i(TAG, "onNext--->"+s); } @Override public void onError(Throwable e) { Log.i(TAG, "onError--->"+e); } @Override public void onComplete() { Log.i(TAG, "onComplete--->"); } }); 复制代码
上述代码的执行结果如下:
onSubscribe---> onNext--->new onComplete---> 复制代码
显然,最终在订阅之前 Observable 工厂又创建了最新的 Observable,onNext 中接收的数据也是最新的,为了理解 defer 操作符,来看一下官方 defer 操作符的示意图:
使用 empty 操作符可以创建一个不发生任何数据但正常终止的 Observable,参考如下:
//empty Observable.empty().subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe--->"); } @Override public void onNext(Object o) { Log.i(TAG, "onNext--->"+o); } @Override public void onError(Throwable e) { Log.i(TAG, "onError--->"+e); } @Override public void onComplete() { Log.i(TAG, "onComplete--->"); } }); 复制代码
上述代码的输出结果如下:
onSubscribe---> onComplete---> 复制代码
为了方便理解 empty 操作符的使用,来看一些 empty 操作符的官方示意图:
使用 never 操作符可以创建一个不发生任何数据也不终止的 Observable,参考如下:
//never Observable.never().subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe--->"); } @Override public void onNext(Object o) { Log.i(TAG, "onNext--->"+o); } @Override public void onError(Throwable e) { Log.i(TAG, "onError--->"+e); } @Override public void onComplete() { Log.i(TAG, "onComplete--->"); } }); 复制代码
上述代码的输出结果如下:
onSubscribe---> 复制代码
为了方便理解 never 操作符的使用,来看一些 never 操作符的官方示意图:
timer 操作符可以创建一个带延时的发送固定数值 0 的 Observable,还可以指定线程调度器,timer 重载方法如下:
//延时 public static Observable<Long> timer(long delay, TimeUnit unit) //延时+线程调度器 public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) 复制代码
下面是 timer 的使用方式:
//timer Observable.timer(3, TimeUnit.SECONDS, Schedulers.io()).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe--->"); } @Override public void onNext(Long s) { Log.i(TAG, "onNext--->"+s); Log.i(TAG, "当前线程--->"+Thread.currentThread().getName()); } @Override public void onError(Throwable e) { Log.i(TAG, "onError--->"+e); } @Override public void onComplete() { Log.i(TAG, "onComplete--->"); } }); 复制代码
上述代码的执行结果如下:
onSubscribe---> //延时3秒收到数据 onNext--->0 当前线程--->RxCachedThreadScheduler-1 onComplete---> 复制代码
为了方便理解 timer 操作符的使用,来看一些 timer 操作符的官方示意图,下面以 timer 指定延时器和线程调度器的方式为例,具体如下:
使用 interval 操作符可以创建一个可以以固定时间间隔发送整数值的一个 Observable,interval 可以指定初始延时时间、时间间隔、线程调度器等,interval 重载方法如下:
//初始延时+时间间隔 public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) //初始延时+时间间隔+线程调度器 public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) //时间间隔 public static Observable<Long> interval(long period, TimeUnit unit) //时间间隔+线程调度器 public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler) 复制代码
下面是 interval 的使用方式:
//interval Observable.interval(3,TimeUnit.SECONDS).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe--->"); } @Override public void onNext(Long aLong) { Log.i(TAG, "onNext--->"+aLong); } @Override public void onError(Throwable e) { Log.i(TAG, "onError--->"+e); } @Override public void onComplete() { Log.i(TAG, "onComplete--->"); } }); 复制代码
上述代码执行后就会以每个 3 秒持续发送值为整数的事件,执行结果如下:
onSubscribe---> onNext--->0 onNext--->1 onNext--->2 ... 复制代码
为了方便理解 interval 操作符的使用,来看一些 interval 操作符的官方示意图,下面以 interval 指定间隔时间和时间单位的方式为例,具体如下:
使用 range 操作符可以创建一个可以发送指定整数范围值的一个 Observable,range 相关的方法有两个,只是数值的范围表示不同,两个方法声明如下:
// int public static Observable<Integer> range(final int start, final int count) // long public static Observable<Long> rangeLong(long start, long count) 复制代码
下面是 range 的使用方式,具体参考如下:
//range Observable.range(1,5).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe--->"); } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext--->"+integer); } @Override public void onError(Throwable e) { Log.i(TAG, "onError--->"+e); } @Override public void onComplete() { Log.i(TAG, "onComplete--->"); } }); 复制代码
上述代码的执行结果如下:
onSubscribe---> onNext--->1 onNext--->2 onNext--->3 onNext--->4 onNext--->5 onComplete---> 复制代码
为了方便理解 range 操作符的使用,来看一些 range 操作符的官方示意图:
这篇文章主要介绍了 RxJava2 相关基础知识以及 RxJava2 中创建型操作符的理解和使用。可以选择关注个人微信公众号:jzman-blog 获取最新更新,一起交流学习!