转载

RxJava学习记录

RxJava是基于JVM实现的Reactive扩展. 它是一种使用Reactor模式实现的, 异步的, 事件驱动的编程方式. 它支持数据/事件序列, 可以使用操作符将序列结合在一起, 而不用关心底层的细节, 比如线程安全, 同步等问题.

Reactor模式

Reactor模式是事件驱动的, 有一个或多个并发的事件输入源, 有一个Service Handler集中收集输入事件, 并分发到相应的Request Handler中进行处理.

Reactor模式的维基百科

+-------+ +-----------------+
|Input|-----+ +---->|Request Handler|
+-------+ | |+-----------------+
 | |
+-------+ |+-----------------+|+-----------------+
|Input|-----+-->|Service Handler|---+---->|Request Handler|
+-------+ |+-----------------+|+-----------------+
 | |
+-------+ | |+-----------------+
|Input|-----+ +---->|Request Handler|
+-------+ +-----------------+

角色

RxJava主要有2个角色: Observable和Observer, 他们之间通过订阅建立联系, Observable发射出数据, 期间通过 操作符 进行处理, 最后被 Observer 得到

  • Observable: 产生数据
  • Observer: 消费数据
  • 操作符: 转换数据
+------------+
| Observable |
+------------+
|
subscribe
|
+------------+
| Observer |
+------------+

RxJava的执行过程

  • 创建一个Observable
  • 使用各种操作符进行变换
  • 创建一个Observer订阅Observable

当Observable发射数据时, Observer进行相应的动作

伪代码如下

Observable
 .operator()
 .operator()
 .operator()
 .operator()
 .subscribe()

Observable和Observer

Observable

Observable是发射数据的来源, 并实现了以下3种方法:

  • onNext()
    Observable调用该方法来发射一条数据
  • onError()
    当遇到错误时, Observable会调用该方法. 调用该方法后便不会再调用onNext和onComplete
  • onComplete()
    当Observable最后一次调用onNext完毕后, 会调用该方法.

Observable的 onNext 可以被调用0次或多次

最后, 会调用 onComplete 或者 onError , 这两者的调用标志着序列结束.

onNext 通常被叫做 发射 , onCompleteonError 通常被叫做 通知

冷Observable, 热Observable, 可连接的Observable

  • 热Observable
    在创建时就开始发射数据, 如果某个observer在发射中途订阅了这个Observable, 那么他接收到的数据也从中途开始
  • 冷Observable
    会等待Observer来订阅它, 然后才开始发射数据
  • 可连接的Observable
    当调用 connect 方法后, 才开始发射数据, 无论是否有Observer对他进行订阅

Observer

Subscriber是Observer的实现. Subscriber会订阅Observable发出的事件, 它有两种基本操作:

  • subscribe
  • unsubscribe

Observable约定

官网

通知

Observable通过以下 通知 与订阅它的Observer进行通信:

  • onNext
    将Observable发射的数据传递给Observer
  • onComplete
    表示Observable成功完成了所有数据的发射, 并将不再发射任何数据
  • onError
    表示Observable由于某种错误被终止, 并将不在发射任何数据
  • onSubscribe (可选)
    表示Observable已经准备好接收Observer的请求通知了

Observer通过以下通知来与Observable进行通信:

  • subscribe
    表示Observer已经准备好从Observable接收通知了
  • unsubscribe
    表示Observer不再希望接收Observable的通知
  • request (可选)
    表示Observer希望不再从Observable接收多余某种特定数量的onNext通知

通知的约束

一个Observable可以创建0个或多个onNext通知, 每个通知代表一个单独发射的数据, 最后跟随一个onCompleted或onError通知, 两者之一. 当发出onCompleted或onError通知后, 它将不再发射任何其他通知.

一个Observable可以不发射任何数据. 一个Observable也可以永远不通过onCompleted和onError来终止. 也就是说, Observable可以不发出任何通知, 或只发出onCompleted或onError通知, 或执法处onNext通知.

Observable必须按顺序向Observer发出通知, 而不能并行发出通知. Observable可以在不同的线程中发出通知, 但通知之间必须存在 happens-before 的关系.

Observable的终止

如果一个Observable没有发出onCompleted或onError通知, 那么Observer会认为该Observable仍然是活动的(即使已经不再发射任何数据), 并且该Observer可能会向其发出通知(比如unsubscribe或是request通知). 当一个Observable发出onCompleted或onError通知时, 该Observable可能会释放他的资源, 并终止, 他的Observer则不应该再与其进行通信.

onError通知必须包含错误的原因 (也就是说, 调用带有null值的onError是无效的)

在Observable终止之前, 它必须向订阅它的Observer发出onCompleted或onError之一

subscribe和unsubscribe

Observable在接收到Observer发出的subscribe通知后, 会开始发出自己的通知

当一个Observer向Observable发出unsubscribe通知时, Observable会尝试停止向该Observer发出通知. 但并不保证这种情况.

当Observable向Observer发出onError或onCompleted通知时, 会终止订阅关系. Observer不再需要想Observable发出unsubscribe通知

多个Observer

如果有第二个Observer订阅了Observable, 而这个Observable此时已经向第一个Observer发射了一些数据, 那么该Observable是否会向第二个Observer继续发射数据, 或者是否将完整的数据序列重新发射给第二个Observer, 或者是否会向第二个Observer发射完全不同的数据, 以上这些都取决于该Observable的设置. 并不会保证订阅同一个Observable的两个Observer会接收到相同的数据序列.

背压(backpressure)

背压是可选的; 并不是所有的RX语言都实现了背压, 并且在实现了背压的RX语言中, 也并不是所有的Observable或操作符会推荐背压. (@todo 不再翻译了)

操作符

大多数的操作符都会返回Observable对象, 所以可以利用这一点进行操作符的链式调用, 完成一系列的操作

创建操作符

用于创建Observable

  • create
    手动创建一个Observable, 手动调用observer的方法
  • defer
    当Observer进行subscribe时, 才创建Observable, 并且是为每个observer都创建一个新的Observable
  • empty/never/throw
    创建有限行为的Observable
  • from
    将其他对象或数据结构转换为Observable
  • interval
    创建一个根据指定时间间隔, 发射整形数字队列的Observable
  • just
    将一个对象或一个对象集合转换为一个Observable, 并将它们发射
  • range
    创建一个发射某个范围内整形数字的Observable
  • repeat
    创建一个重复发射某一个或某个序列数据的Observable
  • start
    创建一个发射某个方法返回值的Observable
  • timer
    创建一个每隔一定时间发射一个数据的Observable

变换操作符

用于将Observable发射的数据进行变换

  • buffer
    间接性收集Observable发射的数据, 将这些数据放入bundle中, 并发射这个bundle
  • flatMap
    将Observable发射的多个数据变换为多个Observable, 然后将他们扁平化, 并放入一个Observable中
  • groupBy
    将一个Observable拆分为多个Observable的集合, 每次发射其中一组Observable, 通过key来结组
  • map
    通过某个函数将一个Observable发射的数据进行变换
  • scan
    对一个Observable发射的每个数据都按顺序应用某个方法, 并将返回值发射
  • window
    间接地从一个Observable中拆分数据, 放入window中, 并从window中一次发射一个数据

过滤操作符

用于从Observable中有选择地发射数据

  • debounce
    只发射Observable中指定间隔之后的数据
  • distinct
    忽略Observable中重复的数据
  • elementAt
    只发射Observable中指定位置的数据
  • filter
    只发射Observable中满足条件的数据
  • first
    只发射Observable中第一个数据, 或第一个满足条件的数据
  • ignoreElements
    不发射数据, 而只通知结束(onError或onComplete)
  • last
    只发射Observable中最后一个数据
  • sample
    定期采样Observable的数据, 并发送距离上次采样时间最近发射的那个数据
  • skip
    忽略Observable的前n个数据
  • skipLast
    忽略Observable的后n个数据
  • take
    发射Observable的前n个数据
  • takeLast
    发射Observable的后n个数据

组合操作符

用于将多个Observable组合为一个单独的Observable

  • and/then/when
    利用Pattern和Plan作为中介, 将多个Observable发射的数据合并到一个Observable
  • combineLatest
    将两个Observable最新发射的数据结合, 通过某个方法进行运算, 并将该方法的结果发射出去
  • join
    将两个Observable发射的, 在同一时间窗口内的数据结合起来
  • merge
    将多个Observable的数据合并为一个Observable
  • startWith
    在发射Observable的数据之前, 先发射指定序列的数据
  • switch
    将发射Observable的多个Observable转换为一个单独的Observable
  • zip
    将多个Observable的发射数据通过一个指定的方法结合在一起, 并将通过该方法结合后的数据发射出去

错误处理操作符

从错误通知中恢复

  • catch
    从onError中恢复, 并继续执行序列
  • retry
    如果Observable发送了一个onError通知, 则重新subscribe这个Observable

Observable工具操作符

工具

  • delay
    延时后发射
  • do
    注册一个action来处理Observable的生命周期事件
  • meterialize/dematerialize
    将onNext, onError, onComplete转换为数据序列, 由Observable发出; 或者反之
  • observeOn
    指定observer观察Observable所在的线程
  • Serialize
    强制序列同步执行
  • subscribe
    对Observable进行发射和通知操作
  • subscribeOn
    指定当Observable被订阅时, 所应使用的线程
  • timeInterval
    将打算发射的数据转换为发射数据的时间
  • timeout
    如果某段时间内没有发射任何数据, 则发出onError通知
  • timestamp
    为Observable发射的每个数据都附加一个时间戳
  • using
    创建一个一次性的资源, 该资源和Observable具有相同的生命周期

条件和布尔操作符

对Observable和数据进行判断

  • all
    判断是否所有发射的数据都符合某个约束
  • amb
    将多个Observable的数据由第一个Observable来发射
  • contains
    判断某个Observable是否发射了指定的数据
  • defaultIfEmpty
    发射Observable的数据, 如果Observable没有数据可以发射时, 发射一个默认数据
  • sequenceEqual
    判断两个Observable发射的数据序列是否相同
  • skipUntil
    放弃一个Observable所发射的数据, 直到另一个Observable开始发射数据
  • skipWhile
    放弃一个Observable所发射的数据, 直到某个指定条件变为false
  • takeUntil
    在一个Observable开始发射或结束发射数据后, 放弃另一个Observable所发射的数据
  • takeWhile
    当某个指定条件变为false时, 放弃一个Observable所发射的数据

数学和汇总操作符

操作数据序列

  • average
    计算一个Observable发射数据的数量的平均值, 并将该平均值发射
  • concat
    拼接多个Observable所发射的数据, 并将所有数据发射
  • count
    计算Observable所发射数据的数量, 并将该数量发射
  • max
    计算并发射Observable所发射的最大的数据
  • min
    计算并发射Observable所发射的最小的数据
  • Reduce
    对Observable所发射的数据按顺序应用一个方法, 并将方法值发射
  • sum
    计算Observable所发射的数据数量总和, 并将该总和值发送

背压操作符

@todo

连接操作符

特殊的Observable, 拥有更多特性

  • connect
    指示一个可连接的Observable开始发射数据到它的subscriber
  • publish
    将原始Observable转换为一个可连接的Observable
  • refCount
    使一个可连接的Observable表现的和原始Observable一样
  • replay
    确保所有Observable发射数据序列的顺序是相同的, 即使当subscribe时Observable已经开始发射数据

转换操作符

  • to
    将一个Observable转换为另一个对象或数据结构

异步操作符

用于将同步方法转换为Observable

  • start()
  • toAsync()
    将一个 方法 转换为 Observable 来执行方法并发射返回值
  • asyncAction()
  • asyncFunc()
  • startFuture()
    将一个 返回值为Future的方法 转换为一个 Observable , 并发射Future的返回值
  • deferFuture()
    将一个 返回值为Observable的Future 转换为一个 Observable , 当有Subscriber订阅时, 才返回Future的Observable返回值(有点绕)
  • forEachFuture()
    将Subscriber方法转入一个Observable中, 直到complete时才执行
  • fromAction()
    将一个Action转换为一个Observable, 当Subscriber订阅时, 执行该动作并发射结果
  • fromCallable()
    将一个Callable转换为一个 Observable , 当Subscriber订阅时, 执行callable并发射其结果或异常
  • fromRunnable()
    将一个 Runnable 转换为一个 Observable , 当Subscriber订阅时, 执行runnable并发射其结果
  • runAsync()
    返回一个 StoppableObservable , 它可以发射某个Action或Scheduler指定的多个action

操作符的选择树

以下可以帮助你选择合适的操作符:

我想创建一个新的Observable
|_ 它只发射一个数据: `just`
| |_ 该数据是在subscribe时, 由一个方法返回的: `start`
| |_ 该数据是在subscribe时, 由Action, Callable, Runnable或类似的返回的: `from`
| |_ 该数据在某段时间后发射: `timer`
|_ 它是从某个Array, Iterrable或类似的发射数据的: `from`
|_ 它是从一个Future获取到的: `start`
|_ 它是从一个Future获取的数据序列: `from`
|_ 它会重复发射数据序列: `repeat`
|_ 它是从某个自定义逻辑创建的: `create`
|_ 它为每个subscribe的Observer都创建一个新的Observable: `defer`
|_ 它发射一个整形数字序列: `range`
| |_ 该序列会根据某种时间间隔发射: `interval`
| |_ 并且会在某段延时后才开始发射: `timer`
|_ 它不发射任何数据就会结束: `empty`
|_ 它什么都不做: `never`
我想通过组合多个Observable来创建一个新的Observable
 |_ 它会发射所有的Observable中的数据, 顺序按照数据默认的顺序: `merge`
 |_ 它会发射所有的Observable中的数据, 一次只发射一个Observable的数据: `concat`
 |_ 它会将多个Observable的数据按顺序组合, 生成一个新的数据来发射
 | |_ 并且将每个Observable发射的数据通过某个方法结合, 发射该方法处理后的数据: `zip`
 | |_ 并且将每个Observable最新发射的数据结合为一个数据进行发射: `combinLatest`
 | |_ 并且将同一window内的数据结合为一个数据进行发射
 | |_ 通过Pattern和Plan中介来发射: `and/then/when`
 |_ 它是从这些Observable最近发射的数据中发射数据: `switch`
我想将Observable的数据进行变换后再发射
|_ 变换的方式是通过某个方法一次发射一个数据: `map`
|_ 变换的方式是发射多个Observable中的所有数据: `flatMap`
| |_ 按照发射时间顺序, 一次发射一个Observable: `concatMap`
|_ based on all of the items that preceded them: `scan`
|_ 变换的方式是为每个数据附加一个时间戳: `timestamp`
|_ 变换的方式是将数据转变为距离上次发射的时间间隔: `timeInterval`
我想延长数据发射时间: `delay`
我想将数据和通知都转换为该Observable的数据, 并重新进行发射
|_ 通过将他们封装在Notification对象中: `materialize`
|_ 并且我还可以再次解除封装: `dematerialize`
我想忽略Observable的所有数据, 只发射complete和error通知: `ignoreElements`
我想复制一个Observable, 并在它的数据序列前添加其他的数据序列: `startWith`
|_ 并且仅在该Observable数据序列为空的情况下才添加其他数据序列: `defaultEmpty`
我想从一个Observable中收集数据, 并通过一个数据的缓冲重新发射: `buffer`
|_ 该缓冲只包含最后一个数据: `takeLastBuffer`
我想将一个Observable拆分为多个Observable: `window`
|_ 并且相似的数据可以在同一个Observable中: `groupBy`
我想从一个Observable发射的数据中获取某个特定的数据
|_ 要获取的是在complete前发射的最后一个数据: `last`
|_ 要获取的是一个单独的数据: `single`
|_ 要获取的是第一个发射的数据: `first`
我想重新发射一个Observable中的某些数据
|_ 只要满足过滤条件的数据: `filter`
|_ 只要第一个数据: `first`
|_ 只要前n个数据: `take`
|_ 只要最后一个数据: `last`
|_ 只要第n个数据: `elementAt`
 |_ 只要前几个数据之后的数据
| |_ 即, 跳过前n个数据: `skip`
| |_ 即, 直到某个数据满足某种特定条件之后所发射的数据: `skipWhile`
| |_ 即, 在开始发射某段时间后的数据: `skip`
| |_ 即, 在另一个Observable开始发射数据之后, 原Observable所发射的数据: `skipUtil`
 |_ 只要除最后几个数据之外的数据
| |_ 即, 除最后n个数据之外的数据: `skipLast`
| |_ 即, 在某个数据满足某种特定条件之前所发射的数据: `takeWhile`
| |_ 即, 在complete某段时间之前所发射的数据: `skipLast`
| |_ 即, 在另一个Observable开始发射数据之前, 原Observable所发射的数据: `takeUtil`
|_ 只要间歇采样的数据: `sample`
|_ 只要某段时间内不会再有数据发射的数据: `debounce`
|_ 只要与已发射数据不重复的数据: `distinct`
| |_ if they immediately follow the item they are duplicates of: `distinctUntilChanged`
|_ 只要在Observable开始发射数据后, 我的subscriber进行subscribe以后的数据: `delaySubscription`
如果某个Observable是一个Observable集合的第一个, 则重新发射他的数据: `amb`
我想对Observable发射的数据序列进行判断
|_ 判断是否所有数据都满足条件, 然后发射一个单独的boolean值: `all`
|_ 判断是否其中某个数据满足条件, 然后发射一个单独的boolena值: `contains`
|_ 判断是否Observable没有发射任何数据, 然后发射一个单独的boolean值: `isEmpty`
|_ 判断是否该Observable的数据序列和另一个Observable的数据序列一样, 然后发射一个单独的boolean值: `sequenceEqual`
|_ 发射所有数据的平均值: `average`
|_ 发射所有数据的总和: `sum`
|_ 发射数据的个数: `count`
|_ 发射数据序列中的最大值: `max`
|_ 发射数据序列中的最小值: `min`
|_ 通过对每个数据应用一个方法, 并发射该方法的结果: `scan`
我想将Observable发射的整个数据序列转换为另一种数据结构: `to`
我想控制操作符进行操作所在的线程: `subscribeOn`
|_ 想控制通知Observer的线程: `observeOn`
我想创建一个在某种事件发生后, 可以激活某个特定的action的Observable: `do`
我想创建一个可以通知Observer发生错误的Observable: `throw`
|_ 如果在某段时间内没有发射任何数据, 则通知错误: `timeout`
我想创建一个可以从错误中恢复的Observable
|_ 它可以通过转换到一个备份Observable来从超时中恢复: `timeout`
|_ 它可以从上游错误通知中恢复: `catch`
|_ 通过尝试重新subscribe上游的Observable: `retry`
我想创建一个与Observable有相同生命周期的对象: `using`
我想subscribe一个Observable, 并一直阻塞, 直到该Observable完成时, 接收一个Future: `start`
我想创建一个Observable, 在subscribe时并不发射数据, 直到我要求它才发射: `publish`
|_ 它只发射最后一个数据: `publishLast`
|_ 它发射全部数据序列, 无论在序列发射后是否有其他进行subscribe: `replay`
|_ 当所有subscriber都取消subscribe时, 我要放弃发射: `refCount`
|_ 我要要求它开始发射: `connect`

Scheduler线程切换

如果你想在操作符中引入多线程, 你可以使用 Schedulers

默认情况下, Observable和操作链会在调用 subscribe 方法的线程中进行操作和发出通知. subscribeOn 操作符可以指定操作Observable的具体线程. observeOn 操作符可以指定Observable向Observer发出通知的线程

  • observeOn
    • 指定Observable向Observer发出通知的线程
    • 可以在操作链多次调用
    • 作用范围: 从本次observeOn调用开始, 到下次observeOn操作符结束.
  • subscribeOn
    • 指定操作Observable的具体线程
    • 可以在操作链中多次调用
    • 作用范围: 从 创建操作符 或 doOnSubscribe操作符 开始, 到下次subscribeOn操作符结束

注意: subscribe中的通知回调方法是 observeOn 指定的线程, 而不是subscribeOn指定的线程

原文  http://blog.lixplor.com/2016/10/16/rxjava/
正文到此结束
Loading...