RxJava
是一款基于 Java VM
实现的响应式编程扩展库 - 基于观察者模式的异步和事件处理框架。 RxJava
官方目前同时维护了两个版本,分别是 1.x
和 2.x
,区别是它们使用不同的 group id
和 namespaces
。
版本 | group id | namespaces |
---|---|---|
v1.x | io.reactivex | io.reactivex |
v2.x | io.reactivex.rxjava2 | rx |
本系列的文章将针对 RxJava 1.x
进行介绍,先给出 Github
的地址:
通过 Gradle 引入相关依赖:
compile 'io.reactivex:rxjava:1.0.14' compile 'io.reactivex:rxandroid:1.0.1'
一个精准的解释如下: RxJava
是一个运行于 Java VM
,由可观测序列组成的,异步、基于事件的函数库。
换句话说,『同样是做异步,为什么人们用它,而不用现成的 AsyncTask
/ Handler
/ XXX
/ ... ?』
一个词: 简洁 。
异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 Android
创造的 AsyncTask 和Handler
,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。
在 Android
开发中,假设有这样一个需求:界面上有一个自定义的视图 imageCollectorView
,它的作用是显示多张图片,并能使用 addImage(Bitmap) 方法来任意增加显示的图片。现在需要程序将一个给出的目录数组 File[] folders
中每个目录下的 png
图片都加载出来并显示在 imageCollectorView
中。
注意: 由于读取图片的过程较为耗时,需要放在后台执行,而图片的显示则必须在 UI 线程执行。
常用的实现方式有多种,这里给出其中一种:
new Thread() { @Override public void run() { super.run(); for (File folder : folders) { File[] files = folder.listFiles(); for (File file : files) { if (file.getName().endsWith(".png")) { final Bitmap bitmap = getBitmapFromFile(file); getActivity().runOnUiThread(new Runnable() { @Override public void run() { imageCollectorView.addImage(bitmap); } }); } } } } }.start();
而如果使用 RxJava
,实现方式是这样的:
Observable.from(folders) .flatMap(new Func1<File, Observable<File>>() { @Override public Observable<File> call(File file) { return Observable.from(file.listFiles()); } }) .filter(new Func1<File, Boolean>() { @Override public Boolean call(File file) { return file.getName().endsWith(".png"); } }) .map(new Func1<File, Bitmap>() { @Override public Bitmap call(File file) { return getBitmapFromFile(file); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { imageCollectorView.addImage(bitmap); } });
可以发现,使用 RxJava 方式代码量明显大大增加,所谓简洁从何而来?
这里说的简洁是指的 逻辑 上的。观察一下你会发现, RxJava
的这个实现,是一条从上到下的链式调用,没有任何嵌套,这在逻辑的简洁性上是具有优势的。当需求变得复杂时,这种优势将更加明显(试想如果还要求只选取前 10 张图片,常规方式要怎么办?如果有更多这样那样的要求呢?再试想,在这一大堆需求实现完两个月之后需要改功能,当你翻回这里看到自己当初写下的那一片迷之缩进,你能保证自己将迅速看懂,而不是对着代码重新捋一遍思路?)。
另外,如果你的 IDE
是 Android Studio
,其实每次打开某个 Java
文件的时候,你会看到被自动 Lambda
化的预览,这将让你更加清晰地看到程序逻辑:
Observable.from(folders) .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) }) .filter((Func1) (file) -> { file.getName().endsWith(".png") }) .map((Func1) (file) -> { getBitmapFromFile(file) }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });
所以, RxJava
有啥优点?就好在简洁,优点就是把复杂逻辑,通过函数式编程模型穿成一条线。
RxJava
的异步实现,是通过一种扩展的观察者模式来实现的。
观察者模式面向的需求是: A
对象(观察者)对 B
对象(被观察者)的某种变化高度敏感,需要在 B
变化的一瞬间做出反应。
举个例子,新闻里喜闻乐见的警察抓小偷,警察需要在小偷伸手作案的时候实施抓捕。在这个例子里,警察是观察者,小偷是被观察者,警察需要时刻盯着小偷的一举一动,才能保证不会漏过任何瞬间。
程序的观察者模式略有不同,观察者不需要时刻盯着被观察者(例如 A
不需要每过 2ms
就检查一次 B
的状态),而是采用 注册 ( Register
)或者称为 订阅 ( Subscribe
)的方式,告诉被观察者: 我需要你的某种状态,你要在它变化的时候通知我 。
采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到最高的反馈速度。
Android
开发中一个典型的例子是点击监听器 OnClickListener
。对设置 OnClickListener
来说, View
是 被观察者 , OnClickListener
是 观察者 ,二者通过 setOnClickListener()
方法达成 订阅关系 。订阅之后用户点击按钮的瞬间, Android Framework
就会将点击事件发送给 已注册 的 OnClickListener 。
OnClickListener
的观察者模式大致如下图:
如图所示,通过 setOnClickListener()
方法, Button
持有 OnClickListener
的引用(这一过程没有在图上画出)。当用户点击时, Button
自动调用 OnClickListener
的 onClick()
方法。
按照观察者模式抽象出来的各个概念:
就由专用的观察者模式转变成了通用的观察者模式,如下图:
RxJava
有四个基本概念:
Observable
和 Observer
通过 subscribe()
方法实现订阅关系,使得 Observable
可以在需要的时候发出事件来通知 Observer
。
与传统观察者模式不同, RxJava
的事件回调方法除了普通事件 onNext()
(相当于 onClick()
) 之外,还定义了两个特殊的事件: onCompleted()
和 onError()
。
RxJava
不仅把每个事件单独处理,还会把它们看做一个队列。 RxJava
规定,当不会再有新的 onNext()
发出时,需要触发 onCompleted()
方法作为事件完成标志。
在事件处理过程中出异常时, onError()
会被触发,同时队列自动终止,不允许再有事件发出。
在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个被调用,并且是事件序列中的最后一个执行。
RxJava
的观察者模式大致如下图:
基于以上的概念, RxJava
的基本使用有 3 个步骤:
Observer
即观察者,它决定事件触发的时候将有怎样的行为。 RxJava
中的 Observer
接口的声明方式:
Observer<String> observer = new Observer<String>() { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!"); } @Override public void onError(Throwable e) { Log.d(tag, "Error: " + e.getMessage()); } };
除了 Observer
接口之外, RxJava
还内置了一个实现了 Observer
的抽象类: Subscriber
。 Subscriber
对 Observer
接口进行了一些扩展,但他们的基本使用方式是完全一样的:
Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!"); } @Override public void onError(Throwable e) { Log.d(tag, "Error: " + e.getMessage()); } };
实质上,在 RxJava
的 subscribe
过程中, Observer
也总是会先被转换成一个 Subscriber
再使用。所以如果你只想使用基本功能,选择 Observer
和 Subscriber
是完全一样的。它们的区别对于使用者来说主要有两点:
这是 Subscriber
增加的方法。它会在 subscribe
刚开始,而事件还未发送之前被调用。可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。
需要注意的是,如果对准备工作的线程有要求(例如: 弹出一个显示进度的对话框,这必须在主线程执行), onStart()
就不适用了。因为它总是在 subscribe
所发生的 线程 被调用,而不能 指定线程 。要在指定的线程来做准备工作,可以使用 doOnSubscribe()
方法,具体可以在后面的章节中看到。
这是 Subscriber
所实现的另一个接口 Subscription
的方法,用于 取消订阅 。在这个方法被调用后, Subscriber
将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed()
先判断一下状态。
unsubscribe()
这个方法很重要,因为在 subscribe()
之后, Observable
会持有 Subscriber 的引用。这个引用如果不能及时被释放,将有 内存泄露 的风险。
注意:在不再使用的时候尽快在合适的地方(例如: onPause()
和 onStop()
等方法中)调用 unsubscribe()
来解除 引用关系 ,以避免内存泄露的发生。
Observable
即被观察者,它决定什么时候触发事件以及触发怎样的事件。 RxJava
使用 create()
方法来创建一个 Observable
,并为它定义事件触发规则。示例如下:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("Hi"); subscriber.onNext("Aloha"); subscriber.onCompleted(); } });
可以看到,这里传入了一个 OnSubscribe
对象作为参数。 OnSubscribe
会被存储在返回的 Observable
对象中。
它的作用相当于一个计划表,当 Observable
被订阅的时候, OnSubscribe
的 call()
方法会自动被调用, 事件序列 就会依照设定依次触发(对于上面的代码,就是观察者 Subscriber
将会被调用三次 onNext()
和一次 onCompleted()
)。
这样,由 被观察者 调用了 观察者 的回调方法,就实现了由被观察者向观察者的 事件传递 ,即观察者模式。
create()
方法是 RxJava
最基本的创建 事件序列 的方法。基于这个方法, RxJava
还提供了一些方法用于快捷创建事件队列,例如 just()
方法:
Observable observable = Observable.just("Hello", "Hi", "Aloha"); // 将会依次调用方法序列:onNext("Hello") -> onNext("Hi") -> onCompleted()
将传入的数组或 Iterable
拆分成具体对象后,依次发送给观察者,示例如下:
String[] words = {"Hello", "Hi", "Aloha"}; Observable observable = Observable.from(words); // 将会依次调用方法序列:onNext("Hello") -> onNext("Hi") -> onCompleted()
创建了 Observable
和 Observer
之后,再用 subscribe()
方法将它们关联起来,整条链子就可以工作了。代码很简单:
observable.subscribe(observer); // 或者 observable.subscribe(subscriber);
可能会注意到,subscribe() 这个方法有点怪:它看起来是『observable 订阅了 observer / subscriber』,而不是『observer / subscriber 订阅了 observable』。这看起来就像『杂志订阅了读者』一样颠倒了对象关系。
这让人读起来有点别扭,不过如果把 API 设计成 『observer.subscribe(observable) / subscriber.subscribe(observable)』,虽然更加符合思维逻辑,但对流式 API 的设计就造成影响了,比较起来明显是得不偿失的。
Observable.subscribe(Subscriber)
的内部实现是这样的(核心代码):
public Subscription subscribe(Subscriber subscriber) { subscriber.onStart(); onSubscribe.call(subscriber); return subscriber; }
可以看到 subscriber()
做了3件事:
(a). 调用Subscriber.onStart()
这个方法在前面已经介绍过,是一个可选的准备方法。
(b). 调用Observable中的OnSubscribe.call(Subscriber)
事件发送的逻辑开始运行。从这也可以看出,在RxJava中,Observable并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当subscribe()方法执行的时候。
(c). 返回Subscription
将传入的Subscriber作为Subscription返回。这是为了方便后面的unsubscribe()。
整个过程中对象间的关系如下图:
或者可以看动图:
除了 subscribe(Observer)
和 subscribe(Subscriber)
, subscribe()
还支持不完整定义的回调, RxJava
会自动根据定义创建出 Subscriber
。形式如下:
Action1<String> onNextAction = new Action1<String>() { // onNext() @Override public void call(String s) { Log.d(tag, s); } }; Action1<Throwable> onErrorAction = new Action1<Throwable>() { // onError() @Override public void call(Throwable throwable) { // Error handling } }; Action0 onCompletedAction = new Action0() { // onCompleted() @Override public void call() { Log.d(tag, "completed"); } }; // 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext() observable.subscribe(onNextAction); // 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError() observable.subscribe(onNextAction, onErrorAction); // 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted() observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
简单解释一下这段代码中出现的 Action1
和 Action0
。
Action0
是 RxJava
的一个接口,它只有一个方法 call()
,这个方法是 无参无返回值的 。由于 onCompleted()
方法也是 无参无返回值的 ,因此 Action0
可以被当成一个 包装对象 ,将 onCompleted()
的内容打包起来将自己作为一个参数传入 subscribe()
以实现不完整定义的回调。
Action1
也是一个接口,它同样只有一个方法 call(T param)
,这个方法也无返回值,但有一个参数。与 Action0
同理,由于 onNext(T obj)
和 onError(Throwable error)
也是 单参数无返回值的 ,因此 Action1
可以将 onNext(obj)
和 onError(error)
打包起来传入 subscribe()
以实现不完整定义的回调。
事实上,虽然 Action0
和 Action1
在 API
中使用最广泛,但 RxJava
提供了多个 ActionX
形式的接口 (例如: Action2
, Action3
),它们可以被用以包装不同的无返回值的方法。
将字符串数组 names 中的所有字符串依次打印出来:
String[] names = ...; Observable.from(names) .subscribe(new Action1<String>() { @Override public void call(String name) { Log.d(tag, name); } });
int drawableRes = ...; ImageView imageView = ...; Observable.create(new OnSubscribe<Drawable>() { @Override public void call(Subscriber<? super Drawable> subscriber) { Drawable drawable = getTheme().getDrawable(drawableRes)); subscriber.onNext(drawable); subscriber.onCompleted(); } }).subscribe(new Observer<Drawable>() { @Override public void onNext(Drawable drawable) { imageView.setImageDrawable(drawable); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show(); } });
正如上面两个例子这样,创建出 Observable
和 Subscriber
,再用 subscribe()
将它们串起来,一次 RxJava
的基本使用就完成了,非常简单!
然而。
在 RxJava
的默认规则中,事件的发出和消费都是在 同一个线程 的。也就是说,如果只用上面的方法,实现出来的只是一个 同步的观察者模式 。观察者模式本身的目的就是『后台处理,前台回调』的 异步机制 ,因此异步对于 RxJava
是至关重要的。而要实现异步,则需要用到 RxJava
的另一个核心的概念 Scheduler
,后续将给出详细介绍。
欢迎关注技术公众号: 零壹技术栈
本帐号将持续分享后端技术干货,包括虚拟机基础,多线程编程,高性能框架,异步、缓存和消息中间件,分布式和微服务,架构学习和进阶等学习资料和文章。