这是一系列文章的第一篇,这个系列没有什么非常明确的主题,可以看做是日常开发和协作里悟出来的一些技术相关的事情——或者说是技术方面的随笔。
这一篇是关于 RxJava
。
曾经看到这么一篇文章,称赞 Kotlin Coroutine 大好,RxJava 可以直接丢弃了;也曾经看到一篇文章(事实上正是我刚开始学习 RxJava 的时候看的),称 RxJava 的本质就是异步。
如果把这两篇文章放在一起看,那么似乎 “合理”:Kotlin Coroutine 解决了 JVM 世界里过去的 Callback 形式的异步回调,而 RxJava 的本质就是处理异步,那么 RxJava 自然不需要了,用 Kotlin Coroutine 就行了。
这引出我下面要说的一部分:
先放一段大家熟悉的代码:
getUser("id").subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(object : SingleObserver<UnsplashUser> { override fun onSuccess(user: UnsplashUser) { nameTextView.text = user.name } override fun onSubscribe(d: Disposable) { // do on subscribe } override fun onError(e: Throwable) { e.printStackTrace() } })
getUser()
返回一个 Single<UnsplashUser>
,里面可能是网络请求,又或者是查询数据库,但这不重要,重要的是这两句话:
.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread())
第一句是把订阅的操作放到 IO 线程里执行,然后在主线程里观察:因此我们能在 onSuccess
方法里给 TextView
的 text
属性赋值。
相信上述代码大家一定在项目里用过,甚至把中间的切换线程的操作放到一个 Transformer<T>
里,实现自己的 Observer 简化一些通用的处理,理所当然地把 RxJava 等同于异步。
但是 RxJava 真的非要异步吗?试想一下,如果在调用的时候,完全没有 subscribeOn
和 observeOn
,那么这样算什么呢?
你可能会回答:
这情况就不该用 RxJava 了。
这可以说对,同时也可以说不对。
首先,RxJava 是一个模式,一个建立在上游发射数据,然后被下游观察的观察者模式,在上游发出数据后,你可以做一些数据转换(Map),可以接到另一个流(FlatMap),可以让他经过你自定义的转换器(Transformer),当然也可以指定订阅和观察的线程(SubscribeOn & ObserveOn),只使用其中一部分也是没问题。
一个合适的例子是 RxBus,跟 EventBus不一样的是,它是基于 RxJava 实现的,是站在 RxJava 的肩膀上的。当然它可以切换线程,但这个不重要:这是一个上游发射数据,下游接收数据然后做对应处理的好例子。
自 RxJava2 开始,就支持了 Flowable
, Single
, Maybe
, Completable
这几个类型,跟 Observable
相比,他们表示的意思更明确:
为什么需要知道这个?我真的需要知道是该用 Observable
还是 Single
吗?
如果所有代码都是你自己写的,那么你可能觉得区分与否都没所谓:一个获取用户信息的网络请求 API,返回 Observable / Flowable / Single 都是一样的,对于前两者,你只会关心 onNext
(而且只期望发生一次),对于后者,你会只关心 onSuccess
,同时错误处理都在 onError
里处理。所以,为什么不直接使用 Single
呢?
如果你是写给别人服务的 API,那么区分这几种类型还是很有必要的:通过返回的类型能简单地告知调用方这个流应该是什么类型,不明确的类型(比如一律是 Observable)可能让调用方在 Observer 处理事件的时候里做一些无用功。
看一下这样的代码:
interface Callback { fun onSuccess(value: Int) fun onError(t: Throwable) } private fun foo(callback: Callback?) { Single.just(0) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(object : SingleObserver<Int> { override fun onSuccess(t: Int) { callback?.onSuccess(t) } override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { callback?.onError(e) } }) } fun main() { foo(object : Callback { override fun onSuccess(value: Int) { } override fun onError(t: Throwable) { } }) }
万不得已不要跟 Callback 混用。每调用一次就多产生一个对象(这个可能是最无关紧要的理由,毕竟使用 Rx 意味着产生的对象就比较多了),变成 Callback 风格对调用方来说可能 并没有什么好处 ;相反,如果返回的类型是 Rx 的,那么调用方在调用的时候就可以考虑使用 flatMap
的形式跟上游连接起来。
同时,正如第一点说的,Rx 的本质并不是异步,因此同样地 万不得已 不要在方法里面指定调度器(也就是不要使用 subscribeOn)。如果你有观察 Rx 内部的 public 方法,你可以看到基本上都有这么一段注释:
* <dl> * <dt><b>Scheduler:</b></dt> * <dd>{@code defer} does not operate by default on a particular {@link Scheduler}.</dd> * </dl>
这表示内部没有指定调度器执行。
上面之所以说 万不得已 ,是因为有一些 API 需要在特定的线程上处理,比如 delay
方法默认在 Computation 调度器执行的:
* <dt><b>Scheduler:</b></dt> * <dd>{@code interval} operates by default on the {@code computation} {@link Scheduler}.</dd> * </dl>
所以对于写 SDK 的同学来说,请尽可能地把调度器的指定权交给调用方,这样既能减少测试的粒度,又能让 API 设计得更灵活。
一个例子:
fun main() { val uri = Uri.parse("") decodeBitmap(uri) .flatMap { saveToInternalStorage(it) } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(object : SingleObserver<File> { override fun onSuccess(t: File) { } override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } }) } private fun saveToInternalStorage(bm: Bitmap): Single<File> { return Single.create { e -> val file = File(getExternalFilesDir(Environment.DIRECTORY_PICTURES), "${System.nanoTime()}.jpg") val fos = FileOutputStream(file) try { fos.use { bm.compress(Bitmap.CompressFormat.JPEG, 100, it) } e.onSuccess(file) } catch (t: Throwable) { e.onError(t) } } } private fun decodeBitmap(uri: Uri): Single<Bitmap> { return Single.create<Bitmap> { e -> val fd = contentResolver.openFileDescriptor(uri, "r")?.fileDescriptor ?: kotlin.run { e.onError(RuntimeException("Failed to open fd")) return@create } val bm = BitmapFactory.decodeFileDescriptor(fd) ?: kotlin.run { e.onError(RuntimeException("Failed to decode bitmap")) return@create } e.onSuccess(bm) } }
好吧这里并不是真的什么「起死回生」。
有几个方法你可能会忽略,拿 Single 为例子:
这几个方法分别可以在遇到错误的时候(前提是 onError 能执行),接上一个 Single 流,或者直接发射一个 item。如果你的 onError 和 onSuccess 里的处理很类似的话,那么这些方法将可能帮助你简化 Observer 里处理的代码。
同样地,Observable 和其他也有类似的代码,对于 Observable 来说,可以忽略其中一些 item 的错误,然后转化为正确的或者经过特殊装饰的 item 发射出去。
当然,onErrorxxx 里面也是可以抛异常的,如果在里面抛异常了,那么依然能在 onError 里处理这个异常——略有不同的是,这个异常是 CompositeException
,你需要 getCause()
才能获得真正的异常。