博客主页
在 RxJav 1.x 中, Subscription 的接口可以用来取消订阅。
public interface Subscription { /** * Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription * was received. * <p> * This allows deregistering an {@link Subscriber} before it has finished receiving all events (i.e. before * onCompleted is called). */ void unsubscribe(); /** * Indicates whether this {@code Subscription} is currently unsubscribed. * * @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise */ boolean isUnsubscribed(); }
在 Rx.Java l.x 中, Observable.subscribe() 方法会返回一个 Subscription 的对象。也就是说,
我们每次订阅都会返回 Subscription。Subscription 只需调用 unsubscribe 就可以取消订阅。
Subscription 对象是被观察者和订阅者之间的纽带。 RxJava 使用 Subscription 处理取消订阅时,会停止整个调用链。如果使用了一串很复杂的操作符,则调用 unsubscribe() 将会在它当前执行的地方终止,而不需要做任何额外的工作。
在 Android 开发中,我们会在 Activity/Fragment 的 onDestroy 里做一些释放资源的事情,如果使用了 RxJava l.x ,则可以用 Subscription.isUnsubscribed() 检查 Subscription 是否是 unsubscribed。 如果是 unsubscribed ,则调用 Subscription.unsubscribe(), RxJava 会取消订阅井通知给 Subscriber,井允许垃圾回收机制释放对象,防止任何 RxJava 造成内存泄露。
在 RxJava 2.0 之后, Subscription 被改名为 Disposable。 RxJava 2.x 中由于己经存在名为
org.reactivestreams.Subscription 这个类(遵循 Reactive Streams 标准),为了避免名字冲突,所以将原先的 rx.Subscription 改名为 io.reactivex.disposables.Disposable
public interface Disposable { /** * Dispose the resource, the operation should be idempotent. */ void dispose(); /** * Returns true if this resource has been disposed. * @return true if this resource has been disposed */ boolean isDisposed(); } Disposable disposable = Observable.just("hello, rxjava") .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }); disposable.dispose();
RxJava 1.x 中有一个复合订阅( composite subscription )的概念。在 RxJava 2.x 中,RxJava
也内置了一个类似复合订阅的容器 CompositeDisposable ,每当我们得到一个 Disposable 时,就
调用 CompositeDisposable.add(),将它添加到容器中,在退出的时候,调用
CompositeDisposable.clear()即可切断所有的事件。在 Android 中,我们经常会看到这样的用法,
也就是会有一个针对 Activity 或者 Fragment 的 CompositeDisposable ,这样就可以在 onDestroy 或者其他合适的地方取消订阅。
在 Android 开发中,可以使用 Disposable 来管理一个订阅或者使用 CompositeDisposable 来管理多个订阅,防止由于没有及时取消,导致 Activity/Fragment 无法销毁而引起内存泄露。然而,也有一些比较成熟的库可以做这些事情。
GitHub 下载地址: https://github.com/trello/RxL...
RxLifecycle 是配合 Activity/Fragment 生命周期来管理订阅的。由于 RxJava Observable 订阅后(调用了 subscribe 函数),一般会在后台线程执行一些操作(比如访问网络请求数据),当后台操作返回后,调用 Observer 的 onNext 等函数,然后再更新 UI 状态。但是后台线程请求是需要时间的,如果用户单击刷新按钮请求新的微博信息,在刷新还没有完成时,如果用户退出了当前界面,返回了前面的界面,而这个时候刷新的 Observable 又未取消订阅,就会导致之前的 Activity 无法被 JVM 回收,从而导致内存泄露。这就是在 Android 开发中值得注意的地方, RxLifecycle 就是专门用来做这件事的。
implementation 'com.trello.rxlifecycle2:rxlifecycle:2.2.2' // If you want to bind to Android-specific lifecycles implementation 'com.trello.rxlifecycle2:rxlifecycle-android:2.2.2' // If you want pre-written Activities and Fragments you can subclass as providers implementation 'com.trello.rxlifecycle2:rxlifecycle-components:2.2.2' // If you want pre-written support preference Fragments you can subclass as providers implementation 'com.trello.rxlifecycle2:rxlifecycle-components-preference:2.2.2' // If you want to use Navi for providers implementation 'com.trello.rxlifecycle2:rxlifecycle-navi:2.2.2' // If you want to use Android Lifecycle for providers implementation 'com.trello.rxlifecycle2:rxlifecycle-android-lifecycle:2.2.2' // If you want to use Kotlin syntax implementation 'com.trello.rxlifecycle2:rxlifecycle-kotlin:2.2.2' // If you want to use Kotlin syntax with Android Lifecycle implementation 'com.trello.rxlifecycle2:rxlifecycle-android-lifecycle-kotlin:2.2.2'
在想使用该库的 Activity 或者 Fragment 中,可以继承 RxActivity、RxAppCompatActivity 或者
RxFragment,井添加相应的 import 语句
import com.trello.rxlifecycle2.components.support.RxAppCompatActivity; public class BackPressureAct extends RxAppCompatActivity { }
当涉及绑定 Observable 到 Activity 或者 Fragment 生命周期时,要么指定 Observable 应终止的生命周期事件,要么让 RxLifecycle 库决定何时终止 Observable 序列。
默认情况下,RxLifecycle 将在辅助生命周期事件中终止 Observable, 所以如果在 Activity 的 onCreate() 方法期间订阅了 Observable ,则 RxLifecycle 将在该 Activity 的 onDestroy() 方法中终止 Observable 序列。如果在 Fragment 的 onAttach() 方法中订阅,RxLifecycle 将在 onDetach() 方法中终止该序列。
GitHub 下载地址: https://github.com/uber/AutoD...
AutoDispose 是 Uber 开源的库。 它与 RxLifecycle的区别是,不仅可以在 Android 平台上使用,还可以在 Java(的企业级)平台上使用,适用的范围更加宽广。
现在除 CompositeDisposable 和 RxLifecycle 外,还多了一个选择 AutoDispose。
AutoDispose 支持 Kotlin、Android Architecture Components,井且 AutoDispose 可以与 RxLifecycle 进行相互操作。
Transformer 是转换器的意思。早在 RxJava 1.x 版本就己有了 Observable.Transformer、Single .Transformer 和 Completable.Transformer。在 RxJava 2.x 版本中有 ObservableTransformer、SingleTransformer、 CompletableTransformer、FlowableTransformer、 MaybeTransformer 。其中,FlowableTransformer 和 MaybeTransformer 是新增 。由于 RxJava 2.x 将 Observable 拆分成了Observable 和 Flowable ,所以有了FlowableTransformer 。同样, Maybe 也是 RxJava 2.x 新增的一个类型,所以有 MaybeTransformer
Transformer 能够将一个 Observable/Flowable/Single/Completable/Maybe 对象转换成另一个 Observable/Flowable/Single/Completable/Maybe 对象,与调用一系列的内联操作符一模一样。
举个简单的例子, 写一个 transformer() 方法将一个发射整数 Observable 转换为发射字符串的 Observable
private static ObservableTransformer<Integer, String> transformer() { return new ObservableTransformer<Integer, String>() { @Override public ObservableSource<String> apply(Observable<Integer> upstream) { return upstream.map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return String.valueOf(integer); } }); } }; }
在使用 transformer() 方法,通过标准 RxJava 操作。
Observable.just(123, 456) .compose(transformer()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }); // 执行结果 Next: 123 Next: 456
compose 操作符能够从数据流中得到原始的被观察者。当创建被观察者时,compose 操作符会立即执行,而不像其他的操作符需要在 onNext() 调用后才能执行
关于 compose 操作符,国内也有相应的翻译: https://www.jianshu.com/p/e9e...
对于网络请求,我们经常会做如下操作来切换线程:
.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread())
做一个简单的封装:
object RxJavaUtils { @JvmStatic fun <T> observableToMain(): ObservableTransformer<T, T> { return ObservableTransformer { upstream -> upstream .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) } } @JvmStatic fun <T> flowableToMain(): FlowableTransformer<T, T> { return FlowableTransformer { upstream -> upstream .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) } } }
这段代码是用 Kotlin 写的, 把一些工具类用 Kotlin 来编写,而且 Kotlin 的 Lambda 表达式也更为直观。
对于 Flowable 切换到主线程的操作,可以这样使用
.compose(RxJavaUtils.<String>flowableToMain())
RxLifecycle 能够配合 Android 的生命周期,防止 App 内存泄漏,其中就使用了 LifecycleTransformer。知乎也做了一个类似的 RxLifecycle ,能够做同样的事情。
https://github.com/fengzhizi7...