转载

从源码查看RxJava中的map和flatMap的用法与区别

RxJava中提供了大量的操作符,这大大提高了了我们的开发效率。其中最基本的两个变换操作符就是 mapflatMap 。而其他变换操作符的原理基本与 map 类似。

Observable

作用

map 对Observable发射的每一项数据应用一个函数,执行变换操作。对原始的Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable。

从源码查看RxJava中的map和flatMap的用法与区别
flatMap

将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射

从源码查看RxJava中的map和flatMap的用法与区别

使用方法:

通过代码来看一下两者的使用用方法:

map

Observable.just(new User("白瑞德"))
                  .map(new Function<User, String>() {

                      @Override
                      public String apply(User user) throws Throwable {

                          return user.getName();
                      }
                  })
                  .subscribe(new Consumer<String>() {

                      @Override
                      public void accept(String s) throws Throwable {

                          System.out.println(s);
                      }
                  });
<<<白瑞德
复制代码

这段代码接受一个User对象,最后打印出User中的name。

flatMap

Observable.just(1,2,3,4,5)
                  .flatMap(new Function<Integer, ObservableSource<String>>() {

                      @Override
                      public ObservableSource<String> apply(Integer integer) throws Throwable {

                          return Observable.just(String.valueOf(integer*2));
                      }
                  })
                  .subscribe(new Consumer<String>() {

                      @Override
                      public void accept(String s) throws Throwable {
                          System.out.print(s+" ");
                      }
                  });
>>>2 4 6 8 10
复制代码

这段代码接受一个Int类型数组,并将其中的数字翻倍后转换为String。

源码分析

下面我们就结合源码来分析一下这两个操作符。为了降低代码阅读难道,这里只保留核心代码:

map

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    //接受一个Function实例,并返回一个ObservableMap
    return new ObservableMap<T, R>(this, mapper);
}

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        //调用用父类构造方法,初始化父类中的downstream
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            v = mapper.apply(t);
            downstream.onNext(v);
        }
    }
}
复制代码

这段代码是去掉map源码中一些校验和其它相关回调后的精简代码。接下来分析一下代码流程:

  • 当在调用 map 时,map接受一个匿名内部类 Function 的实例,并返回一个 ObservableMap 对象。
  • ObservableMap 本质上是一个 Observable ,也是一个被观察者,其构造方法接受最外层的那个被 Observable 实例,和 Function 实例。 ObservableMap 重写了 subscribeActual 方法,在 subscribeActual 中使用新建了一个 MapObserver 实现了对原始 Observable 的观察。
  • 原始的 Observable 中的数据变会被发送到 MapObserver 的实例中。
  • MapObserver 构造方法接收原始 Observable 的观察者 actual ,和 Function 的实例 mapper
  • MapObserver 在其 onNext 方法中调用 mapperapply 方法,获得该方法的返回值v apply方法就是map实例中: public String apply(User user) throws Throwable { return user.getName(); }
  • 调用 downstream 的onNext方法,并传入实参v。其中 downstreamMapObserver 父类中定义的变量,在 MapObserver 构造方法中 super(actual); 时初始化,其本身就是传入的 actual ,本质上就是最原始的 Observable

整个流程可以概括如下: 存在一个原始的 ObservableA 和一个观察者 ObserverA ,当原始的被观察者 ObservableA 调用 map ,并传入一个匿名内部类实例化的’function‘, map 新建并返回了一个被观察者 ObservableB ,通过 subscribe 让观察者 ObserverA 对其进行订阅。并重写 subscribeActual 方法,在其被订阅时创建一个新的观察者 ObserverB 其接受的,并用 ObserverB 对原始的 ObservableA 进行订阅观察。当原始的 ObservableA 发出事件,调用 ObserverBonNext 方法, subscribeActual 接受的观察者便是最原始的观察者 ObserverAObserverB 变执行通过匿名内部类实例化的’function‘的 apply 方法得到数据 v ,紧接着调用原始的 ObservableAonNext 方法,并传入实参 vObserverA 观察到事件。 一句话概括:一个原始的被观察者和观察者,但是让原始的观察者去订阅一个新的观察者,当新的被观察者被订阅的时候,创建一个新的观察者去订阅原始的被观察者,并在监听的事件之后执行指定的操作后再通知原始观察者。

flatMap

faltMapmap 的基本原理类似,其代码如下:

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
        return new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize);
}


public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
    final boolean delayErrors;
    final int maxConcurrency;
    final int bufferSize;

    public ObservableFlatMap(ObservableSource<T> source,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        super(source);
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {

        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }

    static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {

        MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
                boolean delayErrors, int maxConcurrency, int bufferSize) {
            ...   
            this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY);
        }

        @Override
        public void onSubscribe(Disposable d) {
            downstream.onSubscribe(this);
        }

        @Override
        public void onNext(T t) {
            
            ObservableSource<? extends U> p;
            p = mapper.apply(t);    
            subscribeInner(p);
        }

        @SuppressWarnings("unchecked")
        void subscribeInner(ObservableSource<? extends U> p) {
           
                InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                p.subscribe(inner);
        }

        void drain() {    
            drainLoop();
        }
        void drainLoop() {
            final Observer<? super U> child = this.downstream;
            child.onNext(o);
        }
    }

    static final class InnerObserver<T, U> extends AtomicReference<Disposable>
    implements Observer<U> {

        private static final long serialVersionUID = -4606175640614850599L;
        final long id;
        final MergeObserver<T, U> parent;

        volatile boolean done;
        volatile SimpleQueue<U> queue;

        int fusionMode;

        InnerObserver(MergeObserver<T, U> parent, long id) {
            this.id = id;
            this.parent = parent;
        }

        @Override
        public void onNext(U t) {
            parent.drain();
        }
        
    }
}

复制代码

上述代码即是 faltMap 精简后的源码,其中大部分代码的运作流程和前文中的 map 源码一致,我们继续延续上文中讲解中的观察者和被观察者。重点关注其不同的地方: faltMap 返回一个新的被观察者 ObservableB ,重写 ObservableBsubscribeActual 方法在原始的观察者 ObserverA 对其进行订阅时新建一个观察者 ObserverB 对原始的 ObservableA 进行订阅。新的观察者 ObserverB 持有原始的 ObserverAfaltMap 接收的匿名对象实例 function 。当 ObserverB 监听到原始的被观察者 ObservableA 的事件时, ObserverB 调用 functionapply 方法获得新新的被观察者 ObservableC ,再创建一个新的观察者 ObserverCObservableC 进行订阅, ObserverC 持有原始的观察者 ObserverA ,在 ObserverC 观察到被观察者 ObservableC 的时间时,调用原始的观察者 ObserverA 的方法。

结语

至此,map和flatMap已基本分析完毕,其中map的代码比较简单易懂,flatMap中还涉及到大量辅助操作,阅读起来有些困难。如果仅仅是为了了解二者的原理,可以阅读 Single<T> 中的代码。其中的代码量远远少于 Observable 中的代码量。如果对RxJava基本的模式还不了解,可以阅读大神博客 手写极简版的Rxjava

原文  https://juejin.im/post/5d5517526fb9a06af13d63a2
正文到此结束
Loading...