博客主页
RxJava 的过滤操作符主要包括以下几种:
只发射第 一 项(或者满足某个条件的第一项)数据
如果只对 Observable 发射的第一项数据,或者满足某个条件的第一项数据感兴趣,那么就可以使用 first 操作符。
在 RxJava 2.x 中,使用 first() 需要一个默认的 Item ,对于 Observable 而言,使用了 first()会返回 Single 类型。
public final Single<T> first(T defaultItem) { return elementAt(0L, defaultItem); } Observable.just(3, 4, 5) .first(8) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }); // 执行结果 Next-> 3
如果 Observable 不发射任何数据,那么 first 操作符的默认值就起了作用。
Observable.<Integer>empty() .first(8) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }); // 执行结果 Next-> 8
在 R.Java 2.x 中,还有 firstElement 操作符表示只取第一个数据,没有默认值。 firstOrError 操作符表示要么能取到第一个数据,要么执行 onError 方法,它们分别返回 Maybe 类型和 Single 类型。
只发射最后一项(或者满足某个条件的最后一项)数据
如果只对 Observable 发射的最后一项数据, 或者满足某个条件的最后一项数据感兴趣,那么就可以使用 last 操作符。
last 操作符跟 first 操作符类似,需要一个默认的 Item ,也是返回 Single 类型。
public final Single<T> last(T defaultItem) { ObjectHelper.requireNonNull(defaultItem, "defaultItem is null"); return RxJavaPlugins.onAssembly(new ObservableLastSingle<T>(this, defaultItem)); } Observable.just(3, 4, 5) .last(8) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }); // 执行结果 Next-> 5
在 RxJava 2.x 中,有 lastElement 操作符和 lastOrError 操作符。
只发射前面的 N 项数据
使用 take 操作符可以只修改 Observable 的行为,返回前面的 N 项数据,发射完成通知,忽略剩余的数据
Observable.just(1, 2, 3, 4, 5) .take(3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 1 Next-> 2 Next-> 3 Complete.
如果对一个 Observable 使用 take 操作符,而那个 Observabl 发射的数据少于 N 项,那么 take 操作符生成的 Observable 就不会抛出异常或者发射 Error 通知,而是仍然会发射那些数据
Observable.just(1, 2, 3, 4, 5) .take(6) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 1 Next-> 2 Next-> 3 Next-> 4 Next-> 5 Complete.
take 有一个重载方法能够接受一个时长而不是数量参数。它会丢掉发射 Observable 开始的那段时间发射的数据,时长和时间单位通过参数指定。
Observable.intervalRange(0, 10, 1, 1, TimeUnit.SECONDS) .take(3, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "Next-> " + aLong); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 0 Next-> 1 Next-> 2 Complete.
上述代码使用了 intervalRange 操作符表示每隔 ls 会发射一个数据,它们从 0 开始到 9 结束,发射 10 个数据。由于在这里使用了 take 操作符,最后只打印前 3 个数据.
take 的这个重载方法默认在 computation 调度器上执行,也可以使用参数来指定其他调度器。
发射 Observable 发射的最后 N 项数据
使用 takeLast 操作符修改原始 Observable,我们可以只发射 Observable 发射的最后 N 项数据,忽略前面的数据。
Observable.just(1, 2, 3, 4, 5) .takeLast(3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 3 Next-> 4 Next-> 5 Complete.
同样,如果对一个 Observable 使用 takeLast(n) 操作符,而那个 Observable 发射的数据少于 N 项,那么 takeLast 操作符生成的 Observable 不会抛出异常或者发射 onError 通知,而是仍然发射那些数据。
takeLast 也有一个重载方法能够接受一个时长而不是数量参数。它会发射在原始 Observable 生命周期内最后一段时间发射的数据,时长和时间单位通过参数指定。
Observable.intervalRange(0, 10, 1, 1, TimeUnit.SECONDS) .takeLast(3, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "Next-> " + aLong); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 7 Next-> 8 Next-> 9 Complete.
抑制 Observable 发射的前 N 项数据
使用 skip 操作符,可以忽略 Observable 发射的前 N 项数据,只保留之后的数据
Observable.just(1, 2, 3, 4, 5) .skip(3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 4 Next-> 5 Complete.
skip 有一个重载方法能够接受一个时长而不是数量参数。它会丢弃原始 Observable 开始那段时间发射的数据,时长和时间单位通过参数指定。
Observable.interval(1, TimeUnit.SECONDS) .skip(3, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "Next-> " + aLong); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 3 Next-> 4 Next-> 5 Next-> 6 Next-> 7 Next-> 8 Next-> 9 ......
抑制 Observable 发射的后 N 项数据
使用 skipLast 操作符修改原始 Observable,可以忽略 Observable 发射后 N 项数据,只保留前面的数据。
Observable.just(1, 2, 3, 4, 5) .skipLast(3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 1 Next-> 2 Complete.
同样, skipLast 也有一个重载方法接受一个时长而不是数量参数。它会丢弃在原始 Observable 生命周期最后一段时间内发射的数据,时长和时间单位通过参数指定。
Observable.interval(1, TimeUnit.SECONDS) .skipLast(3, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "Next-> " + aLong); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 0 Next-> 1 Next-> 2 Next-> 3 Next-> 4 Next-> 5 Next-> 6 ......
只发射第 N 项数据
elementAt 操作符获取原始 Observable 发射的数据序列指定索引位置的数据项,然后当作自己的唯一数据发射
它传递一个基于 0 的索引值,发射原始 Observable 数据序列对应索引位置的值,如果传递给 elementAt 的值为 5,那么它会发射第 6 项数据。如果传递的是一个负数,则将会抛出 IndexOutOfBoundsException 异常。
Observable.just(1, 2, 3, 4, 5) .elementAt(2) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 3
elementAt(index)返回一个 Maybe 类型。
public final Maybe<T> elementAt(long index) { if (index < 0) { throw new IndexOutOfBoundsException("index >= 0 required but it was " + index); } return RxJavaPlugins.onAssembly(new ObservableElementAtMaybe<T>(this, index)); }
如果原始 Observable 的数据项数小于 index+1 ,那么会调用 onComplete 方法(在 RxJava l.x 中也会抛出一个 IndexOutOfBoundsException 异常)。所以 elementAt 还提供了一个带默认值的方法,它返回一个 Single 类型。
Observable.just(1, 2, 3, 4, 5) .elementAt(8, 10) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Success: " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }); // 执行结果 Success: 10
如果 index 超出了索引范围,那么取默认值
不发射任何数据,只发射 Observable 终止通知
ignoreElements 操作符抑制原始 Observable 发射的所有数据,只允许它的终止通知( onError 或 onComplete )通过。它返回 Completable 类型
如果我们不关心一个 Observable 发射的数据,但是希望在它完成时或遇到错误终止时收到通知,那么就可以对 Observable 使用 gnoreElements 操作符,它将确保永远不会调用观察者的 onNext 方法。
Observable.just(1, 2, 3, 4, 5) .ignoreElements() .subscribe(new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable); } }); // 执行结果 Complete.
过滤掉重复的数据项
distinct 的过滤规则是: 只允许还没有发射过的数据项通过
Observable.just(1, 2, 2, 3, 4, 4, 4, 5) .distinct() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next->" + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next->1 Next->2 Next->3 Next->4 Next->5 Complete.
distinct 还能接受 Function 作为参数,这个函数根据原始 Observable 发射的数据项产生一个 Key ,然后比较这些 Key 而不是数据本身,来判定两个数据是否不同。
与 distinct 类似的是 distinctUntilChanged 操作符,该操作符与 distinct 的区别是:它只判定一个数据和它的直接前驱是否不同
Observable.just(1, 2, 1, 2, 3, 4, 4, 4, 5) .distinctUntilChanged() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next->" + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next->1 Next->2 Next->1 Next->2 Next->3 Next->4 Next->5 Complete.
只发射通过谓词测试的数据项
filter 操作符使用你指定的一个谓词函数测试数据项,只有通过测试的数据才会被发射。
Observable.just(2, 30, 22, 5, 60, 1) .filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer > 10; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next->" + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next->30 Next->22 Next->60 Complete.
仅在过了一段指定的时间还没发射数据的才发射一个数据
debounce 操作符会过滤掉发射速率过快的数据项
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { if (emitter.isDisposed()) return; try { for (int i = 0; i < 10; i++) { emitter.onNext(i); Thread.sleep(i * 100); } emitter.onComplete(); } catch (Exception e) { emitter.onError(e); } } }).debounce(500, TimeUnit.MILLISECONDS) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next->" + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next->6 Next->7 Next->8 Next->9 Complete.
debounce 还有另外一种形式,使用一个 Function 函数来限制发送的数据。
跟 debounce 类似的是由throttleWithTimeout 操作符,它与只使用时间参数来限流的 debounce 的功能相同。