转载

rxjava2源码解析(四)--变换

上一篇说了 rxjava2 的线程池原理,这篇我们来说说 rxjava 的变换。

变换和线程切换算是 rxjava 最关键的两个功能。常见的变换有 map() , flatMap() 。我们先从 map 方法说起吧。

map

我们先举一个简单的例子,来看看 map 能做什么:

Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String name) {
        Log.d(tag, name);
    }
    ...
};
Observable.from(students)
    .map(new Function<Student, String>() {
        @Override
        public String apply(Student student) {
            return student.getName();
        }
    })
    .subscribe(subscriber);
复制代码

上面的例子是一个功能,打印一个班级里students的名字。很简单,通过 from 方法对student进行遍历,一个 map 方法将student变换成name,然后下游打印就完事了。我们知道 rxjava2 里面是有很多泛型设定的,如果类型错误是会直接标红。 from 方法返回的下游数据类型是student,而 subscriber 中接收的数据类型必须是String。很显然,这里map就将下游的数据类型进行了变换。

具体在源码中是如何实现的呢?我们先看 map 的源码:

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
复制代码

还是老样子,抛开判空代码和hock机制,直接看 ObservableMap 类。不过在此之前,先看看 map 方法里面设定的泛型。T是Observable里设定的上游数据类型,map方法会返回一个Observable,这里就将整个链条的数据类型进行了变换。

public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
复制代码

看过前面的几篇就知道,这里还是老套路,还是装饰器模式,还是创建一个内部处理器 MapObserver 。内部处理器 MapObserver 负责与上游绑定,所以它的处理数据类型仍为T。 ObservableMap 与下游进行绑定订阅,所以 ObservableMap 中数据的类型为R。我们在看 MapObserver 之前,先看看 Function 是什么。

public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    R apply(@NonNull T t) throws Exception;
}
复制代码

OK,Function是一个接口,只有一个接口方法 applyFunction 规定了两个泛型:T、R。其中T是 apply 的参数类型,R是返回值类型。我们在使用过程中,重写 apply 方法进行数据类型变换,然后再用 map 方法插入到整条流水线中,就达到了变换的目的。

下面看看 MapObserver 中具体是怎么实现的:

MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }
            U v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }
复制代码

很简单, MapObserveronNext 负责处理上游下来的数据,在 onNext 方法中调用 Functionapply 方法,将 T 变换为下游需要的 U (也就是前面的 R ),然后再将数据传递下去,达到变换的目的。

map的使用和源码都很简单,我们来看看 flatMap 的。

flatMap

还是先用一个简单的例子来看 flatMap 的用途:

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Function<Student, Observable<Course>>() {
        @Override
        public Observable<Course> apply(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);
复制代码

产品说功能要改一改,不是打印每个student的名字,而是要打印每个sutdent所有课程名称。正常情况下,我们在 subscriber 中获取到每个student,然后用个for循环进行遍历打印就行,但是 flatMap 可以直接一步搞定。

细心的已经发现,这里的 Function 比较奇怪,它的返回值类型竟然是 Observable 。具体怎么回事,我们看看源码:

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        //这里的delayErrors,maxConcurrency,bufferSize都是默认值。
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }
复制代码

先解释一下, delayErrorsmaxConcurrency , bufferSize 这几个参数的意义:

  • delayErrors 表示异常是否需要延迟到所有内部数据都传输完毕后抛出。默认值是 false
  • maxConcurrency 表示最大并发数,默认值为 Integer.MAX_VALUE
  • bufferSize 缓存的内部被观察者事件总数大小,默认值为128.

老样子,我们直接看 ObservableFlatMap

public ObservableFlatMap(ObservableSource<T> source,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        super(source);
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.maxConcurrency = maxConcurrency;
        this.bufferSize = bufferSize;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {

        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }

        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }
复制代码

还是原来的配方,还是原来的味道。我们来看看 MergeObserver 的源码一探究竟:

@Override
        public void onNext(T t) {
            //调用apply方法,获取到转换的Observable
            ObservableSource<? extends U> p;
            try {
                p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                upstream.dispose();
                onError(e);
                return;
            }
            //隐藏了一些判断代码
            subscribeInner(p);
        }

        @SuppressWarnings("unchecked")
        void subscribeInner(ObservableSource<? extends U> p) {
            for (;;) {
                //这里会走到else
                if (p instanceof Callable) {
                    ...
                } else {
                //这里新建一个InnerObserver,调用addInner添加到队列中,然后用apply中生成的Observable与之订阅。
                    InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                    if (addInner(inner)) {
                        p.subscribe(inner);
                    }
                    break;
                }
            }
        }
复制代码

如注释中所示,这里根据上游每一个数据,生成一个 Observable ,然后新建一个 InnerObserver ,将这个 InnerObserver 添加到内部处理器队列中,并将 Observable 与这个 InnerObserver 进行订阅。

我们以 Observable.from() 为例,看看这中间的流程是什么样的。

//from 方法返回一个ObservableFromArray装饰器
    public static <T> Observable<T> fromArray(T... items) {
       //省略部分判空代码
        return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
    }
    
    
//ObservableFromArray源码
public final class ObservableFromArray<T> extends Observable<T> {
    final T[] array;
    public ObservableFromArray(T[] array) {
        this.array = array;
    }

    
    @Override
    public void subscribeActual(Observer<? super T> observer) {
        //订阅后,创建一个FromArrayDisposable内部类对象
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
        //这个方法很关键,我们待会可以看看InnerObserver的onSubscribe方法。
        observer.onSubscribe(d);

        if (d.fusionMode) {
            return;
        }

        d.run();
    }
    
    //FromArrayDisposable不是一个处理器,他只是一个带简单队列的Disposable
    static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
        final Observer<? super T> downstream;
        final T[] array;
        int index;
        boolean fusionMode;
        volatile boolean disposed;
        FromArrayDisposable(Observer<? super T> actual, T[] array) {
            this.downstream = actual;
            this.array = array;
        }
        // 这里显然是返回同步
        @Override
        public int requestFusion(int mode) {
            if ((mode & SYNC) != 0) {
                fusionMode = true;
                return SYNC;
            }
            return NONE;
        }

        //poll方法会逐个返回队列中的数据
        @Nullable
        @Override
        public T poll() {
            int i = index;
            T[] a = array;
            if (i != a.length) {
                index = i + 1;
                return ObjectHelper.requireNonNull(a[i], "The array element is null");
            }
            return null;
        }

        @Override
        public boolean isEmpty() {
            return index == array.length;
        }

        @Override
        public void clear() {
            index = array.length;
        }

        @Override
        public void dispose() {
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
        //在run方法中,开始向下游传递数据。不过这时候已经不重要了,因为在InnerObserver的onSubscribe方法中,已经通过poll方法将队列中的数据都传递出去了。当然这仅仅是在这个示例中是这样
        void run() {
            T[] a = array;
            int n = a.length;
            //开始向下游传递数据
            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    downstream.onError(new NullPointerException("The element at index " + i + " is null"));
                    return;
                }
                downstream.onNext(value);
            }
            if (!isDisposed()) {
                downstream.onComplete();
            }
        }
    }
}

复制代码

如上面注释所示, from 方法返回一个简单的 ObservableFromArrayObservableFromArraysubscribe 中,调用下游处理器的 onSubscribe 方法,然后调用自身的 run 方法。我们看看 InnerObserver 中是怎么处理的:

static final class InnerObserver<T, U> extends AtomicReference<Disposable>
    implements Observer<U> {

        private static final long serialVersionUID = -4606175640614850599L;
        final long id;
        final MergeObserver<T, U> parent;

        volatile boolean done;
        volatile SimpleQueue<U> queue;

        int fusionMode;
        //这里会用一个独特的ID来给每个InnerObserver做标记
        InnerObserver(MergeObserver<T, U> parent, long id) {
            this.id = id;
            this.parent = parent;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.setOnce(this, d)) {
                //FromArrayDisposable满足这个条件
                if (d instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<U> qd = (QueueDisposable<U>) d;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                    //由上面FromArrayDisposable的源码可知这里返回SYNC
                    if (m == QueueDisposable.SYNC) {
                        fusionMode = m;
                        queue = qd;
                        //这里直接将done设置为true,是因为下面的parent.drain()会直接取出所有数据并传递给下游
                        done = true;
                        //数据在这其中进行下发和传递
                        parent.drain();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        fusionMode = m;
                        queue = qd;
                    }
                }
            }
        }

        @Override
        public void onNext(U t) {
            if (fusionMode == QueueDisposable.NONE) {
                parent.tryEmit(t, this);
            } else {
                //当上游执行到这里时,数据已经被传递完毕了。这里单指这次示例
                parent.drain();
            }
        }

        ....
    }
复制代码

具体的信息都写在上面的注释中,我们直接来看 MergeObserverdrain() 方法。

void drain() {
            //这里进行判断,确保drainLoop还在执行时不会被再次调用
            if (getAndIncrement() == 0) {
                drainLoop();
            }
        }

        void drainLoop() {
            //获取到下游Observer
            final Observer<? super U> child = this.downstream;
            int missed = 1;
            for (;;) {
                //判断是否有error
                if (checkTerminate()) {
                    return;
                }
                ...
                boolean d = done;
                svq = queue;
                InnerObserver<?, ?>[] inner = observers.get();
                int n = inner.length;
                int nSources = 0;
                ...
                int innerCompleted = 0;
                if (n != 0) {
                //初始lastId lastIndex都为0
                    long startId = lastId;
                    int index = lastIndex;
                    ...
                    int j = index;
                    sourceLoop:
                    for (int i = 0; i < n; i++) {
                        
                        //获取到当前InnerObserver
                        @SuppressWarnings("unchecked")
                        InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
                        //q就是FromArrayDisposable。
                        SimpleQueue<U> q = is.queue;
                        if (q != null) {
                            for (;;) {
                                U o;
                                try {
                                    //在这里循环调取FromArrayDisposable队列中数据,然后传递到下游
                                    o = q.poll();
                                } catch (Throwable ex) {
                                    ....
                                }
                                if (o == null) {
                                    break;
                                }
                                child.onNext(o);
                                ...
                            }
                        }
                        //前面标记过,在onSubscribe中已经将done设置为true.
                        boolean innerDone = is.done;
                        SimpleQueue<U> innerQueue = is.queue;
                        //由于上面已经将数据处理完毕,这里innerQueue.isEmpty()返回为true。
                        if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                            //将该InnerObserver从队列中移除
                            removeInner(is);
                            if (checkTerminate()) {
                                return;
                            }
                            innerCompleted++;
                        }

                        j++;
                        if (j == n) {
                            j = 0;
                        }
                    }
                    lastIndex = j;
                    lastId = inner[j].id;
                }
                ...
                //这里与开头getAndIncrement()相呼应,确保drainLoop在执行时不会被再次调用
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
复制代码

OK,整个流程就清晰了,划重点:

  • flatMap() 是基础装饰器 Observable 的一个方法,参数是一个 Function ,只不过这个 Functionapply() 方法返回类型为一个 Observable
  • flatMap() 返回一个 ObservableFlatMap 装饰器对象。 ObservableFlatMap 被订阅后会调用 subscribeActual() 方法,在此方法中,会创建一个内部类 MergeObserver 对象,并将上游装饰器与之订阅。
  • MergeObserver 在接收到上游数据后,会调用 Functionapply() 方法,将数据转换为一个 Observable ,并创建一个内部 InnerObserver ,将这个 InnerObserver 放入队列中,然后将生成的 Observable 与之订阅。
  • 在同步的状态下, InnerObserveronSubscribe() 方法会直接调用 MergeObserverdrain() 方法,将数据全部都直接传递给下游。从而完成整个流程。

观察代码会发现,同步仅仅是 flatMap 的一个简单情况,更复杂的情况在于异步。具体的大家可以去源码里研究一下,毕竟这篇的篇幅已经够长了。下一篇预告一下,我们来看看背压。

原文  https://juejin.im/post/5eb67205f265da7bab3fda95
正文到此结束
Loading...