RxJava
可能是我使用的最重要的库,Rx 通常是编写代码的另一种范式, Kotlin
作为一种新的编程语言,使它可以轻松实现将协程驱动的 flow 实现为自己的 Rx 实现。 我可能在 Hello Kotlin Coroutines 中介绍了协程,这对于理解 flow 很有必要
Kotlin
具有一组扩展,以方便使用集合。 但它不是响应式的
listOf("Madara", "Kakashi", "Naruto", "Jiraya", "Itachi") .map { it.length } .filter { it > 4 } .forEach { println(it) } 复制代码
在此示例中,如果您深入研究 map 函数源代码,您将发现这里没有魔法,它只是列表的循环,进行了一些转换然后为您提供了一个新列表。 过滤器也一样。 这种机制称为 eager evaluation
,该函数在整个列表中进行操作并提供一个新列表。 但是如果我们不需要创建这些临时列表以节省一些内存,那我们可以使用 Sequences
listOf("Madara", "Kakashi", "Naruto", "Jiraya", "Itachi") // 使用 Sequence .asSequence() .map { it.length } .filter { it > 4 } .forEach { println(it) } 复制代码
这里的区别就是先调用 asSequence
方法,然后使用我们的操作,再次 查看 map 方法后,我们发现了一些不同之处,它只是 sequence 的修饰符,返回值类型也是 sequence
。 使用 sequence map
时,只能一项一项地进行操作。列表较大时, sequence
比普通集合要好得多。 sequence
可以同步完成其工作,有没有办法异步使用那些转换运算符呢?答案是 flow
如果我们尝试获取列表并将其用作 flow ,并在流的末尾调用 collect {..}
,则会收到编译错误。 由于 flow 是基于协程构建的,因此默认情况下它具有异步功能,因此您可以在代码中使用协程时使用它
collect {…}
运算符,您可以将其想像为 Rxjava
中的 subscribe
流也是 cold stream
,这意味着,直到您调用操作符(如 collect)后,flow 才会被执行。 如果您重复调用 collect ,每次您将获得相同的结果
因此,Collections 扩展功能仅适用于小数据, sequence
可以节省您不必要的工作(不创建临时列表),而使用 flow,您可以用协程的强大功能来编写代码。 因此,让我们学习如何构建它
我们看到 asFlow
方法,它是 Collections 上的扩展函数,可将其转换为 flow,我们查看一下源码
public fun <T> Iterable<T>.asFlow(): Flow<T> = flow { forEach { value -> emit(value) } } 复制代码
如果我们要编写前面的示例在数据源中添加一些逻辑,则只需使用 flow{…}
或者 flowof()
flow 拥有一些列的用于转换的运算符,例如 map
, filter
, groupBy
, scan
等等
在由 Coroutines 提供支持的 flow
中,您可以自然地在您的操作符中使用异步代码,假设我们想要做一些耗时的操作,这里使用延迟一秒钟表示。 使用 RxJava
时,您可以使用 flatmap
这里想表达的是 flow 具有更简单的设计,并且与以其陡峭的学习曲线而闻名的 RxJava
相比易于学习,我在此使用 flow 将它简化一下
我已经提到 collect()
是 terminal operator,当您在调用它时得到结果,在 RxJava
中,您可以通过调用 subscribe()
来启动它,或者使用阻塞的方式,调用 blockingGet
flow 中的 terminal operator 是需要作用域操作的挂起函数,其他的 operator 例如
toList(),toSet -> 返回集合中的所有 item
first() -> 仅返回第一个发射
reduce(),fold() -> 使用特定操作获取结果
为了发射数据,您需要使用一个挂起函数
//fire a coroutine someScope.launch { //fire flow with a terminal operator flowSampleData().collect { } } 复制代码
上面的花括号让人想起了回调,您可以使用 launchIn
函数,处理结果可以使用 onEach{...}
flowSampleData() .onEach { //handle emissions } .launchIn(someScope) 复制代码
每次设置 RxJava
订阅时,我们都必须取消这些订阅以避免内存泄漏或过期的任务在后台运行, RxJava
提供对订阅的引用( disposable
)来取消订阅, disposable().dispose()
。如果您在 CompositeDisposable
使用了多个对象,则调用 clear()
或 dispose()
对于 flow 使用特定 scope 的协程则可以无需进行额外的工作来达到此目的
RxJava
最有用的功能之一就是处理错误的方式,您可以使用此 onError()
函数捕获工作流中的任何错误。 flow 有一个类似的称为 catch {…}
,如果不使用 catch {…}
,则您的代码可能会引发异常或应用崩溃。 您就可以选择使用常规 try catch 或使用 atch {…} 以声明方式进行编码
让我们模拟一个错误
private fun flowOfAnimeCharacters() = flow { emit("Madara") emit("Kakashi") // 抛出异常 throw IllegalStateException() emit("Jiraya") emit("Itachi") emit("Naruto") } 复制代码
使用
runBlocking { flowOfAnimeCharacters() .map { stringToLength(it) } .filter { it > 4 } .collect { println(it) } } 复制代码
如果我们运行此代码,它将引发异常,并且如我们所说,您有两个选项可以处理错误,即常规 try-catch
和 catch {…}
。 这是两种情况下的修改代码
// 使用 try-catch runBlocking { try { flowOfAnimeCharacters() .map { stringToLength(it) } .filter { it > 4 } .collect { println(it) } } catch (e: Exception) { println(e.stackTrace) } finally { println("Beat it") } } 复制代码
// 使用 catch{} runBlocking { flowOfAnimeCharacters() .map { stringToLength(it) } .filter { it > 4 } // catch .catch { println(it) } .collect { println(it) } } 复制代码
使用 catch{}
需要注意的是 catch{}
操作符的放置顺序,它要放置在 terminal operator 之前,这样您才可以捕获想要的异常
如果错误中断了流,并且我们打算使用完整备份或默认数据恢复流,在 Rxjava
中使用 onErrorResumeNext()
或 onErrorReturn()
,在 flow 中,我们还是使用 catch {…}
,但我们在其中调用了 emit()
来逐个生成备份,甚至我们可以使用 emitAll()
引入一个全新的 flow,例如如果中途出现了异常,我们需要“ Minato” 和 “ Hashirama”
runBlocking { flowOfAnimeCharacters() .catch { emitAll(flowOf("Minato", "Hashirama")) } .collect { println(it) } } 复制代码
那么得到的结果是
Madara Kakashi Minato Hashirama 复制代码
默认情况下,flow 数据源将在调用者上下文中运行,如果要更改它,例如,要使 flow 在 IO 而不是 Main 上运行,则使用 flowOn()
,并更改上游的上下文,上游是调用 flowOn
之前的全部操作符。 这是一个很好的文档示例
这里的 flowOn()
充当 RxJava
中的两个角色 [subscribeOn() — observeOn()]
,您可以编写流然后确定将在哪个上下文中进行操作
当 flow 完成发射时,您可能需要执行一些操作, onCompletion {…}
可以解决这一问题,并且它确定 flow 是正常完成还是异常完成
已知数据源如下
private fun flowOfAnimeCharacters() = flow { emit("Madara") emit("Kakashi") throw IllegalStateException() emit("Jiraya") emit("Itachi") emit("Naruto") } 复制代码
catch {…}
的工作就是捕获 IllegalStateException()
并重新开始新流程,这使我们从源头上留下“ Madara”,“ Kakashi”,在后面留下“ Minato”,“ Hashirama”。 但是 onCompletion {…}
会显示错误吗?
答案是否定的,catch 捕获了所有错误,接下来是全新的事情,请记住 onCompletion {…}
和 catch {…}
只是中介程序运算符。 它们的顺序很重要
您可以使用 Flow builders 构建 flow,其中最基本的是 flow{…}
。 如果要开始该 flow,请调用诸如 collect {…}
之类的 terminal operator ,并且由于 terminal operator 是挂起函数,因此需要使用协程构建器 launch {…}
的作用域,或者如果您想要以优雅的风格进行操作, 您可以结合使用 launchIn()
和 onEach {…}
。 使用 catch {…}
捕获上游错误,并根据需要提供回退流程。 onCompletion {..}
将在上游完成所有发射之后或发生错误时触发。 默认情况下,所有这些方法都适用于调用程序协程上下文,如果要更改上游上下文,请使用 flowOn()