在介绍 RxJava 1.x
线程调度器之前,首先引入一个重要的概念 - 事件序列转换。 RxJava
提供了对事件序列进行转换的支持,这是它的核心功能之一。
所谓转换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列,有点类似 Java 1.8
中的流处理。
首先看一个 map()
的例子:
Observable.just("images/logo.png") // 输入类型 String .map(new Func1<String, Bitmap>() { @Override public Bitmap call(String filePath) { // 参数类型 String return getBitmapFromPath(filePath); // 返回类型 Bitmap } }) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { // 参数类型 Bitmap showBitmap(bitmap); } }); 复制代码
这里出现了一个叫 Func1
的类。它和 Action1
非常相似,也是 RxJava
的一个接口,用于包装含有 一个参数 的方法。 Func1
和 Action
的区别在于: Func1
包装的是 有返回值 的方法。另外,和 ActionX
一样, FuncX
也有多个,用于不同参数个数的方法。同理, FuncX
和 ActionX
的区别在 FuncX
包装的是有返回值的方法。
可以看到,map() 方法将参数中的 String 对象转换成一个 Bitmap 对象后返回,而在经过 map() 方法后,事件的参数类型也由 String 转为了 Bitmap。
这种直接转换对象并返回的,是最常见的也最容易理解的变换。不过 RxJava
的转换远不止这样,它不仅可以针对 事件对象 ,还可以针对整个 事件队列 ,这使得 RxJava
变得非常灵活。
下面给出几个示例:
事件对象的直接变换,具体功能上面已经介绍过。它是 RxJava
最常用的变换。 map()
的示意图如下:
这是一个很有用但非常难理解的变换。首先假设这么一种需求:假设有一个数据结构『学生』,现在需要打印出一组学生的名字。实现方式很简单:
Student[] students = ...; Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String name) { Log.d(tag, name); } }; Observable.from(students) .map(new Func1<Student, String>() { @Override public String call(Student student) { return student.getName(); } }) .subscribe(subscriber); 复制代码
如果要打印出每个学生所需要修的所有课程的名称呢?需求的区别在于,每个学生只有一个名字,但却有多个课程,首先可以这样实现:
Student[] students = ...; Subscriber<Student> subscriber = new Subscriber<Student>() { @Override public void onNext(Student student) { List<Course> courses = student.getCourses(); for (int i = 0; i < courses.size(); i++) { Course course = courses.get(i); Log.d(tag, course.getName()); } } }; Observable.from(students) .subscribe(subscriber); 复制代码
如果我不想在 Subscriber
中使用 for
循环,而是希望 Subscriber
中直接传入单个的 Course
对象呢(这对于代码复用很重要)?用 map()
显然是不行的,因为 map()
是 一对一 的转化,而现在需要 一对多 的转化。问题出现了:怎样把一个 Student
转化成多个 Course
?
这个时候, flatMap()
就派上了用场:
Student[] students = ...; Subscriber<Course> subscriber = new Subscriber<Course>() { @Override public void onNext(Course course) { Log.d(tag, course.getName()); } }; Observable.from(students) .flatMap(new Func1<Student, Observable<Course>>() { @Override public Observable<Course> call(Student student) { return Observable.from(student.getCourses()); } }) .subscribe(subscriber); 复制代码
从上面的代码可以看出, flatMap()
和 map()
有一个相同点:它也是把传入的参数转化之后返回另一个对象。
flatMap()
和 map()
不同的是, flatMap()
返回的是个 Observable
对象,并且这个 Observable
对象并不是被直接发送到 Subscriber
的回调方法中。
flatMap()
示意图如下:
flatMap()
的原理是这样的:
Observable
对象; Observable
, 而是将它激活,然后开始发送事件; Observable
发送的事件,都被汇入同一个 Observable
。 而这个 Observable
负责将这些事件统一交给 Subscriber
的回调方法。这三个步骤,把事件拆成了 两级 ,通过一组新创建的 Observable
将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap()
所谓的 flat
。
这些转换虽然功能各有不同,但实质上都是针对 事件序列的处理和再发送 。而在 RxJava
的内部,它们是基于同一个基础的转换方法: lift(Operator)
。
首先看一下 lift()
的内部实现(核心代码):
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) { return Observable.create(new OnSubscribe<R>() { @Override public void call(Subscriber subscriber) { Subscriber newSubscriber = operator.call(subscriber); newSubscriber.onStart(); onSubscribe.call(newSubscriber); } }); } 复制代码
这段代码实现的功能,简单来说就是创建了一个新的 Observable
并返回。如果看过上篇博客会发现有些蹊跷。重温一下 Observable.subscribe(Subscriber)
的实现(核心代码):
public Subscription subscribe(Subscriber subscriber) { subscriber.onStart(); onSubscribe.call(subscriber); return subscriber; } 复制代码
对比一下以上两段代码的方法体(忽略返回值),会发现一行突兀的代码:
Subscriber newSubscriber = operator.call(subscriber); 复制代码
解释一下 lift()
方法完成的操作:
利用 Observable.create()
方法创建一个新的 Observable
对象,加上之前的原始 Observable
,已经有两个 Observable
。
创建 Observable
的同时创建一个新的 OnSubscribe
用于发出事件。
通过 lift()
传入的 Operator
函数的 call()
方法构造一个新的 Subscriber
对象,并将新 Subscriber
和原始 Subscriber
进行关联。
利用这个新 Subscriber
向原始 Observable
进行订阅,实现事件序列的转换。
这种实现基于代理模式,通过事件拦截和处理实现事件序列的变换。
在 Observable
执行了 lift(Operator)
方法之后,会返回一个新的 Observable
,这个新的 Observable
会像一个代理一样,负责接收原始的 Observable
发出的事件,并在处理后发送给 Subscriber
。
整个过程的思维导图如下:
或者可以看动图:
两次和多次的 lift()
同理,如下图:
举一个具体的 Operator
的实现。下面是一个将事件的 Integer
对象转换成 String
的例子,仅供参考:
observable.lift(new Observable.Operator<String, Integer>() { @Override public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) { // 将事件序列中的 Integer 对象转换为 String 对象 return new Subscriber<Integer>() { @Override public void onNext(Integer integer) { subscriber.onNext("" + integer); } @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } }; } }); 复制代码
学习 lift() 的原理只是为了更好地理解 RxJava ,从而可以更好地使用它。然而RxJava 不建议开发者自定义 Operator 来直接使用 lift(),而是尽量使用已有的 lift() 包装方法(如 map() flatMap() 等)进行组合。
除了 lift()
之外, Observable
还有一个转方法叫做 compose()
。它和 lift()
的区别在于, lift()
是针对 事件项 和 事件序列 的,而 compose()
是针对 Observable
自身进行转换。
举个例子,假设在程序中有多个 Observable
都需要应用一组相同的 lift()
进行转换,通常会这样写:
observable1.lift1() .lift2() .lift3() .lift4() .subscribe(subscriber1); observable2.lift1() .lift2() .lift3() .lift4() .subscribe(subscriber2); observable3.lift1() .lift2() .lift3() .lift4() .subscribe(subscriber3); observable4.lift1() .lift2() .lift3() .lift4() .subscribe(subscriber1); 复制代码
可以发现有太多重复代码,代码重构如下:
private Observable liftAll(Observable observable) { return observable.lift1() .lift2() .lift3() .lift4(); } liftAll(observable1).subscribe(subscriber1); liftAll(observable2).subscribe(subscriber2); liftAll(observable3).subscribe(subscriber3); liftAll(observable4).subscribe(subscriber4); 复制代码
可读性、可维护性都提高了。可是 Observable
被一个方法包起来,这种方式对于 Observale
的灵活性进行了限制。怎么办?这个时候,就应该用 compose()
来解决了:
public class LiftAllTransformer implements Observable.Transformer<Integer, String> { @Override public Observable<String> call(Observable<Integer> observable) { return observable.lift1() .lift2() .lift3() .lift4(); } } Transformer liftAll = new LiftAllTransformer(); observable1.compose(liftAll).subscribe(subscriber1); observable2.compose(liftAll).subscribe(subscriber2); observable3.compose(liftAll).subscribe(subscriber3); observable4.compose(liftAll).subscribe(subscriber4); 复制代码
如上,使用 compose()
方法, Observable
可以利用传入的 Transformer
对象的 call
方法直接对自身进行处理,而不是被包在方法的里面。
本文主要介绍了 RxJava
事件及事件序列转换原理,其中 lift()
方法的使用方法和实现原理是重点、难点。后续将会介绍的 RxJava
线程调度器底层也是基于它实现的。
欢迎关注技术公众号: 零壹技术栈
本帐号将持续分享后端技术干货,包括虚拟机基础,多线程编程,高性能框架,异步、缓存和消息中间件,分布式和微服务,架构学习和进阶等学习资料和文章。