RxJava 2.x 提供了五种模式,如下所示。
模式/类型 | 描述 | 接口 | 消费者 |
---|---|---|---|
Observable | 支持 0…N个数据,不支持背压 | io.reactivex.Observable | Observer |
Flowable | 支持 0…N个数据 支持背压 | io.reactivex.Flowable | Subscriber |
Single | 只支持1个数据 | io.reactivex.Single | SingleObserver |
Completable | 不支持数据 | io.reactivex.Completable | CompletableObserver |
Maybe | 只支持0或1个数据 | io.reactivex.Maybe | MaybeObserver |
创建 Observable 非常容易,我们首先需要创建一个 Observable 作为被观察者,然后在创建一个 Observer 作为观察者,然后通过 subscribe() 进行订阅。
public class ObservableDemo { public static void main(String[] args) { Observable<String> observable = Observable.create(emitter -> { emitter.onNext("Hello World"); emitter.onNext("Hello World"); emitter.onComplete(); emitter.onNext("Hello World"); }); Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("Observer.onSubscribe"); } @Override public void onNext(String s) { System.out.println("Observer.onNext: " + s); } @Override public void onError(Throwable e) { System.out.println("Observer.onError"); } @Override public void onComplete() { System.out.println("Observer.onComplete"); } }; observable.subscribe(observer); } }
我们可以使用 create 创建一个 Observable,它拥有 onNext, onError, onCompleted 方法。其中,onNext用于发射数据项,可以多次调用,每调用一次发射一条数据, onError 或 onCompleted 只能调用一次,onError发射错误事件,除非使用 retry() 操作符来截获错误,否则事件流通常会终止。onCompleted 传递一个完成事件,表示不会再发生onNext调用。两者之间互斥,此后就不能再调用该 Observable 的其他方法。
这里,我们也可以改造成链式调用。
public class ObservableDemo2 { public static void main(String[] args) { Observable.<String>create(emitter -> { emitter.onNext("Hello World"); emitter.onNext("Hello World"); emitter.onComplete(); emitter.onNext("Hello World"); }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("Observer.onSubscribe"); } @Override public void onNext(String s) { System.out.println("Observer.onNext: " + s); } @Override public void onError(Throwable e) { System.out.println("Observer.onError"); } @Override public void onComplete() { System.out.println("Observer.onComplete"); } }); } }
阅读 RxJava 2.x 源码 io.reactivex.Observable,我们可以知道 subscribe 具有很多重载的方法。有兴趣的读者,可以深入了解下。
我们可以省略 onComplete(),只实现 onNext() 和 onError()。这将不再对 onComplete() 执行任何操作。我们甚至可以忽略 onError(),只指定 onNext()。但是,不实现 onError() 是在生产环境中应该避免的事情。在事件流的任何地方发生的错误都将传播到 onError() 进行处理,然后终止事件流。如果我们没有为 onError() 指定一个操作,那么该错误将不会处理。当然,如果出现错误,我们可以先尝试使用 retry() 操作符恢复并重新订阅可观察到的数据项。
public final Disposable subscribe() public final Disposable subscribe(Consumer<? super T> onNext) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) public final void subscribe(Observer<? super T> observer)
这里,我们简单来了解一下 subscribe(Consumer<? super T> onNext)
的使用吧。
public class ObservableDemo3 { public static void main(String[] args) { Observable.<String>create(emitter -> { emitter.onNext("Hello World"); emitter.onNext("Hello World"); emitter.onComplete(); emitter.onNext("Hello World"); }).subscribe(System.out::println); } }
注意, onNext, onError, onCompleted 方法不需要直接推送到最终的观察者,它们可以通过 map() 和 filter() 等操作符创建新的 Observable 然后继续发送。
Flowable 是唯一支持背压的模式,它的用法与 Observable 非常相似。(关于背压,笔者会在之后的文章中进行讲解。)
public class FlowableDemo { public static void main(String[] args) { Flowable.<String>create(e -> { e.onNext("Hello world!"); e.onNext("Hello World"); e.onComplete(); e.onNext("Hello World"); }, BackpressureStrategy.MISSING).subscribe(new Subscriber<String>(){ @Override public void onSubscribe(Subscription subscription) { System.out.println("Subscriber.onSubscribe"); } @Override public void onNext(String s) { System.out.println("Subscriber.onNext: " + s); } @Override public void onError(Throwable throwable) { System.out.println("Subscriber.onError"); } @Override public void onComplete() { System.out.println("Subscriber.onComplete"); } }); } }
阅读 RxJava 2.x 源码 io.reactivex.Flowable
,我们可以知道 subscribe 也具有很多重载的方法。
public final Disposable subscribe() public final Disposable subscribe(Consumer<? super T> onNext) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Subscription> onSubscribe) public final void subscribe(FlowableSubscriber<? super T> s) public final void subscribe(Subscriber<? super T> s)
Single 的工作就像 Observable 一样,但是它只有 onSuccess
?事件和 onError
事件,并且它有自己的 SingleObserver
接口。 onSuccess
整合了 onNext
和 onComplete
事件,因此,这里 onSuccess
只能发送一个数据,换句话说,即使多次发送也不会产生效果。
public class SingleDemo { public static void main(String[] args) { Single.<String>create(e -> { e.onSuccess("success"); e.onSuccess("success"); }).subscribe(new SingleObserver<String>(){ @Override public void onSubscribe(Disposable d) { System.out.println("SingleObserver.onSubscribe"); } @Override public void onSuccess(String s) { System.out.println("SingleObserver.onSuccess:"+s); } @Override public void onError(Throwable e) { System.out.println("SingleObserver.onError"); } }); } }
从控制台的打印结果可以看出,即使多次发送“success”,但是只会消费一次。
阅读 RxJava 2.x 源码 io.reactivex.Single
,我们可以知道 subscribe 也具有很多重载的方法。
public final Disposable subscribe() public final Disposable subscribe(final BiConsumer<? super T, ? super Throwable> onCallback) public final Disposable subscribe(Consumer<? super T> onSuccess) public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) public final void subscribe(SingleObserver<? super T> subscriber)
这里,我们简单来了解一下 subscribe(Consumer<? super T> onSuccess)
的使用吧。
public class SingleDemo2 { public static void main(String[] args) { Single.<String>create(e -> { e.onSuccess("success"); }).subscribe(System.out::println); } }
我们可以通过 toObservable 转换成一个 Observable 对象。
Single.just("success").toObservable().subscribe(System.out::println);
Completable 不发送数据,只有 onComplete
事件和 onError
事件。
public class CompletableDemo { public static void main(String[] args) { Completable.create(e -> { e.onComplete(); }) .subscribe(System.out::println); } }
此外,我们可以通过 complete()
快速创建一个 Completable 对象,它会立即调用 onComplete
事件。
Completable.complete().subscribe(System.out::println);
或者,也可以通过 fromAction()
或 fromRunnable()
在调用 onComplete
事件之前执行指定的操作。
Completable.fromAction(System.out::println).subscribe();
Maybe 结合了 Single 和 Completable 特性。Maybe 包含 onSuccess
、 onError
、 onComplete
事件。 这里, onSuccess
可以发送 0 ~ 1 个数据,换句话说,即使多次发送也不会产生效果。如果调用 onComplete
事件,就会停止发送数据。
public class MaybeDemo { public static void main(String[] args) { Maybe.<String>create(e -> { e.onComplete(); e.onSuccess("success"); e.onSuccess("success"); }).subscribe(System.out::println); } }