博客主页
RxJava 使用通常需要三步:
Observable 字面意思是被观察者,使用 RxJava 需要创建一个被观察者,它会决定什么时候触发事件以及触发怎样的事件。有点类似上游发送命令,可以在这里决定异步操作模块的顺序和异步操作模块的次数。
Observer 即观察者,它可以在不同的线程中执行任务。这种模式可以极大地简化并发操作,因为它创建了一个处于待命状态的观察者哨兵,可以在未来某个时刻响应 Observable 的通知,而不需要阻塞等待 Observable 发射数据。
创建了 Observable 巳和 Observer 之后,我们还需要使用 subscribe() 方法将它们连接起来,这样整个上下游就能衔接起来实现链式调用。
Observable.just("Hello World!").subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "accept: " + s); } });
just() 是 RxJava 的创建操作符,用于创建一个 Observable,Consumer 是消费者,用于接收单个值。
subscribe 有多个重载的方法。
一个重载方法的版本,subscribe(onNext, onError, onComplete)
Observable.just("Hello World!").subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next-> " + s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error-> " + throwable.getMessage()); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> Hello World! Complete.
onComplete 是一个 Action 它与 Consumer 的区别如下:
再来看一个重载方法的版本, subscribe(onNext, onError, onComplete, onSubscribe)
Observable.just("Hello World!").subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next-> " + s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error-> " + throwable.getMessage()); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }, new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { Log.d(TAG, "subscribe"); } }); // 执行结果 subscribe Next-> Hello World! Complete.
从打印结果可知:先执行了onSubscribe 再执行了 onNext和onComplete
在RxJava 2 中, Observable 不再支持订阅 Subscriber ,而是需要使用 Observer 作为观察者
Observable.just("Hello World!").subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "subscribe"); } @Override public void onNext(String s) { Log.d(TAG, "Next-> " + s); } @Override public void onError(Throwable e) { Log.d(TAG, "Error-> " + e.getMessage()); } @Override public void onComplete() { Log.d(TAG, "Complete."); } }); // 执行结果 subscribe Next-> Hello World! Complete.
在RxJava 中, 被观察者、观察者、subscribe()方法三者缺一不可。只有使用了 subscribe(),被观察者才会开始发送数据.
RxJava 2 的5种观察者模式:
5种观察者模式的描述,只有 Flowable 支持背压,如果有需要背压的情况,则必须使用 Flowable
do 操作符可以给 Observable 生命周期的各个阶段加上一系列的回调监听,当 Observable 执行到这个阶段时,这些回调就会被触发。在 RxJava 中包含了很多的 doXXX 操作符。
Observable.just("Hello World!") .doOnNext(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "doOnNext-> " + s); } }) .doAfterNext(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "doAfterNext-> " + s); } }) .doOnComplete(new Action() { @Override public void run() throws Exception { Log.d(TAG, "doOnComplete."); } }) // 订阅之后回调方法 .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { Log.d(TAG, "doOnSubscribe"); } }) .doAfterTerminate(new Action() { @Override public void run() throws Exception { Log.d(TAG, "doAfterTerminate"); } }) .doFinally(new Action() { @Override public void run() throws Exception { Log.d(TAG, "doFinally"); } }) // Observable 每发射一个数据就会触发这个回调,不仅包括onNext,还包括onComplete和onError .doOnEach(new Consumer<Notification<String>>() { @Override public void accept(Notification<String> stringNotification) throws Exception { Log.d(TAG, "doOnEach-> " + (stringNotification.isOnNext() ? "onNext" : stringNotification.isOnComplete() ? "onComplete" : "onError")); } }) // 订阅后可以取消订阅 .doOnLifecycle(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { Log.d(TAG, "doOnLifecycle::onSubscribe-> " + disposable.isDisposed()); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "doOnLifecycle::onDispose"); } }) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "subscribe-> " + s); } }); // 执行结果 doOnSubscribe doOnLifecycle::onSubscribe-> false doOnNext-> Hello World! doOnEach-> onNext subscribe-> Hello World! doAfterNext-> Hello World! doOnComplete. doOnEach-> onComplete doFinally doAfterTerminate
在 RxJava 中, Observable 有 Hot 和 Cold 之分。
Hot Observable 无论有没有观察者进行订阅,事件始终都会发生。当 Hot Observable 有多个订阅者时(多个观察者进行订阅时) , Hot Observable 与订阅者们的关系是一对多的关系,可以与多个订阅者共享信息。
Cold Observable 是只有观察者订阅了,才开始执行发射数据流的代码。井且 Cold Observable 和 Observer 只能是一对一的关系 。当有多个不同的订阅者时,消息是重新完整发送的。也就是说,对 Cold Observable ,有多个 Observer 的时候,它们各自的事件是独立的。
Cold Hot 区别:
Observable 的 just、creat、range、fromXXX 等操作符都能生成 Cold Observable
Consumer<Long> subscriber1 = new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "subscriber1-> " + aLong); } }; Consumer<Long> subscriber2 = new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, " subscriber2-> " + aLong); } }; Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() { @Override public void subscribe(final ObservableEmitter<Long> emitter) throws Exception { Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation()) .take(Integer.MAX_VALUE) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { emitter.onNext(aLong); } }); } }).observeOn(Schedulers.newThread()); observable.subscribe(subscriber1); observable.subscribe(subscriber2); // 执行结果 subscriber2-> 0 subscriber1-> 0 subscriber1-> 1 subscriber2-> 1 subscriber1-> 2 subscriber2-> 2 subscriber2-> 3 subscriber1-> 3 subscriber1-> 4 subscriber2-> 4 subscriber1-> 5 subscriber2-> 5 subscriber2-> 6 subscriber1-> 6 subscriber1-> 7 subscriber2-> 7 subscriber2-> 8 subscriber1-> 8 subscriber1-> 9 subscriber2-> 9
subscriber1 和 subscriber2 结果并不一定是相同的,它们二者是完全独立的。create 操作符创建的Observable 是 Cold Observable
使用 publish 操作符 ,可以让 Cold Observable 转换成 Hot Observable,它将原先 Observable
转换 ConnectableObservable
Consumer<Long> subscriber1 = new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "subscriber1-> " + aLong); } }; Consumer<Long> subscriber2 = new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, " subscriber2-> " + aLong); } }; Consumer<Long> subscriber3 = new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, " subscriber3-> " + aLong); } }; ConnectableObservable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() { @Override public void subscribe(final ObservableEmitter<Long> emitter) throws Exception { Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation()) .take(Integer.MAX_VALUE) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { emitter.onNext(aLong); } }); } }).observeOn(Schedulers.newThread()).publish(); // 需要调用connect()才能真正执行 observable.connect(); // 订阅 observable.subscribe(subscriber1); observable.subscribe(subscriber2); try { Thread.sleep(20L); } catch (InterruptedException e) { e.printStackTrace(); } observable.subscribe(subscriber3); // 执行结果 subscriber1-> 0 subscriber2-> 0 subscriber1-> 1 subscriber2-> 1 subscriber3-> 1 subscriber1-> 2 subscriber2-> 2 subscriber3-> 2 subscriber1-> 3 subscriber2-> 3 subscriber3-> 3 subscriber1-> 4 subscriber2-> 4 subscriber3-> 4 subscriber1-> 5 subscriber2-> 5 subscriber3-> 5 subscriber1-> 6 subscriber2-> 6 subscriber3-> 6 subscriber1-> 7 subscriber2-> 7 subscriber3-> 7 subscriber1-> 8 subscriber2-> 8 subscriber3-> 8 subscriber1-> 9 subscriber2-> 9 subscriber3-> 9 subscriber1-> 10 subscriber2-> 10 subscriber3-> 10 subscriber1-> 11 subscriber2-> 11 subscriber3-> 11
多个订阅的 subscriber (或者说观察者)共享同一事件。在这里,ConnectableObservable 是线程安全的。
Subject 和 Processor 的作用相同。Processor 是 RxJava 2.x 新增的类,继承自 Flowable, 支持背压控制( Back Presure ),而 Subject 则不支持背压控制。
Consumer<Long> subscriber1 = new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "subscriber1-> " + aLong); } }; Consumer<Long> subscriber2 = new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, " subscriber2-> " + aLong); } }; Consumer<Long> subscriber3 = new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, " subscriber3-> " + aLong); } }; Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() { @Override public void subscribe(final ObservableEmitter<Long> emitter) throws Exception { Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation()) .take(Integer.MAX_VALUE) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { emitter.onNext(aLong); } }); } }).observeOn(Schedulers.newThread()); PublishSubject<Long> subject = PublishSubject.create(); observable.subscribe(subject); observable.subscribe(subscriber1); observable.subscribe(subscriber2); try { Thread.sleep(20L); } catch (InterruptedException e) { e.printStackTrace(); } subject.subscribe(subscriber3); // 执行结果与上面使用 publish 操作符相同。
Subject 既是 Observable ,又是 Observer (Subscriber)。可以从 Subject 源码上看到 ,继承自 Observable ,实现 Observer
// Subject源码 public abstract class Subject<T> extends Observable<T> implements Observer<T> { // ...省略 }
Subject 作为观察者,可以订阅目标 Cold Observable ,使对方开始发送事件。同时它又作为 Observable 转发或者发送新的事件,让 Cold Observable 借助 Subject 转换为 Hot Observable
Subject 井不是线程安全的,如果想要其线程安全,需要调用 toSerialized() 方法(在RxJava 1.x 中还可以用 SerializedSubject 代替 Subject ,但是在 RxJava 2.x 之后 SerializedSubject 不再是 public class)
然而,很多基于 EventBus 改造的 RxBus 并没有这么做。这样的做法是非常危险的,会遇到并发的情况。
在 ReactiveX 官网的解释是:make a Connectable Observable behave like an ordinary Observable
RefCount 操作符把一个可连接的 Observable 连接和断开的过程自动化了。它操作一个可连接的Observable 返回一个普通的 Observable 。当第一个订阅者/观察者订阅这个 Observable 时,RefCount 连接到下层的可连接 Observable。 RefCount 跟踪有多少个观察者订阅它,直到最后一个观察者完成,才断开与下层可连接 Observable 的连接。
如果所有的订阅者/观察者都取消订阅了,则数据流停止:如果重新订阅,则重新开始数据流。
Consumer<Long> subscriber1 = new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "subscriber1-> " + aLong); } }; Consumer<Long> subscriber2 = new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, " subscriber2-> " + aLong); } }; ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() { @Override public void subscribe(final ObservableEmitter<Long> emitter) throws Exception { Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation()) .take(Integer.MAX_VALUE) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { emitter.onNext(aLong); } }); } }).observeOn(Schedulers.newThread()).publish(); connectableObservable.connect(); Observable<Long> observable = connectableObservable.refCount(); Disposable disposable1 = observable.subscribe(subscriber1); Disposable disposable2 = observable.subscribe(subscriber2); try { Thread.sleep(20L); } catch (InterruptedException e) { e.printStackTrace(); } disposable1.dispose(); disposable2.dispose(); Log.d(TAG, "重新开始数据流"); disposable1 = observable.subscribe(subscriber1); disposable2 = observable.subscribe(subscriber2); // 执行结果 subscriber1-> 0 subscriber2-> 0 subscriber1-> 1 subscriber2-> 1 重新开始数据流 subscriber1-> 0 subscriber2-> 0 subscriber1-> 1 subscriber2-> 1
如果不是所有的订阅者/观察者都取消了订阅 ,而只是部分取消,则部分的订阅者/观察者重新开始订阅时,不会从头开始数据流
Consumer<Long> subscriber1 = new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "subscriber1-> " + aLong); } }; Consumer<Long> subscriber2 = new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, " subscriber2-> " + aLong); } }; Consumer<Long> subscriber3 = new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, " subscriber3-> " + aLong); } }; ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() { @Override public void subscribe(final ObservableEmitter<Long> emitter) throws Exception { Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation()) .take(Integer.MAX_VALUE) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { emitter.onNext(aLong); } }); } }).observeOn(Schedulers.newThread()).publish(); connectableObservable.connect(); Observable<Long> observable = connectableObservable.refCount(); Disposable disposable1 = observable.subscribe(subscriber1); Disposable disposable2 = observable.subscribe(subscriber2); observable.subscribe(subscriber3); try { Thread.sleep(20L); } catch (InterruptedException e) { e.printStackTrace(); } disposable1.dispose(); disposable2.dispose(); Log.d(TAG, "subscriber1 和 subscriber2 重新订阅"); disposable1 = observable.subscribe(subscriber1); disposable2 = observable.subscribe(subscriber2); // 执行结果 subscriber1-> 0 subscriber2-> 0 subscriber3-> 0 subscriber1-> 1 subscriber2-> 1 subscriber3-> 1 subscriber1 和 subscriber2 重新订阅 subscriber3-> 2 subscriber1-> 2 subscriber2-> 2 subscriber3-> 3 subscriber1-> 3 subscriber2-> 3 subscriber3-> 4 subscriber1-> 4 subscriber2-> 4
subscriber1 和 subscriber2 先取消了订阅,subscriber3 井没有取消订阅。之后,subscriber1 和 subscriber2 又重新订阅。最终 subscriber1、subscriber2、subscriber3 的值保持一致
share 操作符封装了 publish().refCount()调用,可以看其源码
public final Observable<T> share() { return publish().refCount(); }
在 RxJava 2.x 中,Observable 不再支持背压(Back Pressure),而改由 Flowable 来支持非阻塞式的背压。 Flowable 是 RxJava 2.x 新增的被观察者, Flowable 可以看成 Observable 新的实现,它支持背压,同时实现 Reactive Streams 的 Publisher 接口。 Flowable 所有的操作符强制支持背压,不过 Flowable 中的操作符大多与 Observable 类似。
(1) 使用 Observable 较好的场景
(2) 使用 Flowable 较好的场景
通常情况下 ,如果想要使用 RxJava, 首先会想到的是使用 Observable ,如果考虑到背压的情况,则在 RxJava 2.x 下会使用 Flowable 。除 Observable 和 Flowable 外,在 RxJava 2.x 中还有3种类型的被观察者 Single、Completable 和 Maybe
从 SingleEmitter 的源码可以看出,Single 只有 onSuccess 和 onError 事件
public interface SingleEmitter<T> { void onSuccess(@NonNull T t); void onError(@NonNull Throwable t); void setDisposable(@Nullable Disposable d); void setCancellable(@Nullable Cancellable c); boolean isDisposed(); boolean tryOnError(@NonNull Throwable t); }
其中, onSuccess 用于发射数据(在 Observable/Flowable 中使用 onNext() 来发射数据),而且只能发射一个数据,后面即使再发射数据也不会做任何处理。
Single 的 SingleObserver 中只有 onSuccess 和 onError ,并没有 onComplete 。这也是Single 与其他4种被观察者之间的最大区别。
Single.create(new SingleOnSubscribe<String>() { @Override public void subscribe(SingleEmitter<String> emitter) throws Exception { emitter.onSuccess("test"); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "onSuccess-> " + s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "onError-> " + throwable.getMessage()); } }); // 执行结果 onSuccess-> test
上面 Observer 中有两个 Consumer, 还可以进一步简化:
Single.create(new SingleOnSubscribe<String>() { @Override public void subscribe(SingleEmitter<String> emitter) throws Exception { emitter.onSuccess("test"); } }).subscribe(new BiConsumer<String, Throwable>() { @Override public void accept(String s, Throwable throwable) throws Exception { Log.d(TAG, "onSuccess-> " + s); } });
Single 可以通过 toXXX 方法转换成 Observable、Flowable、Completable、Maybe
Completable 在创建后,不会发射任何数据。从 CompletableEmitter 的源码中可以看到。
public interface CompletableEmitter { void onComplete(); void onError(@NonNull Throwable t); void setDisposable(@Nullable Disposable d); void setCancellable(@Nullable Cancellable c); boolean isDisposed(); boolean tryOnError(@NonNull Throwable t); }
Completable 只有 onComplete 和 onError 事件,同时 Completable 井没有 map、flatMap 等操作符,它的操作符比起 Observable/Flowable 要少得多
可以通过企fromXXX 操作符来创建一个 Completable
Completable.fromAction(new Action() { @Override public void run() throws Exception { Log.d(TAG, "Hello World!"); } }).subscribe(); // 执行结果 Hello World!
Completable 经常结合 andThen 操作符使用。
Completable.create(new CompletableOnSubscribe() { @Override public void subscribe(CompletableEmitter emitter) throws Exception { try { Log.d(TAG, "--------------"); TimeUnit.SECONDS.sleep(1); emitter.onComplete(); } catch (InterruptedException e) { emitter.onError(e); } } }).andThen(Observable.range(1, 5)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }); // 执行结果 18:19:37.943 -------------- 18:19:38.947 Next-> 1 18:19:38.947 Next-> 2 18:19:38.947 Next-> 3 18:19:38.948 Next-> 4 18:19:38.948 Next-> 5
emitter.onComplete() 执行完成之后,表明 Completable 经执行完毕,接下来执行 andThen 里的操作。
在 Completable 中, andThen 有多个重载的方法,正好对应了5种被观察者的类型。
Completable andThen(CompletableSource next) <T> Observable<T> andThen(ObservableSource<T> next) <T> Maybe<T> andThen(MaybeSource<T> next) <T> Flowable<T> andThen(Publisher<T> next) <T> Single<T> andThen(SingleSource<T> next)
Completable 也可以通过 toXXX 方法转换成 Observable、Flowable、Single 以及 Maybe
Maybe 是 RxJava 2.x 之后才有的新类型,可以看成是 Single 和 Completable 的结合。
Maybe 创建之后, MaybeEmitter 和 SingleEmitter 样,并没有 onNext() 方法,同样需要 onSuccess() 方法来发射数据。
Maybe.create(new MaybeOnSubscribe<String>() { @Override public void subscribe(MaybeEmitter<String> emitter) throws Exception { emitter.onSuccess("Hello World!"); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Success-> " + s); } }); // 执行结果 Success-> Hello World!
Maybe 也只能发射 0 或者 1 个数据,即使发射多个数据, 后面发射的数据也不会处理
emitter.onSuccess("Hello World!"); emitter.onSuccess("Hello World!2"); // 执行结果仍然是 Success-> Hello World!
如果 MaybeEmitter 先调用了 onComplete(),即使后面再调用 onSuccess(),也不会发射任何数据。
Maybe.create(new MaybeOnSubscribe<String>() { @Override public void subscribe(MaybeEmitter<String> emitter) throws Exception { emitter.onComplete(); emitter.onSuccess("Hello World!"); emitter.onSuccess("Hello World!2"); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Success-> " + s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable.getMessage()); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Complete.
Maybe 在没有数据发射时,subscribe 会调用 MaybeObserver 的 onComplete()。如果 Maybe 有数据发射或者调用了 onError(),则不会执行 MaybeObserver 的 onComplete()。
也可以将 Maybe 转换成 Observable、Flowable、Single,只需相应地调用 toObservable()、toFlowable()、 toSingle()即可。
Subject 既是 Observable ,又是 Observer。官网称可以将 Subject 看作一个桥梁或者代理
Subject 包含 4 种类型,分别是 AsyncSubject、BehaviorSubject、ReplaySubject、PublishSubject
Observer 会接 AsyncSubject 的 onComplete() 之前的最后一个数据
AsyncSubject<String> subject = AsyncSubject.create(); subject.onNext("AsyncSubject#1"); subject.onNext("AsyncSubject#2"); subject.onComplete(); subject.subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next-> " + s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { // 出现异常才会输出 Log.d(TAG, "Error->" + throwable.getMessage()); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); subject.onNext("AsyncSubject#3"); subject.onNext("AsyncSubject#4"); // 执行结果 Next-> AsyncSubject#2 Complete.
修改一下上面代码,将subject.onComplete();放到最后,执行结果
Next-> AsyncSubject#4 Complete.
subject.onComplete() 必须要调用才会开始发送数据,否则观察者将不接收任何数据。
Observer 会先接收到 BehaviorSubject 被订阅之前的最后一个数据,再接收订阅之后发射过来的数据。如果 BehaviorSubject 被订阅之前没有发送任何数据,则会发送一个默认数据。
BehaviorSubject<String> subject = BehaviorSubject.createDefault("BehaviorSubject#1"); // 订阅 subject.subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next-> " + s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable.getMessage()); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); subject.onNext("BehaviorSubject#2"); subject.onNext("BehaviorSubject#3"); // 执行结果 Next-> BehaviorSubject#1 Next-> BehaviorSubject#2 Next-> BehaviorSubject#3
BehaviorSubject#1 是默认值。
修改一下上面的代码,在订阅之前发送数据 subject.onNext("BehaviorSubject#4"); 执行结果如下:
Next-> BehaviorSubject#4 Next-> BehaviorSubject#2 Next-> BehaviorSubject#3
丢弃了默认值,而发射了 BehaviorSubject#4。因为 BehaviorSubject 每次只会发射调用
subscribe() 方法之前的最后一个事件和调用 subscribe() 方法之后的事件.
BehaviorSubject 还可以缓存最近一次发出信息的数据.
ReplaySubject 会发射所有来自原始 Observable 的数据给观察者,无论它们是何时订阅的.
ReplaySubject<String> subject = ReplaySubject.create(); subject.onNext("ReplaySubject#1"); subject.onNext("ReplaySubject#2"); // 订阅 subject.subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next-> " + s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable.getMessage()); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); subject.onNext("ReplaySubject#3"); subject.onNext("ReplaySubject#4"); // 执行结果 Next-> ReplaySubject#1 Next-> ReplaySubject#2 Next-> ReplaySubject#3 Next-> ReplaySubject#4
如果将create() 修改为 createWithSize(1),表示只缓存订阅前最后发送的一条数据。执行结果如下:
Next-> ReplaySubject#2 Next-> ReplaySubject#3 Next-> ReplaySubject#4
这个执行结果与 BehaviorSubject 的相同。但是从并发的角度来看, ReplaySubject 在处理并发subscribe() 和 onNext() 时会更加复杂。
ReplaySubject 除了可以限制缓存数据的数量,还能限制缓存的时间,使用 createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler) 即可。
Observer 只接收 PublishSubject 被订阅之后发送的数据
PublishSubject<String> subject = PublishSubject.create(); subject.onNext("PublishSubject#1"); subject.onNext("PublishSubject#2"); subject.onComplete(); // 订阅 subject.subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next-> " + s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable.getMessage()); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); subject.onNext("PublishSubject#3"); subject.onNext("PublishSubject#4"); // 执行结果 Complete.
因为 subject 在订阅之前己经执行了 onComplete() 方法,所以无法发射数据。修改一下,将 onComplete() 方法放在最后。执行结果如下:
Next-> PublishSubject#3 Next-> PublishSubject#4 Complete.
4 个 Subject 的特性总结:
Subject 作为一个 Observable 时,可以不停地调用 onNext() 来发送事件,直至遇到 onComplete() 才会结束。
PublishSubject<String> subject = PublishSubject.create(); // 订阅 subject.subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next-> " + s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable.getMessage()); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); subject.onNext("PublishSubject#1"); subject.onNext("PublishSubject#2"); subject.onComplete(); // 执行结果 Next-> PublishSubject#1 Next-> PublishSubject#2 Complete.
如果使用 subscribeOn 操作符将 subject 切换到 I/O 线程,则可以使用 Thread.sleep(2000) 让主线程休眠 2s
PublishSubject<String> subject = PublishSubject.create(); // 订阅 subject.subscribeOn(Schedulers.io()) // 切换到I/O线程 .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next-> " + s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable.getMessage()); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); subject.onNext("PublishSubject#1"); subject.onNext("PublishSubject#2"); subject.onComplete(); try { Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); } // 执行结果 Complete.
没有打印 PublishSubject#1 和 PublishSubject#2 ???
因为 subject 发射元素的线程被指派到了 I/O 线程,此时 I/O 线程正在初始化还没起来,subject 发射前,PublishSubject#1 和 PublishSubject#2 这两个元素还在主线程中,而在从主线程往 I/O 线程转发的过程中,由于 I/O 线程还没有起来,所以就被丢弃了。此时,无论 Thread 睡了多少秒, PublishSubject#1 和 PublishSubject#2 都不会被打印出来。
其实,解决方法很简单,使用 Observable.create() 来替代 subject ,它允许为每个订阅者精确控制事件的发迭,这样就不会少打印 PublishSubject#1 和 PublishSubject#2
下面的代码实现了一个简化版本的 Android EventBus ,在这里使用了 PublishSubject 。因为事件总线是基于发布/订阅模式实现的,如果某一事件在多个 Activity/Fragment 中被订阅,则在 App 的任意地方一旦发布该事件,多个订阅的地方均能收到这一事件(在这里,订阅事件的 Activity/Fragment 不能被损坏,一旦被损坏就不能收到事件),这很符合 Hot Observable 的特性。所以,我们使用 PublishSubject ,考虑到多钱程的情况,还需要使用 Subject 的 toSerialized() 方法。
public class RxBus { private static class Holder { private static final RxBus BUS = new RxBus(); } public static RxBus get() { return Holder.BUS; } private Subject<Object> mBus; private RxBus() { mBus = PublishSubject.create().toSerialized(); } public boolean hasObservers() { return mBus.hasObservers(); } public Observable<Object> toObservable() { return mBus; } public <T> Observable<T> toObservable(Class<T> clazz) { return mBus.ofType(clazz); } public void post(Object object) { mBus.onNext(object); } }
在这里, Subject 的 toSerialized(), 使用 SerializedSubject 包装了原先的 Subject
public final Subject<T> toSerialized() { if (this instanceof SerializedSubject) { return this; } return new SerializedSubject<T>(this); }
这个版本的 EventBus 较简单,井没有考虑背压的情况,因为在 RxJava 2.x 中, SuSubject 已经不再支持背压了。如果要增加背压的处理,可以使用 Processor,需要将 PublishSubject 改成 PublishProcessor,对应的 Observable 需要改成 Flowable
预加载可以很好地提高程序的用户体验。每当用户处于弱网络时,打开 App 很可能会出现一片空白或者一直在 loading,此时用户一定很烦躁。而如果能够预先加载一些数据,例如上一次打开 App 时保存的数据,那么一定会有效提升 App 的用户体验
下面是借助 BehaviorSubject 的特性来实现一个简单的预加载类 RxPreLoader
public class RxPreLoader<T> { // 能够缓存订阅之前的最新数据 private BehaviorSubject<T> dataSubject; private Disposable disposable; public RxPreLoader(T defaultValue) { dataSubject = BehaviorSubject.createDefault(defaultValue); } // 发送事件 public void publish(T data) { dataSubject.onNext(data); } // 订阅事件 public Disposable subscribe(Consumer<? super T> onNext) { disposable = dataSubject.subscribe(onNext); return disposable; } // 取消订阅 public void disposable() { if (disposable != null && !disposable.isDisposed()) { disposable.dispose(); disposable = null; } } // 获取缓存数据的Subject public BehaviorSubject<T> getCacheDataSubject() { return dataSubject; } // 直接获取最近的一条数据 public T getLastCacheData() { return dataSubject.getValue(); } }
Processor 和 Subject 的作用相同。 Processor 是 RxJava2.0 新增的功能,它是一个接口,继承自 Subscriber 和 Publisher,能够支持背压(Back Pressure)控制。这是 Processor 和 Subject 最大区别。
其实, Publisher 和 Processor 都是 Reactive Streams 的接口, Reactive Streams 项目提供了一个非阻塞异步流处理的背压标准。 RxJava 2.0 己经基于 Reactive Streams 库进行重写 ,包括 Single、Completable ,都是基于 Reactive Streams 规范重写的, Flowable 实现了 Reactive Streams Publisher 接口等。
Reactive Streams 的目标是管制流数据在跨越异步边界进行流数据交换,可以认为是将元素传递到另一个线程或线程池,同时确保在接收端不是被迫缓冲任意数量的数据。换句话说,背压是该模型的重要组成部分,通过设置协调线程之间的队列大小进行限制 。当各异步模型之间采用同步通信时会削弱异步带来的好处,因此必须采取谨慎措施,强制完全无阻塞反应流能在系统的各个方面都做到异步实施。
Reactive Streams 规范的主要目标:
Reactive Streams JVM接口 由以下四个接口组成:
RxJava 2.0 中使用 Processor 来处理背压。同时,在新发布的 Java 9 中也已经引入 Reactive
Streams 的思想,具体来说是构建在 java.util.concurrent.Flow 容器中,包含了四个接口类。
RxJava 的 Subject 是一种特殊的存在,它的灵活性在使用时也会伴随着风险,若是没有用好则可能会错过事件。注意,Subject 不是线程安全的。当然很多开源框架都在使用 Subject,例如大名鼎鼎的 RxLifecycle 就使用了 BehaviorSubject