上一篇说了 rxjava2
的线程池原理,这篇我们来说说 rxjava
的变换。
变换和线程切换算是 rxjava
最关键的两个功能。常见的变换有 map()
, flatMap()
。我们先从 map
方法说起吧。
我们先举一个简单的例子,来看看 map
能做什么:
Student[] students = ...; Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String name) { Log.d(tag, name); } ... }; Observable.from(students) .map(new Function<Student, String>() { @Override public String apply(Student student) { return student.getName(); } }) .subscribe(subscriber); 复制代码
上面的例子是一个功能,打印一个班级里students的名字。很简单,通过 from
方法对student进行遍历,一个 map
方法将student变换成name,然后下游打印就完事了。我们知道 rxjava2
里面是有很多泛型设定的,如果类型错误是会直接标红。 from
方法返回的下游数据类型是student,而 subscriber
中接收的数据类型必须是String。很显然,这里map就将下游的数据类型进行了变换。
具体在源码中是如何实现的呢?我们先看 map
的源码:
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); } 复制代码
还是老样子,抛开判空代码和hock机制,直接看 ObservableMap
类。不过在此之前,先看看 map
方法里面设定的泛型。T是Observable里设定的上游数据类型,map方法会返回一个Observable,这里就将整个链条的数据类型进行了变换。
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { super(source); this.function = function; } @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } 复制代码
看过前面的几篇就知道,这里还是老套路,还是装饰器模式,还是创建一个内部处理器 MapObserver
。内部处理器 MapObserver
负责与上游绑定,所以它的处理数据类型仍为T。 ObservableMap
与下游进行绑定订阅,所以 ObservableMap
中数据的类型为R。我们在看 MapObserver
之前,先看看 Function
是什么。
public interface Function<T, R> { /** * Apply some calculation to the input value and return some other value. * @param t the input value * @return the output value * @throws Exception on error */ R apply(@NonNull T t) throws Exception; } 复制代码
OK,Function是一个接口,只有一个接口方法 apply
。 Function
规定了两个泛型:T、R。其中T是 apply
的参数类型,R是返回值类型。我们在使用过程中,重写 apply
方法进行数据类型变换,然后再用 map
方法插入到整条流水线中,就达到了变换的目的。
下面看看 MapObserver
中具体是怎么实现的:
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { downstream.onNext(null); return; } U v; try { v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } downstream.onNext(v); } 复制代码
很简单, MapObserver
的 onNext
负责处理上游下来的数据,在 onNext
方法中调用 Function
的 apply
方法,将 T
变换为下游需要的 U
(也就是前面的 R
),然后再将数据传递下去,达到变换的目的。
map的使用和源码都很简单,我们来看看 flatMap
的。
还是先用一个简单的例子来看 flatMap
的用途:
Student[] students = ...; Subscriber<Course> subscriber = new Subscriber<Course>() { @Override public void onNext(Course course) { Log.d(tag, course.getName()); } ... }; Observable.from(students) .flatMap(new Function<Student, Observable<Course>>() { @Override public Observable<Course> apply(Student student) { return Observable.from(student.getCourses()); } }) .subscribe(subscriber); 复制代码
产品说功能要改一改,不是打印每个student的名字,而是要打印每个sutdent所有课程名称。正常情况下,我们在 subscriber
中获取到每个student,然后用个for循环进行遍历打印就行,但是 flatMap
可以直接一步搞定。
细心的已经发现,这里的 Function
比较奇怪,它的返回值类型竟然是 Observable
。具体怎么回事,我们看看源码:
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { //这里的delayErrors,maxConcurrency,bufferSize都是默认值。 return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize)); } 复制代码
先解释一下, delayErrors
, maxConcurrency
, bufferSize
这几个参数的意义:
delayErrors
表示异常是否需要延迟到所有内部数据都传输完毕后抛出。默认值是 false
。 maxConcurrency
表示最大并发数,默认值为 Integer.MAX_VALUE
。 bufferSize
缓存的内部被观察者事件总数大小,默认值为128.
老样子,我们直接看 ObservableFlatMap
:
public ObservableFlatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { super(source); this.mapper = mapper; this.delayErrors = delayErrors; this.maxConcurrency = maxConcurrency; this.bufferSize = bufferSize; } @Override public void subscribeActual(Observer<? super U> t) { if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) { return; } source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize)); } 复制代码
还是原来的配方,还是原来的味道。我们来看看 MergeObserver
的源码一探究竟:
@Override public void onNext(T t) { //调用apply方法,获取到转换的Observable ObservableSource<? extends U> p; try { p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource"); } catch (Throwable e) { Exceptions.throwIfFatal(e); upstream.dispose(); onError(e); return; } //隐藏了一些判断代码 subscribeInner(p); } @SuppressWarnings("unchecked") void subscribeInner(ObservableSource<? extends U> p) { for (;;) { //这里会走到else if (p instanceof Callable) { ... } else { //这里新建一个InnerObserver,调用addInner添加到队列中,然后用apply中生成的Observable与之订阅。 InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++); if (addInner(inner)) { p.subscribe(inner); } break; } } } 复制代码
如注释中所示,这里根据上游每一个数据,生成一个 Observable
,然后新建一个 InnerObserver
,将这个 InnerObserver
添加到内部处理器队列中,并将 Observable
与这个 InnerObserver
进行订阅。
我们以 Observable.from()
为例,看看这中间的流程是什么样的。
//from 方法返回一个ObservableFromArray装饰器 public static <T> Observable<T> fromArray(T... items) { //省略部分判空代码 return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items)); } //ObservableFromArray源码 public final class ObservableFromArray<T> extends Observable<T> { final T[] array; public ObservableFromArray(T[] array) { this.array = array; } @Override public void subscribeActual(Observer<? super T> observer) { //订阅后,创建一个FromArrayDisposable内部类对象 FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array); //这个方法很关键,我们待会可以看看InnerObserver的onSubscribe方法。 observer.onSubscribe(d); if (d.fusionMode) { return; } d.run(); } //FromArrayDisposable不是一个处理器,他只是一个带简单队列的Disposable static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> { final Observer<? super T> downstream; final T[] array; int index; boolean fusionMode; volatile boolean disposed; FromArrayDisposable(Observer<? super T> actual, T[] array) { this.downstream = actual; this.array = array; } // 这里显然是返回同步 @Override public int requestFusion(int mode) { if ((mode & SYNC) != 0) { fusionMode = true; return SYNC; } return NONE; } //poll方法会逐个返回队列中的数据 @Nullable @Override public T poll() { int i = index; T[] a = array; if (i != a.length) { index = i + 1; return ObjectHelper.requireNonNull(a[i], "The array element is null"); } return null; } @Override public boolean isEmpty() { return index == array.length; } @Override public void clear() { index = array.length; } @Override public void dispose() { disposed = true; } @Override public boolean isDisposed() { return disposed; } //在run方法中,开始向下游传递数据。不过这时候已经不重要了,因为在InnerObserver的onSubscribe方法中,已经通过poll方法将队列中的数据都传递出去了。当然这仅仅是在这个示例中是这样 void run() { T[] a = array; int n = a.length; //开始向下游传递数据 for (int i = 0; i < n && !isDisposed(); i++) { T value = a[i]; if (value == null) { downstream.onError(new NullPointerException("The element at index " + i + " is null")); return; } downstream.onNext(value); } if (!isDisposed()) { downstream.onComplete(); } } } } 复制代码
如上面注释所示, from
方法返回一个简单的 ObservableFromArray
, ObservableFromArray
的 subscribe
中,调用下游处理器的 onSubscribe
方法,然后调用自身的 run
方法。我们看看 InnerObserver
中是怎么处理的:
static final class InnerObserver<T, U> extends AtomicReference<Disposable> implements Observer<U> { private static final long serialVersionUID = -4606175640614850599L; final long id; final MergeObserver<T, U> parent; volatile boolean done; volatile SimpleQueue<U> queue; int fusionMode; //这里会用一个独特的ID来给每个InnerObserver做标记 InnerObserver(MergeObserver<T, U> parent, long id) { this.id = id; this.parent = parent; } @Override public void onSubscribe(Disposable d) { if (DisposableHelper.setOnce(this, d)) { //FromArrayDisposable满足这个条件 if (d instanceof QueueDisposable) { @SuppressWarnings("unchecked") QueueDisposable<U> qd = (QueueDisposable<U>) d; int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); //由上面FromArrayDisposable的源码可知这里返回SYNC if (m == QueueDisposable.SYNC) { fusionMode = m; queue = qd; //这里直接将done设置为true,是因为下面的parent.drain()会直接取出所有数据并传递给下游 done = true; //数据在这其中进行下发和传递 parent.drain(); return; } if (m == QueueDisposable.ASYNC) { fusionMode = m; queue = qd; } } } } @Override public void onNext(U t) { if (fusionMode == QueueDisposable.NONE) { parent.tryEmit(t, this); } else { //当上游执行到这里时,数据已经被传递完毕了。这里单指这次示例 parent.drain(); } } .... } 复制代码
具体的信息都写在上面的注释中,我们直接来看 MergeObserver
的 drain()
方法。
void drain() { //这里进行判断,确保drainLoop还在执行时不会被再次调用 if (getAndIncrement() == 0) { drainLoop(); } } void drainLoop() { //获取到下游Observer final Observer<? super U> child = this.downstream; int missed = 1; for (;;) { //判断是否有error if (checkTerminate()) { return; } ... boolean d = done; svq = queue; InnerObserver<?, ?>[] inner = observers.get(); int n = inner.length; int nSources = 0; ... int innerCompleted = 0; if (n != 0) { //初始lastId lastIndex都为0 long startId = lastId; int index = lastIndex; ... int j = index; sourceLoop: for (int i = 0; i < n; i++) { //获取到当前InnerObserver @SuppressWarnings("unchecked") InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j]; //q就是FromArrayDisposable。 SimpleQueue<U> q = is.queue; if (q != null) { for (;;) { U o; try { //在这里循环调取FromArrayDisposable队列中数据,然后传递到下游 o = q.poll(); } catch (Throwable ex) { .... } if (o == null) { break; } child.onNext(o); ... } } //前面标记过,在onSubscribe中已经将done设置为true. boolean innerDone = is.done; SimpleQueue<U> innerQueue = is.queue; //由于上面已经将数据处理完毕,这里innerQueue.isEmpty()返回为true。 if (innerDone && (innerQueue == null || innerQueue.isEmpty())) { //将该InnerObserver从队列中移除 removeInner(is); if (checkTerminate()) { return; } innerCompleted++; } j++; if (j == n) { j = 0; } } lastIndex = j; lastId = inner[j].id; } ... //这里与开头getAndIncrement()相呼应,确保drainLoop在执行时不会被再次调用 missed = addAndGet(-missed); if (missed == 0) { break; } } } 复制代码
OK,整个流程就清晰了,划重点:
flatMap()
是基础装饰器 Observable
的一个方法,参数是一个 Function
,只不过这个 Function
中 apply()
方法返回类型为一个 Observable
。 flatMap()
返回一个 ObservableFlatMap
装饰器对象。 ObservableFlatMap
被订阅后会调用 subscribeActual()
方法,在此方法中,会创建一个内部类 MergeObserver
对象,并将上游装饰器与之订阅。 MergeObserver
在接收到上游数据后,会调用 Function
中 apply()
方法,将数据转换为一个 Observable
,并创建一个内部 InnerObserver
,将这个 InnerObserver
放入队列中,然后将生成的 Observable
与之订阅。 InnerObserver
的 onSubscribe()
方法会直接调用 MergeObserver
的 drain()
方法,将数据全部都直接传递给下游。从而完成整个流程。
观察代码会发现,同步仅仅是 flatMap
的一个简单情况,更复杂的情况在于异步。具体的大家可以去源码里研究一下,毕竟这篇的篇幅已经够长了。下一篇预告一下,我们来看看背压。