转载

More of RxJava

这是一系列文章的第一篇,这个系列没有什么非常明确的主题,可以看做是日常开发和协作里悟出来的一些技术相关的事情——或者说是技术方面的随笔。

这一篇是关于 RxJava

曾经看到这么一篇文章,称赞 Kotlin Coroutine 大好,RxJava 可以直接丢弃了;也曾经看到一篇文章(事实上正是我刚开始学习 RxJava 的时候看的),称 RxJava 的本质就是异步。

如果把这两篇文章放在一起看,那么似乎 “合理”:Kotlin Coroutine 解决了 JVM 世界里过去的 Callback 形式的异步回调,而 RxJava 的本质就是处理异步,那么 RxJava 自然不需要了,用 Kotlin Coroutine 就行了。

上面这段话的表述是有问题的。

这引出我下面要说的一部分:

RxJava 不只是异步

先放一段大家熟悉的代码:

getUser("id").subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(object : SingleObserver<UnsplashUser> {
            override fun onSuccess(user: UnsplashUser) {
                nameTextView.text = user.name
            }
            override fun onSubscribe(d: Disposable) {
                // do on subscribe
            }
            override fun onError(e: Throwable) {
                e.printStackTrace()
            }
        })

getUser() 返回一个 Single<UnsplashUser> ,里面可能是网络请求,又或者是查询数据库,但这不重要,重要的是这两句话:

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

第一句是把订阅的操作放到 IO 线程里执行,然后在主线程里观察:因此我们能在 onSuccess 方法里给 TextViewtext 属性赋值。

相信上述代码大家一定在项目里用过,甚至把中间的切换线程的操作放到一个 Transformer<T> 里,实现自己的 Observer 简化一些通用的处理,理所当然地把 RxJava 等同于异步。

但是 RxJava 真的非要异步吗?试想一下,如果在调用的时候,完全没有 subscribeOnobserveOn ,那么这样算什么呢?

你可能会回答:

这情况就不该用 RxJava 了。

这可以说对,同时也可以说不对。

首先,RxJava 是一个模式,一个建立在上游发射数据,然后被下游观察的观察者模式,在上游发出数据后,你可以做一些数据转换(Map),可以接到另一个流(FlatMap),可以让他经过你自定义的转换器(Transformer),当然也可以指定订阅和观察的线程(SubscribeOn & ObserveOn),只使用其中一部分也是没问题。

一个合适的例子是 RxBus,跟 EventBus不一样的是,它是基于 RxJava 实现的,是站在 RxJava 的肩膀上的。当然它可以切换线程,但这个不重要:这是一个上游发射数据,下游接收数据然后做对应处理的好例子。

Rx 的世界,不只有 Observable

自 RxJava2 开始,就支持了 Flowable , Single , Maybe , Completable 这几个类型,跟 Observable 相比,他们表示的意思更明确:

  • Flowable:支持背压的 Observable
  • Single:成功发射一个 item 或者失败
  • Maybe:可能发射一个 item,或者直接完成,或者失败
  • Completable:完成或者失败

为什么需要知道这个?我真的需要知道是该用 Observable 还是 Single 吗?

如果所有代码都是你自己写的,那么你可能觉得区分与否都没所谓:一个获取用户信息的网络请求 API,返回 Observable / Flowable / Single 都是一样的,对于前两者,你只会关心 onNext (而且只期望发生一次),对于后者,你会只关心 onSuccess ,同时错误处理都在 onError 里处理。所以,为什么不直接使用 Single 呢?

如果你是写给别人服务的 API,那么区分这几种类型还是很有必要的:通过返回的类型能简单地告知调用方这个流应该是什么类型,不明确的类型(比如一律是 Observable)可能让调用方在 Observer 处理事件的时候里做一些无用功。

不要跟 Callback 混用

看一下这样的代码:

interface Callback {
    fun onSuccess(value: Int)
    fun onError(t: Throwable)
}

private fun foo(callback: Callback?) {
    Single.just(0)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(object : SingleObserver<Int> {
            override fun onSuccess(t: Int) {
                callback?.onSuccess(t)
            }
            override fun onSubscribe(d: Disposable) {
            }
            override fun onError(e: Throwable) {
                callback?.onError(e)
            }
        })
}

fun main() {
    foo(object : Callback {
        override fun onSuccess(value: Int) {
        }
        override fun onError(t: Throwable) {
        }
    })
}

万不得已不要跟 Callback  混用。每调用一次就多产生一个对象(这个可能是最无关紧要的理由,毕竟使用 Rx 意味着产生的对象就比较多了),变成 Callback 风格对调用方来说可能 并没有什么好处 ;相反,如果返回的类型是 Rx 的,那么调用方在调用的时候就可以考虑使用 flatMap 的形式跟上游连接起来。

同时,正如第一点说的,Rx 的本质并不是异步,因此同样地 万不得已 不要在方法里面指定调度器(也就是不要使用 subscribeOn)。如果你有观察 Rx 内部的 public 方法,你可以看到基本上都有这么一段注释:

* <dl>
*  <dt><b>Scheduler:</b></dt>
*  <dd>{@code defer} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>

这表示内部没有指定调度器执行。

上面之所以说 万不得已 ,是因为有一些 API 需要在特定的线程上处理,比如 delay 方法默认在 Computation 调度器执行的:

*  <dt><b>Scheduler:</b></dt>
*  <dd>{@code interval} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>

所以对于写 SDK 的同学来说,请尽可能地把调度器的指定权交给调用方,这样既能减少测试的粒度,又能让 API 设计得更灵活。

一个例子:

fun main() {
    val uri = Uri.parse("")
    decodeBitmap(uri)
        .flatMap {
            saveToInternalStorage(it)
        }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(object : SingleObserver<File> {
            override fun onSuccess(t: File) {
            }
            override fun onSubscribe(d: Disposable) {
            }
            override fun onError(e: Throwable) {
            }
        })
}
private fun saveToInternalStorage(bm: Bitmap): Single<File> {
    return Single.create { e ->
        val file = File(getExternalFilesDir(Environment.DIRECTORY_PICTURES), "${System.nanoTime()}.jpg")
        val fos = FileOutputStream(file)
        try {
            fos.use {
                bm.compress(Bitmap.CompressFormat.JPEG, 100, it)
            }
            e.onSuccess(file)
        } catch (t: Throwable) {
            e.onError(t)
        }
    }
}
private fun decodeBitmap(uri: Uri): Single<Bitmap> {
    return Single.create<Bitmap> { e ->
        val fd = contentResolver.openFileDescriptor(uri, "r")?.fileDescriptor ?: kotlin.run {
            e.onError(RuntimeException("Failed to open fd"))
            return@create
        }
        val bm = BitmapFactory.decodeFileDescriptor(fd) ?: kotlin.run {
            e.onError(RuntimeException("Failed to decode bitmap"))
            return@create
        }
        e.onSuccess(bm)
    }
}

善用「起死回生」

好吧这里并不是真的什么「起死回生」。

有几个方法你可能会忽略,拿 Single 为例子:

More of RxJava

这几个方法分别可以在遇到错误的时候(前提是 onError 能执行),接上一个 Single 流,或者直接发射一个 item。如果你的 onError 和 onSuccess 里的处理很类似的话,那么这些方法将可能帮助你简化 Observer 里处理的代码。

同样地,Observable 和其他也有类似的代码,对于 Observable 来说,可以忽略其中一些 item 的错误,然后转化为正确的或者经过特殊装饰的 item 发射出去。

当然,onErrorxxx 里面也是可以抛异常的,如果在里面抛异常了,那么依然能在 onError 里处理这个异常——略有不同的是,这个异常是 CompositeException ,你需要 getCause() 才能获得真正的异常。

原文  http://juniperphoton.dev/more-of-rxjava/
正文到此结束
Loading...