博客主页
RxJava 的条件操作符主要包括以下几个:
RxJava 的布尔操作符主要包括:
判定 Observable 发射的所有数据是否都满足某个条件
传递一个谓词函数给 all 操作符,这个函数接受原始 Observable 发射的数据,根据计算返回一个布尔值。 all 返回一个只发射单个布尔值的 Observable,如果原始 Observable 正常终止并且每一项数据都满足条件,就返回 true。如果原始 Observabl 的任意一项数据不满足条件,就返回false
Observable.just(1, 2, 3, 4, 5) .all(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer < 10; } }).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.d(TAG, "Success: " + aBoolean); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }); // 执行结果 Success: true
判断 Observable 发射的所有数据是否都大于 3
Observable.just(1, 2, 3, 4, 5) .all(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer > 3; } }).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.d(TAG, "Success: " + aBoolean); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }); // 执行结果 Success: false
判定一个 Observable 是否发射了一个特定的值
给 contains 传一个指定的值,如果原始 Observable 发射了那个值,那么返回的 Observable 将发射 true,否则发射 false 。与它相关的一个操作符是 isEmpty ,用于判定原始 Observable 是否未发射任何数据。
Observable.just(2, 30, 22, 5, 60, 1) .contains(22) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.d(TAG, "Success: " + aBoolean); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }); // 执行结果 Success: true
给定两个或多个 Observable ,它只发射首先发射数据或通知的那个 Observable 的所有数据
当传递多个 Observable 给 amb 时,它只发射其中一个 Observable 数据和通知: 首先发送通知给 amb 的那个 Observable ,不管发射的是一项数据 ,还是一个 onError 或 onCompleted 通知。 amb 忽略和丢弃其他所有 Observables 的发射物。
在 RxJava 中, amb 还有一个类似的操作符 ambWith。 例如, Observable.amb(ol, o2 )和
ol.ambWith(o2)是等价的
在 RxJava 2.x 中, amb 需要传递 Iterable 对象,或者使用 ambArray 来传递可变参数。
Observable.ambArray( Observable.just(1, 2, 3), Observable.just(4, 5, 6) ).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }); // 执行结果 Next: 1 Next: 2 Next: 3
修改一下代码,第一个 Observable 延迟 ls 后再发射数据
Observable.ambArray( Observable.just(1, 2, 3).delay(1, TimeUnit.SECONDS), Observable.just(4, 5, 6) ).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }); // 执行结果 Next: 4 Next: 5 Next: 6
由于第一个 Observable 延迟发射,因此我们只消费了第二个 Observable 的数据,第一个 Observable 发射的数据就不再处理了。
发射来自原始 Observable 值,如果原始 Observable 没有发射任何值,就发射一个默认值
defaultIfEmpty 简单精确地发射原始 Observable 的值,如果原始 Observable 没有发射任何数据,就正常终止(以 onComplete 形式了),那么 defaultlfEmpty 返回的 Observable 就发射一个我们提供的默认值。
Observable.empty() .defaultIfEmpty(8) .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { Log.d(TAG, "Next: " + o); } }); // 执行结果 Next: 8
在 defaultIfEmpty 方法内部,其实调用的是 switchIfEmpty 操作符,源码如下:
public final Observable<T> defaultIfEmpty(T defaultItem) { ObjectHelper.requireNonNull(defaultItem, "defaultItem is null"); return switchIfEmpty(just(defaultItem)); }
defaultIfEmpty 和 switchIfEmpty 的区别是, defaultIfEmpty 操作符只能在被观察者不发送数据时发送一个默认的数据 ,如果想要发送更多数据,则可以使用 switchIfEmpty 操作符,发送自定义的被观察者作为替代。
Observable.empty() .switchIfEmpty(Observable.just(1, 2, 3)) .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { Log.d(TAG, "Next: " + o); } }); // 执行结果 Next: 1 Next: 2 Next: 3
判定两个 Observable 是否发射相同的数据序列
传递两个 Observable 给 sequenceEqual 操作符时,它会比较两个 Observable 发射物,如果两个序列相同(相同的数据,相同的顺序,相同的终止状态〉 ,则发射 true 否则发射 false
Observable.sequenceEqual( Observable.just(1, 2, 3, 4, 5), Observable.just(1, 2, 3, 4, 5) ).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.d(TAG, "Success: " + aBoolean); } }); // 执行结果 Success: true
将两个 Observable 改成不一致
Observable.sequenceEqual( Observable.just(1, 2, 3, 4, 5), Observable.just(1, 2, 3, 4, 5, 6) ).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.d(TAG, "Success: " + aBoolean); } }); // 执行结果 Success: false
sequenceEqual 还有一个版本接受第三个参数,可以传递一个函数用于比较两个数据项是否相同。对于复杂对象的比较,用三个参数的版本更为合适。
Observable.sequenceEqual( Observable.just(1, 2, 3, 4, 5), Observable.just(1, 2, 3, 4, 5), new BiPredicate<Integer, Integer>() { @Override public boolean test(Integer integer, Integer integer2) throws Exception { return integer == integer2; } } ).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.d(TAG, "Success: " + aBoolean); } }); // 执行结果 Success: true
丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一项数据
skipUntil 订阅原始的 Observable,但是忽略它的发射物,直到第二个 Observable 发射一项数据那一刻,它才开始发射原始的 Observabl。 skipUntil 默认不在任何特定的调度器上执行。
Observable.intervalRange(1, 9, 0, 1, TimeUnit.MILLISECONDS) .skipUntil(Observable.timer(4, TimeUnit.MILLISECONDS)) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "Next: " + aLong); } }); // 执行结果 Next: 4 Next: 5 Next: 6 Next: 7 Next: 8 Next: 9
上述代码,原始的 Observable 发射 1 到 9 这 9 个数 ,初始延迟时间是 0,每间隔 lms。由于使用 skipUntil,因此它会发射原始 Observable 在 3ms 之后的数据。
丢弃 Observable 发射的数据,直到一个指定的条件不成立。
skipWhile 订阅原始的 Observable ,但是忽略它的发射物,直到指定的某个条件变为 false。它才开始发射原始的 Observable。skipWhile 默认不在任何特定的调度器上执行
Observable.just(1, 2, 3, 4, 5) .skipWhile(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer <= 2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }); // 执行结果 Next: 3 Next: 4 Next: 5
当第二个 Observable 发射了一项数据或者终止时,丢弃原始 Observable 发射的任何数据
takeUntil 订阅并开始发射原始 Observable ,它还监视你提供的第二个 Observable。如果第二个 Observable 发射了一项数据或者发射了一个终止通知,则 takeUntil 返回的 Observable 会停止发射原始 Observable 并终止。
Observable.just(1, 2, 3, 4, 5, 6) .takeUntil(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer == 4; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }); // 执行结果 Next: 1 Next: 2 Next: 3 Next: 4
发射原始 Observable 发射的数据,直到一个指定的条件不成立
takeWhile 发射原始的 Observable 直到某个指定的条件不成立,它会立即停止发射原始 Observable,并终止自己的 Observable。
RxJava 中的 takeWhile 操作符返回一个原始 Observable 行为的 Observable,直到某项数据,指定的函数返回 false ,这个新的 Observable 就会发射 onComplete 终止通知
Observable.just(1, 2, 3, 4, 5, 6) .takeWhile(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer <= 2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next: 1 Next: 2 Complete.