RxJava是基于JVM实现的Reactive扩展. 它是一种使用Reactor模式实现的, 异步的, 事件驱动的编程方式. 它支持数据/事件序列, 可以使用操作符将序列结合在一起, 而不用关心底层的细节, 比如线程安全, 同步等问题.
Reactor模式是事件驱动的, 有一个或多个并发的事件输入源, 有一个Service Handler集中收集输入事件, 并分发到相应的Request Handler中进行处理.
Reactor模式的维基百科
+-------+ +-----------------+ |Input|-----+ +---->|Request Handler| +-------+ | |+-----------------+ | | +-------+ |+-----------------+|+-----------------+ |Input|-----+-->|Service Handler|---+---->|Request Handler| +-------+ |+-----------------+|+-----------------+ | | +-------+ | |+-----------------+ |Input|-----+ +---->|Request Handler| +-------+ +-----------------+
RxJava主要有2个角色: Observable和Observer, 他们之间通过订阅建立联系, Observable发射出数据, 期间通过 操作符
进行处理, 最后被 Observer
得到
+------------+ | Observable | +------------+ | subscribe | +------------+ | Observer | +------------+
当Observable发射数据时, Observer进行相应的动作
伪代码如下
Observable .operator() .operator() .operator() .operator() .subscribe()
Observable是发射数据的来源, 并实现了以下3种方法:
onNext()
onError()
onComplete()
Observable的 onNext
可以被调用0次或多次
最后, 会调用 onComplete
或者 onError
, 这两者的调用标志着序列结束.
onNext
通常被叫做 发射
, onComplete
和 onError
通常被叫做 通知
热Observable
冷Observable
可连接的Observable
connect
方法后, 才开始发射数据, 无论是否有Observer对他进行订阅 Subscriber是Observer的实现. Subscriber会订阅Observable发出的事件, 它有两种基本操作:
subscribe
unsubscribe
官网
Observable通过以下 通知 与订阅它的Observer进行通信:
onNext
onComplete
onError
onSubscribe
(可选) Observer通过以下通知来与Observable进行通信:
subscribe
unsubscribe
request
(可选) 一个Observable可以创建0个或多个onNext通知, 每个通知代表一个单独发射的数据, 最后跟随一个onCompleted或onError通知, 两者之一. 当发出onCompleted或onError通知后, 它将不再发射任何其他通知.
一个Observable可以不发射任何数据. 一个Observable也可以永远不通过onCompleted和onError来终止. 也就是说, Observable可以不发出任何通知, 或只发出onCompleted或onError通知, 或执法处onNext通知.
Observable必须按顺序向Observer发出通知, 而不能并行发出通知. Observable可以在不同的线程中发出通知, 但通知之间必须存在 happens-before 的关系.
如果一个Observable没有发出onCompleted或onError通知, 那么Observer会认为该Observable仍然是活动的(即使已经不再发射任何数据), 并且该Observer可能会向其发出通知(比如unsubscribe或是request通知). 当一个Observable发出onCompleted或onError通知时, 该Observable可能会释放他的资源, 并终止, 他的Observer则不应该再与其进行通信.
onError通知必须包含错误的原因 (也就是说, 调用带有null值的onError是无效的)
在Observable终止之前, 它必须向订阅它的Observer发出onCompleted或onError之一
Observable在接收到Observer发出的subscribe通知后, 会开始发出自己的通知
当一个Observer向Observable发出unsubscribe通知时, Observable会尝试停止向该Observer发出通知. 但并不保证这种情况.
当Observable向Observer发出onError或onCompleted通知时, 会终止订阅关系. Observer不再需要想Observable发出unsubscribe通知
如果有第二个Observer订阅了Observable, 而这个Observable此时已经向第一个Observer发射了一些数据, 那么该Observable是否会向第二个Observer继续发射数据, 或者是否将完整的数据序列重新发射给第二个Observer, 或者是否会向第二个Observer发射完全不同的数据, 以上这些都取决于该Observable的设置. 并不会保证订阅同一个Observable的两个Observer会接收到相同的数据序列.
背压是可选的; 并不是所有的RX语言都实现了背压, 并且在实现了背压的RX语言中, 也并不是所有的Observable或操作符会推荐背压. (@todo 不再翻译了)
大多数的操作符都会返回Observable对象, 所以可以利用这一点进行操作符的链式调用, 完成一系列的操作
用于创建Observable
create
defer
empty/never/throw
from
interval
just
range
repeat
start
timer
用于将Observable发射的数据进行变换
buffer
flatMap
groupBy
map
scan
window
用于从Observable中有选择地发射数据
debounce
distinct
elementAt
filter
first
ignoreElements
last
sample
skip
skipLast
take
takeLast
用于将多个Observable组合为一个单独的Observable
and/then/when
combineLatest
join
merge
startWith
switch
zip
从错误通知中恢复
catch
retry
工具
delay
do
meterialize/dematerialize
observeOn
Serialize
subscribe
subscribeOn
timeInterval
timeout
timestamp
using
对Observable和数据进行判断
all
amb
contains
defaultIfEmpty
sequenceEqual
skipUntil
skipWhile
takeUntil
takeWhile
操作数据序列
average
concat
count
max
min
Reduce
sum
@todo
特殊的Observable, 拥有更多特性
connect
publish
refCount
replay
to
用于将同步方法转换为Observable
start()
toAsync()
方法
转换为 Observable
来执行方法并发射返回值 asyncAction()
asyncFunc()
startFuture()
返回值为Future的方法
转换为一个 Observable
, 并发射Future的返回值 deferFuture()
返回值为Observable的Future
转换为一个 Observable
, 当有Subscriber订阅时, 才返回Future的Observable返回值(有点绕) forEachFuture()
fromAction()
fromCallable()
Observable
, 当Subscriber订阅时, 执行callable并发射其结果或异常 fromRunnable()
Runnable
转换为一个 Observable
, 当Subscriber订阅时, 执行runnable并发射其结果 runAsync()
StoppableObservable
, 它可以发射某个Action或Scheduler指定的多个action 以下可以帮助你选择合适的操作符:
我想创建一个新的Observable |_ 它只发射一个数据: `just` | |_ 该数据是在subscribe时, 由一个方法返回的: `start` | |_ 该数据是在subscribe时, 由Action, Callable, Runnable或类似的返回的: `from` | |_ 该数据在某段时间后发射: `timer` |_ 它是从某个Array, Iterrable或类似的发射数据的: `from` |_ 它是从一个Future获取到的: `start` |_ 它是从一个Future获取的数据序列: `from` |_ 它会重复发射数据序列: `repeat` |_ 它是从某个自定义逻辑创建的: `create` |_ 它为每个subscribe的Observer都创建一个新的Observable: `defer` |_ 它发射一个整形数字序列: `range` | |_ 该序列会根据某种时间间隔发射: `interval` | |_ 并且会在某段延时后才开始发射: `timer` |_ 它不发射任何数据就会结束: `empty` |_ 它什么都不做: `never`
我想通过组合多个Observable来创建一个新的Observable |_ 它会发射所有的Observable中的数据, 顺序按照数据默认的顺序: `merge` |_ 它会发射所有的Observable中的数据, 一次只发射一个Observable的数据: `concat` |_ 它会将多个Observable的数据按顺序组合, 生成一个新的数据来发射 | |_ 并且将每个Observable发射的数据通过某个方法结合, 发射该方法处理后的数据: `zip` | |_ 并且将每个Observable最新发射的数据结合为一个数据进行发射: `combinLatest` | |_ 并且将同一window内的数据结合为一个数据进行发射 | |_ 通过Pattern和Plan中介来发射: `and/then/when` |_ 它是从这些Observable最近发射的数据中发射数据: `switch`
我想将Observable的数据进行变换后再发射 |_ 变换的方式是通过某个方法一次发射一个数据: `map` |_ 变换的方式是发射多个Observable中的所有数据: `flatMap` | |_ 按照发射时间顺序, 一次发射一个Observable: `concatMap` |_ based on all of the items that preceded them: `scan` |_ 变换的方式是为每个数据附加一个时间戳: `timestamp` |_ 变换的方式是将数据转变为距离上次发射的时间间隔: `timeInterval`
我想延长数据发射时间: `delay`
我想将数据和通知都转换为该Observable的数据, 并重新进行发射 |_ 通过将他们封装在Notification对象中: `materialize` |_ 并且我还可以再次解除封装: `dematerialize`
我想忽略Observable的所有数据, 只发射complete和error通知: `ignoreElements`
我想复制一个Observable, 并在它的数据序列前添加其他的数据序列: `startWith` |_ 并且仅在该Observable数据序列为空的情况下才添加其他数据序列: `defaultEmpty`
我想从一个Observable中收集数据, 并通过一个数据的缓冲重新发射: `buffer` |_ 该缓冲只包含最后一个数据: `takeLastBuffer`
我想将一个Observable拆分为多个Observable: `window` |_ 并且相似的数据可以在同一个Observable中: `groupBy`
我想从一个Observable发射的数据中获取某个特定的数据 |_ 要获取的是在complete前发射的最后一个数据: `last` |_ 要获取的是一个单独的数据: `single` |_ 要获取的是第一个发射的数据: `first`
我想重新发射一个Observable中的某些数据 |_ 只要满足过滤条件的数据: `filter` |_ 只要第一个数据: `first` |_ 只要前n个数据: `take` |_ 只要最后一个数据: `last` |_ 只要第n个数据: `elementAt` |_ 只要前几个数据之后的数据 | |_ 即, 跳过前n个数据: `skip` | |_ 即, 直到某个数据满足某种特定条件之后所发射的数据: `skipWhile` | |_ 即, 在开始发射某段时间后的数据: `skip` | |_ 即, 在另一个Observable开始发射数据之后, 原Observable所发射的数据: `skipUtil` |_ 只要除最后几个数据之外的数据 | |_ 即, 除最后n个数据之外的数据: `skipLast` | |_ 即, 在某个数据满足某种特定条件之前所发射的数据: `takeWhile` | |_ 即, 在complete某段时间之前所发射的数据: `skipLast` | |_ 即, 在另一个Observable开始发射数据之前, 原Observable所发射的数据: `takeUtil` |_ 只要间歇采样的数据: `sample` |_ 只要某段时间内不会再有数据发射的数据: `debounce` |_ 只要与已发射数据不重复的数据: `distinct` | |_ if they immediately follow the item they are duplicates of: `distinctUntilChanged` |_ 只要在Observable开始发射数据后, 我的subscriber进行subscribe以后的数据: `delaySubscription`
如果某个Observable是一个Observable集合的第一个, 则重新发射他的数据: `amb`
我想对Observable发射的数据序列进行判断 |_ 判断是否所有数据都满足条件, 然后发射一个单独的boolean值: `all` |_ 判断是否其中某个数据满足条件, 然后发射一个单独的boolena值: `contains` |_ 判断是否Observable没有发射任何数据, 然后发射一个单独的boolean值: `isEmpty` |_ 判断是否该Observable的数据序列和另一个Observable的数据序列一样, 然后发射一个单独的boolean值: `sequenceEqual` |_ 发射所有数据的平均值: `average` |_ 发射所有数据的总和: `sum` |_ 发射数据的个数: `count` |_ 发射数据序列中的最大值: `max` |_ 发射数据序列中的最小值: `min` |_ 通过对每个数据应用一个方法, 并发射该方法的结果: `scan`
我想将Observable发射的整个数据序列转换为另一种数据结构: `to`
我想控制操作符进行操作所在的线程: `subscribeOn` |_ 想控制通知Observer的线程: `observeOn`
我想创建一个在某种事件发生后, 可以激活某个特定的action的Observable: `do`
我想创建一个可以通知Observer发生错误的Observable: `throw` |_ 如果在某段时间内没有发射任何数据, 则通知错误: `timeout`
我想创建一个可以从错误中恢复的Observable |_ 它可以通过转换到一个备份Observable来从超时中恢复: `timeout` |_ 它可以从上游错误通知中恢复: `catch` |_ 通过尝试重新subscribe上游的Observable: `retry`
我想创建一个与Observable有相同生命周期的对象: `using`
我想subscribe一个Observable, 并一直阻塞, 直到该Observable完成时, 接收一个Future: `start`
我想创建一个Observable, 在subscribe时并不发射数据, 直到我要求它才发射: `publish` |_ 它只发射最后一个数据: `publishLast` |_ 它发射全部数据序列, 无论在序列发射后是否有其他进行subscribe: `replay` |_ 当所有subscriber都取消subscribe时, 我要放弃发射: `refCount` |_ 我要要求它开始发射: `connect`
如果你想在操作符中引入多线程, 你可以使用 Schedulers
默认情况下, Observable和操作链会在调用 subscribe
方法的线程中进行操作和发出通知. subscribeOn
操作符可以指定操作Observable的具体线程. observeOn
操作符可以指定Observable向Observer发出通知的线程
observeOn
subscribeOn
注意: subscribe中的通知回调方法是 observeOn
指定的线程, 而不是subscribeOn指定的线程