在介绍 RxJava 1.x 线程调度器之前,首先引入一个重要的概念 - 事件序列转换。 RxJava 提供了对事件序列进行转换的支持,这是它的核心功能之一。
所谓转换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列,有点类似 Java 1.8 中的流处理。
首先看一个 map() 的例子:
Observable.just("images/logo.png") // 输入类型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 参数类型 String
return getBitmapFromPath(filePath); // 返回类型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 参数类型 Bitmap
showBitmap(bitmap);
}
});
复制代码
这里出现了一个叫 Func1 的类。它和 Action1 非常相似,也是 RxJava 的一个接口,用于包装含有 一个参数 的方法。 Func1 和 Action 的区别在于: Func1 包装的是 有返回值 的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。同理, FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。
可以看到,map() 方法将参数中的 String 对象转换成一个 Bitmap 对象后返回,而在经过 map() 方法后,事件的参数类型也由 String 转为了 Bitmap。
这种直接转换对象并返回的,是最常见的也最容易理解的变换。不过 RxJava 的转换远不止这样,它不仅可以针对 事件对象 ,还可以针对整个 事件队列 ,这使得 RxJava 变得非常灵活。
下面给出几个示例:
事件对象的直接变换,具体功能上面已经介绍过。它是 RxJava 最常用的变换。 map() 的示意图如下:
这是一个很有用但非常难理解的变换。首先假设这么一种需求:假设有一个数据结构『学生』,现在需要打印出一组学生的名字。实现方式很简单:
Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String name) {
Log.d(tag, name);
}
};
Observable.from(students)
.map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getName();
}
})
.subscribe(subscriber);
复制代码
如果要打印出每个学生所需要修的所有课程的名称呢?需求的区别在于,每个学生只有一个名字,但却有多个课程,首先可以这样实现:
Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
@Override
public void onNext(Student student) {
List<Course> courses = student.getCourses();
for (int i = 0; i < courses.size(); i++) {
Course course = courses.get(i);
Log.d(tag, course.getName());
}
}
};
Observable.from(students)
.subscribe(subscriber);
复制代码
如果我不想在 Subscriber 中使用 for 循环,而是希望 Subscriber 中直接传入单个的 Course 对象呢(这对于代码复用很重要)?用 map() 显然是不行的,因为 map() 是 一对一 的转化,而现在需要 一对多 的转化。问题出现了:怎样把一个 Student 转化成多个 Course ?
这个时候, flatMap() 就派上了用场:
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
复制代码
从上面的代码可以看出, flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。
flatMap() 和 map() 不同的是, flatMap() 返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到 Subscriber 的回调方法中。
flatMap() 示意图如下:
flatMap() 的原理是这样的:
Observable 对象; Observable , 而是将它激活,然后开始发送事件; Observable 发送的事件,都被汇入同一个 Observable 。 而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了 两级 ,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat 。
这些转换虽然功能各有不同,但实质上都是针对 事件序列的处理和再发送 。而在 RxJava 的内部,它们是基于同一个基础的转换方法: lift(Operator) 。
首先看一下 lift() 的内部实现(核心代码):
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}
复制代码
这段代码实现的功能,简单来说就是创建了一个新的 Observable 并返回。如果看过上篇博客会发现有些蹊跷。重温一下 Observable.subscribe(Subscriber) 的实现(核心代码):
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
复制代码
对比一下以上两段代码的方法体(忽略返回值),会发现一行突兀的代码:
Subscriber newSubscriber = operator.call(subscriber); 复制代码
解释一下 lift() 方法完成的操作:
利用 Observable.create() 方法创建一个新的 Observable 对象,加上之前的原始 Observable ,已经有两个 Observable 。
创建 Observable 的同时创建一个新的 OnSubscribe 用于发出事件。
通过 lift() 传入的 Operator 函数的 call() 方法构造一个新的 Subscriber 对象,并将新 Subscriber 和原始 Subscriber 进行关联。
利用这个新 Subscriber 向原始 Observable 进行订阅,实现事件序列的转换。
这种实现基于代理模式,通过事件拦截和处理实现事件序列的变换。
在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable ,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber 。
整个过程的思维导图如下:
或者可以看动图:
两次和多次的 lift() 同理,如下图:
举一个具体的 Operator 的实现。下面是一个将事件的 Integer 对象转换成 String 的例子,仅供参考:
observable.lift(new Observable.Operator<String, Integer>() {
@Override
public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
// 将事件序列中的 Integer 对象转换为 String 对象
return new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
subscriber.onNext("" + integer);
}
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
};
}
});
复制代码
学习 lift() 的原理只是为了更好地理解 RxJava ,从而可以更好地使用它。然而RxJava 不建议开发者自定义 Operator 来直接使用 lift(),而是尽量使用已有的 lift() 包装方法(如 map() flatMap() 等)进行组合。
除了 lift() 之外, Observable 还有一个转方法叫做 compose() 。它和 lift() 的区别在于, lift() 是针对 事件项 和 事件序列 的,而 compose() 是针对 Observable 自身进行转换。
举个例子,假设在程序中有多个 Observable 都需要应用一组相同的 lift() 进行转换,通常会这样写:
observable1.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber1);
observable2.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber2);
observable3.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber3);
observable4.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber1);
复制代码
可以发现有太多重复代码,代码重构如下:
private Observable liftAll(Observable observable) {
return observable.lift1()
.lift2()
.lift3()
.lift4();
}
liftAll(observable1).subscribe(subscriber1);
liftAll(observable2).subscribe(subscriber2);
liftAll(observable3).subscribe(subscriber3);
liftAll(observable4).subscribe(subscriber4);
复制代码
可读性、可维护性都提高了。可是 Observable 被一个方法包起来,这种方式对于 Observale 的灵活性进行了限制。怎么办?这个时候,就应该用 compose() 来解决了:
public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
@Override
public Observable<String> call(Observable<Integer> observable) {
return observable.lift1()
.lift2()
.lift3()
.lift4();
}
}
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);
复制代码
如上,使用 compose() 方法, Observable 可以利用传入的 Transformer 对象的 call 方法直接对自身进行处理,而不是被包在方法的里面。
本文主要介绍了 RxJava 事件及事件序列转换原理,其中 lift() 方法的使用方法和实现原理是重点、难点。后续将会介绍的 RxJava 线程调度器底层也是基于它实现的。
欢迎关注技术公众号: 零壹技术栈
本帐号将持续分享后端技术干货,包括虚拟机基础,多线程编程,高性能框架,异步、缓存和消息中间件,分布式和微服务,架构学习和进阶等学习资料和文章。