诞生 5 年之久的 RxJava,已经不只是一个开源库,可以说它的诞生 改变了我们写代码的方式 ,把它比作「神兵利器」也毫不为过。我们现在已经能看到各式各样名为「最佳实践」的使用教程,如果我们没能用好这把利器,不仅不会发挥它的作用,反而会伤着我们自己。
回顾它的诞生原因,是为了解决 回调地狱 (callback hell) 以及 麻烦的线程切换 。在 Android 开发中,哪个地方最会出现 多层的回调嵌套 以及 频繁的线程切换 呢?对!没错!是「网络请求」。所以 RxJava、Retrofit 这俩兄弟总会一起出现的,我们项目中关于 RxJava 的使用,也几乎都和网络请求相关。
最初我们对 RxJava + Retrofit 的使用经验都是来源于 RxJava 与 Retrofit 结合的最佳实践 这篇文章,相信大家都看过。这篇文章中的基本封装思想是: 订阅每个网络请求的流,将流的订阅结果再通过回调的方式返给流(也就是网络请求)的创建者。 如下所示:
//HttpMethod public void getTopMovie(final ResultListener listener, int start, int count){ movieService.getTopMovie(start, count) .subscribeOn(Schedulers.io()) .unsubscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber(){ @Override public void onStart() {} @Override public void onNext(Subject t) { listener.onNext(t) } @Override public void onError(Throwable e) {} @Override public void onCompleted() {} }) } //Activity HttpMethods.getInstance().getTopMovie(new ResultListener(){ @Override public void onNext(Subject t){ //handle result } }, 0, 10)
这种封装方式,对于初步的使用以及简单的项目,是没有问题的。但是遇到复杂一点的网络请求,它的扩展性就不那么灵活了:
subscribe
之前通过 flatMap
发起第二个或者第三个网络请求。这种写法肯定会影响项目中已有的外部调用。 onNext
中发起第二个请求,再在第二个网络请求的 onNext
中发起第三个网络请求……这一层又一层的回调嵌套,正是用 RxJava 所能解决、避免的这样写,我们就又回到了最初的原点。 RxJava 提供给我们的、我们所中意的强大之处在哪?在于它的「操作符」, map
、 flatMap
、 zip
等等,甚至线程的切换 subscribeOn
、 observeOn
也是操作符。RxJava 的各种强大的功能就是通过各式各样的「操作符」实现的。
操作符操作的是什么?流。流( Observable
、 Flowable
)是 RxJava 的基本单位。所以一套链式请求拆开应该是这样的:
所以说,网络请求库对外提供网络请求的结果应该是以「流」的形式进行提供:
当生产者大于消费者,
Flowable
天然支持背压。所以
Flowable
这个万金油,不管三七二十一,直接拿来用是没有问题的。
但是, 网络请求,会产生背压问题吗?不会,为了防止抬杠,可以说大部分情况下是不会的。 网络请求的每一个流,即用即走,上游的生产者( Request
)和下游的消费者( Responese
),永远是一对一的关系,不会出现连续的事件流。杀鸡焉用牛刀,所以我们可以退一步,改用 Observable
。
网络请求不会出现连续的事件流,在 onNext
出现之后, onComplete
马上就会被调用,所以只需要这两者中的一个就够了,也就不用考虑 Observable
,同样 Maybe
也是可以排除的。
剩下的也就只有 Single
和 Completable
了,相对于 Single
, Completable
没有 map
和 flatMap
方法。所以需要进一步处理网络请求结果的我们,可以选择使用 Single
。
网络请求过程,协议层的异常会自动抛至 onError()
,如 404、503 错误。对于如下 有请求结果但无目标请求数据 ,我们也应当作为异常来处理:
{ "code": "6002", "msg": "公钥为空" }
毕竟这样的请求结果,是后端经过异常处理返回给我们的。
假定我们的请求结果是这样的范式:
data CommonResult<T>( var code: Int = 0, var data: T? = null, var message: String? = null )
我们活用 RxJava 的操作符,用 map
来处理请求到的 ResponseBody
(这也是前面选择 Single 的原因),为了便于复用,可以定义一个这样的 mapper:
class CommonResultMapper<T> : Function<CommonResult<T>, T> { override fun apply(t: CommonResult<T>): T { val data = t.data if (t.code == SUCCESS_CODE && data != null) { return data } else { //抛出异常 throw Throwable("请求 $t 失败") } } }
使用这个定义好的 mapper:
@GET(PUSH_URL) fun fetchTag(@Query("udid") udid: String): Single<CommonResult<Tag>> fun fetchTagResult(udid: String): Single<Tag> { return netService.fetchTag(udid) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .map(CommonResultMapper()) }
如果你愿意,你还可以将 线程切换 和 数据处理 结合在一起,使用 RxJava 的 Transformer
//定义一个 transformer fun <T> resultTransformer(): SingleTransformer<CommonResult<T>, T> { return SingleTransformer { single -> single.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .map(CommonResultMapper()) } } //使用 fun fetchTagResult(udid: String): Single<Tag> { return netService.fetchTag(udid) .compose(resultTransformer()) }
使用这样封装,结果归结果,异常归异常。
fetchTagResult("123321") .subscribeBy( onSuccess = { tag -> //结果 }, onError = { e -> //异常 } )
回顾上图中的 对内封装 和 对外可见 ,在得到真正想要的网络请求结果之前,需要一直保持 对内封装 的状态。因此,如果需要同时或者按顺序发起多个网络请求,那么就应该在 对内封装 中进行操作,例如可以使用 flatMap
按顺序发起第二个网络请求:
fun fetchUserSingle(tag: Tag): Single<User> { return netService.fetchUser(tag) } fun fetchUserResult(udid: String): Single<User> { return netService.fetchTag(udid) .compose(resultTransformer()) .flatMap{ tag -> //使用第一个请求的结果作为第二个请求的参数 return@flatMap fetchUserSingle(tag) } }
无论如何,善用 操作符 ,我们的代码总会是「链式」的。
最后还需要关注一下这里的内存泄漏问题,在 Activity
销毁时,要及时取消掉这些已经失去上下文意义的网络请求。这里我们及时 unsubscribe
就好了。
同时在管理生命周期方面,也有更成熟的方案: RxLifecycle 。
以上
关注公众号,了解更多:point_down: