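Apply a function to each item emitted by an Observable to transform it, as shown in the figure below.
The map operator applies a function of your choice to each item emitted by the source Observable and returns an Observable that emits the results.
RxJava implements this operator as the map function. By default, it does not run on any particular scheduler.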
```java
public void mapTest() {
    Observable.just("HELLO")
            .map(new Function<String, String>() {
                @Override
                public String apply(String s) throws Exception {
                    return s.toLowerCase();
                }
            })
            .map(new Function<String, String>() {
                @Override
                public String apply(String s) throws Exception {
                    return s + " world";
                }
            })
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println(s);
                }
            });
}
```
Output:
```
hello world
```
```java
/**
 * Returns an Observable that applies a specified function to each item emitted by the source ObservableSource and
 * emits the results of these function applications.
 *
 * @param <R> the output type
 * @param mapper
 *            a function to apply to each item emitted by the ObservableSource
 * @return an Observable that emits the items from the source ObservableSource, transformed by the specified
 *         function
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
```
The ObservableMap constructor assigns the custom Function object to the member field `function`. The class also overrides the subscribeActual() method:
```java
@Override
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}
```
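To make the wrapping step concrete, here is a minimal sketch in plain Java, without the RxJava dependency. `MiniObservable`, `MiniObserver`, and `MiniMap` are hypothetical stand-ins invented for illustration; they only mimic the shape of `Observable`, `Observer`, and `ObservableMap`, and leave out disposal, errors, and fusion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class MapSketch {
    /** Hypothetical stand-in for RxJava's Observer, reduced to onNext. */
    interface MiniObserver<T> {
        void onNext(T item);
    }

    /** Hypothetical stand-in for Observable: subscribe() delegates to subscribeActual(). */
    abstract static class MiniObservable<T> {
        protected abstract void subscribeActual(MiniObserver<? super T> observer);

        final void subscribe(MiniObserver<? super T> observer) {
            subscribeActual(observer);
        }

        /** Assembly time: nothing runs yet, we only build a wrapper around this source. */
        final <R> MiniObservable<R> map(Function<? super T, ? extends R> mapper) {
            return new MiniMap<T, R>(this, mapper);
        }

        static <T> MiniObservable<T> just(final T value) {
            return new MiniObservable<T>() {
                @Override
                protected void subscribeActual(MiniObserver<? super T> observer) {
                    observer.onNext(value);
                }
            };
        }
    }

    /** Mimics ObservableMap: keeps the upstream source and the mapping function. */
    static final class MiniMap<T, R> extends MiniObservable<R> {
        private final MiniObservable<T> source;
        private final Function<? super T, ? extends R> function;

        MiniMap(MiniObservable<T> source, Function<? super T, ? extends R> function) {
            this.source = source;
            this.function = function;
        }

        @Override
        protected void subscribeActual(final MiniObserver<? super R> downstream) {
            // Subscribe to the upstream with an observer that transforms
            // each item before handing it to the downstream observer.
            source.subscribe(item -> downstream.onNext(function.apply(item)));
        }
    }

    static List<String> run() {
        List<String> out = new ArrayList<>();
        MiniObservable.just("HELLO")
                .map(s -> s.toLowerCase())
                .map(s -> s + " world")
                .subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [hello world]
    }
}
```

The point of the sketch is that `map` itself does no work: it returns a new object that remembers the source and the function, and the transformation only happens once something subscribes.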
When subscribe() is called later, it first invokes the subscribe() method of the Observable class:
```java
/**
 * Subscribes to an ObservableSource and provides a callback to handle the items it emits.
 *
 * @param onNext
 *             the {@code Consumer<T>} you have designed to accept emissions from the ObservableSource
 * @return a {@link Disposable} reference with which the caller can stop receiving items before
 *         the ObservableSource has finished sending them
 * @throws NullPointerException
 *             if {@code onNext} is null
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext) {
    return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
```
The call `subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer())` invokes another overload of subscribe() in the same Observable class:
```java
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe) {
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

    // core call
    subscribe(ls);

    return ls;
}
```
That method in turn calls yet another overload of subscribe() in the same class:
```java
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        // core method: every class that extends Observable must implement it
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}
```
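At this point the subscribeActual() implementation in the ObservableMap class is invoked.
It calls subscribe() on its source, which runs through the method above again, except that this time subscribeActual() is the one defined in the ObservableJust class: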
```java
@Override
protected void subscribeActual(Observer<? super T> s) {
    ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
    s.onSubscribe(sd);
    sd.run();
}
```
Through s.onSubscribe(), that method invokes the onSubscribe() method of the BasicFuseableObserver class (the base class of MapObserver):
```java
// final: fixed protocol steps to support fuseable and non-fuseable upstream
@SuppressWarnings("unchecked")
@Override
public final void onSubscribe(Disposable s) {
    if (DisposableHelper.validate(this.s, s)) {

        this.s = s;
        if (s instanceof QueueDisposable) {
            this.qs = (QueueDisposable<T>)s;
        }

        if (beforeDownstream()) {

            // the relevant call: pass the subscription on to the downstream observer
            actual.onSubscribe(this);

            afterDownstream();
        }

    }
}
```
After this chain of calls, the accept() method we overrode in the main method is finally invoked and prints the result.
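The order of these calls is easy to lose track of, so here is a rough trace in plain Java. It is a sketch under simplifying assumptions: `Stage`, `Just`, `Wrap`, and `Sink` are hypothetical names invented for this illustration, not RxJava types, and the log labels are made up. It records that subscription travels from the last operator up to the source, while items travel back down.

```java
import java.util.ArrayList;
import java.util.List;

final class SubscribeOrderSketch {
    static final List<String> LOG = new ArrayList<>();

    /** Hypothetical observer with a single callback. */
    interface Sink {
        void onNext(String item);
    }

    /** Hypothetical base class: a final subscribe() that delegates to subscribeActual(). */
    abstract static class Stage {
        final String name;

        Stage(String name) {
            this.name = name;
        }

        final void subscribe(Sink sink) {
            LOG.add("subscribeActual:" + name);
            subscribeActual(sink);
        }

        abstract void subscribeActual(Sink sink);
    }

    /** Plays the role of the source: emits its single value on subscription. */
    static final class Just extends Stage {
        final String value;

        Just(String value) {
            super("just");
            this.value = value;
        }

        @Override
        void subscribeActual(Sink sink) {
            LOG.add("emit:just");
            sink.onNext(value);
        }
    }

    /** Plays the role of an operator: subscribes to its source with a wrapping sink. */
    static final class Wrap extends Stage {
        final Stage source;

        Wrap(String name, Stage source) {
            super(name);
            this.source = source;
        }

        @Override
        void subscribeActual(Sink sink) {
            source.subscribe(item -> {
                LOG.add("onNext:" + name);
                sink.onNext(item);
            });
        }
    }

    static List<String> run() {
        LOG.clear();
        new Wrap("map2", new Wrap("map1", new Just("HELLO")))
                .subscribe(item -> LOG.add("accept:" + item));
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

Running it logs `subscribeActual` for map2, map1, and just in that order, followed by `emit:just`, `onNext:map1`, `onNext:map2`, and finally `accept:HELLO`.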
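flatMap transforms an Observable that emits items into multiple Observables, then merges the items they emit into a single Observable, as shown in Figure 2:
The flatMap operator applies a function you specify to each item emitted by the source Observable. That function returns an Observable that itself emits items; flatMap then merges the emissions of all these Observables and emits the merged result as its own sequence.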
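The idea can be sketched in a few lines of plain Java. `MiniSource`, `fromIterable`, and `flatMap` below are hypothetical helpers written for this illustration, not the RxJava API. The sketch is synchronous, so the inner sequences come out one after another; the real operator merges them and may interleave items when the inner Observables emit asynchronously.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class FlatMapSketch {
    /** Hypothetical minimal source: pushes its items into a consumer when subscribed. */
    interface MiniSource<T> {
        void subscribe(Consumer<? super T> downstream);
    }

    static <T> MiniSource<T> fromIterable(final Iterable<? extends T> items) {
        return downstream -> {
            for (T item : items) {
                downstream.accept(item);
            }
        };
    }

    /**
     * For every outer item, ask the mapper for an inner source and forward
     * everything that inner source emits to the same downstream consumer.
     */
    static <T, R> MiniSource<R> flatMap(
            final MiniSource<T> source,
            final Function<? super T, ? extends MiniSource<? extends R>> mapper) {
        return downstream ->
                source.subscribe(item -> mapper.apply(item).subscribe(downstream));
    }

    static List<Integer> run() {
        List<Integer> out = new ArrayList<>();
        MiniSource<Integer> numbers = fromIterable(Arrays.asList(1, 2, 3));
        // Each number n becomes the inner sequence [n, n * 10].
        MiniSource<Integer> merged =
                flatMap(numbers, n -> fromIterable(Arrays.asList(n, n * 10)));
        merged.subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 10, 2, 20, 3, 30]
    }
}
```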
Data class:
```java
public class User {
    public String userName;
    public List<Address> addresses;

    public static class Address {
        public String street;
        public String city;
    }
}
```
```java
public void flatMapTest() {
    User user = new User();
    user.userName = "tony";
    user.addresses = new ArrayList<>();

    User.Address address1 = new User.Address();
    address1.street = "ren ming road";
    address1.city = "Su zhou";
    user.addresses.add(address1);

    User.Address address2 = new User.Address();
    address2.street = "dong wu bei road";
    address2.city = "Su zhou";
    user.addresses.add(address2);

    Observable.just(user)
            .flatMap(new Function<User, ObservableSource<User.Address>>() {
                @Override
                public ObservableSource<User.Address> apply(User user) throws Exception {
                    return Observable.fromIterable(user.addresses);
                }
            })
            .subscribe(new Consumer<User.Address>() {
                @Override
                public void accept(User.Address address) throws Exception {
                    System.out.println(address.street);
                }
            });
}
```
Output:
```
ren ming road
dong wu bei road
```
flatMap() has several overloads; all of them eventually call the following method:
```java
/**
 * Returns an Observable that emits items based on applying a function that you supply to each item emitted
 * by the source ObservableSource, where that function returns an ObservableSource, and then merging those resulting
 * ObservableSources and emitting the results of this merger, while limiting the maximum number of concurrent
 * subscriptions to these ObservableSources.
 *
 * @param <R> the value type of the inner ObservableSources and the output type
 * @param mapper
 *            a function that, when applied to an item emitted by the source ObservableSource, returns an
 *            ObservableSource
 * @param maxConcurrency
 *         the maximum number of ObservableSources that may be subscribed to concurrently
 * @param delayErrors
 *            if true, exceptions from the current Observable and all inner ObservableSources are delayed until all of them terminate
 *            if false, the first one signalling an exception will terminate the whole sequence immediately
 * @param bufferSize
 *            the number of elements to prefetch from each inner ObservableSource
 * @return an Observable that emits the result of applying the transformation function to each item emitted
 *         by the source ObservableSource and merging the results of the ObservableSources obtained from this
 *         transformation
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
        boolean delayErrors, int maxConcurrency, int bufferSize) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    if (this instanceof ScalarCallable) {
        @SuppressWarnings("unchecked")
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        }
        return ObservableScalarXMap.scalarXMap(v, mapper);
    }
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
```
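It then returns a different object depending on whether `this` is a ScalarCallable.
In this example it is one, because Observable.just() returns an ObservableJust, which implements ScalarCallable. The method therefore calls ObservableScalarXMap.scalarXMap(v, mapper):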
```java
/**
 * Maps a scalar value into an Observable and emits its values.
 *
 * @param <T> the scalar value type
 * @param <U> the output value type
 * @param value the scalar value to map
 * @param mapper the function that gets the scalar value and should return
 * an ObservableSource that gets streamed
 * @return the new Observable instance
 */
public static <T, U> Observable<U> scalarXMap(T value,
        Function<? super T, ? extends ObservableSource<? extends U>> mapper) {
    return RxJavaPlugins.onAssembly(new ScalarXMapObservable<T, U>(value, mapper));
}
```
This in turn creates a ScalarXMapObservable object.
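The branch on ScalarCallable is an assembly-time shortcut: when the source is known to hold exactly one value, there is nothing to merge, so a lighter object can apply the mapper directly at subscription time. The sketch below illustrates that decision in plain Java. All names (`MiniSource`, `Scalar`, `ScalarXMap`, `GeneralFlatMap`) are hypothetical and only loosely modelled on the classes discussed here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class ScalarShortcutSketch {
    interface MiniSource<T> {
        void subscribe(Consumer<? super T> downstream);
    }

    /** Hypothetical single-value source whose value is readable at assembly time. */
    static final class Scalar<T> implements MiniSource<T> {
        final T value;

        Scalar(T value) {
            this.value = value;
        }

        @Override
        public void subscribe(Consumer<? super T> downstream) {
            downstream.accept(value);
        }
    }

    /** Shortcut path: holds the value and applies the mapper once, on subscribe. */
    static final class ScalarXMap<T, R> implements MiniSource<R> {
        final T value;
        final Function<? super T, ? extends MiniSource<? extends R>> mapper;

        ScalarXMap(T value, Function<? super T, ? extends MiniSource<? extends R>> mapper) {
            this.value = value;
            this.mapper = mapper;
        }

        @Override
        public void subscribe(Consumer<? super R> downstream) {
            mapper.apply(value).subscribe(downstream);
        }
    }

    /** General path: subscribes to the source and maps every item it emits. */
    static final class GeneralFlatMap<T, R> implements MiniSource<R> {
        final MiniSource<T> source;
        final Function<? super T, ? extends MiniSource<? extends R>> mapper;

        GeneralFlatMap(MiniSource<T> source,
                Function<? super T, ? extends MiniSource<? extends R>> mapper) {
            this.source = source;
            this.mapper = mapper;
        }

        @Override
        public void subscribe(Consumer<? super R> downstream) {
            source.subscribe(item -> mapper.apply(item).subscribe(downstream));
        }
    }

    static <T, R> MiniSource<R> flatMap(MiniSource<T> source,
            Function<? super T, ? extends MiniSource<? extends R>> mapper) {
        if (source instanceof Scalar) {
            // The value is already known, so skip the general machinery.
            T v = ((Scalar<T>) source).value;
            return new ScalarXMap<T, R>(v, mapper);
        }
        return new GeneralFlatMap<T, R>(source, mapper);
    }

    /** Returns the name of the chosen implementation followed by the emitted items. */
    static List<String> run(boolean scalar) {
        List<String> out = new ArrayList<>();
        MiniSource<String> source;
        if (scalar) {
            source = new Scalar<String>("a");
        } else {
            source = downstream -> downstream.accept("a");
        }
        MiniSource<String> mapped = flatMap(source, s -> new Scalar<String>(s + "!"));
        out.add(mapped.getClass().getSimpleName());
        mapped.subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // prints [ScalarXMap, a!]
        System.out.println(run(false)); // prints [GeneralFlatMap, a!]
    }
}
```

Both paths produce the same items. The only difference is which object gets built, which is why the walkthrough for this example never touches ObservableFlatMap.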
subscribe() also has several overloads, and the call eventually reaches `subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe)`, which executes **subscribe(ls);**. That calls `subscribe(Observer<? super T> observer)`, which executes **subscribeActual(observer);**. This dispatches to the **subscribeActual(Observer<? super R> s)** method in ObservableScalarXMap (on its nested ScalarXMapObservable class):
```java
@SuppressWarnings("unchecked")
@Override
public void subscribeActual(Observer<? super R> s) {
    ObservableSource<? extends R> other;
    try {
        // Core code: this invokes the apply() method the user passed to flatMap
        other = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null ObservableSource");
    } catch (Throwable e) {
        EmptyDisposable.error(e, s);
        return;
    }
    // Check the type of other and take the matching path
    if (other instanceof Callable) {
        R u;

        try {
            u = ((Callable<R>)other).call();
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            EmptyDisposable.error(ex, s);
            return;
        }

        if (u == null) {
            EmptyDisposable.complete(s);
            return;
        }
        ScalarDisposable<R> sd = new ScalarDisposable<R>(s, u);
        s.onSubscribe(sd);
        sd.run();
    } else {
        // other is an ordinary Observable here (not a Callable), so this branch runs
        other.subscribe(s);
    }
}
```
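Inside subscribeActual(), the call to mapper.apply(value) runs the apply() method that the user passed to flatMap.
The subsequent other.subscribe(s) then calls the subscribe() method of the Observable class once more.
Note that this subscribe() is invoked on a different Observable object from the one subscribed to earlier!
That subscribe() again calls subscribeActual(), which this time is the **subscribeActual()** method of the ObservableFromIterable class: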
```java
@Override
public void subscribeActual(Observer<? super T> s) {
    Iterator<? extends T> it;
    try {
        // Obtain an iterator over source; its items are emitted one by one
        it = source.iterator();
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        EmptyDisposable.error(e, s);
        return;
    }
    boolean hasNext;
    try {
        // Check whether the iterator has any items at all
        hasNext = it.hasNext();
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        EmptyDisposable.error(e, s);
        return;
    }
    if (!hasNext) {
        EmptyDisposable.complete(s);
        return;
    }

    FromIterableDisposable<T> d = new FromIterableDisposable<T>(s, it);
    s.onSubscribe(d);

    if (!d.fusionMode) {
        d.run();
    }
}
```
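What the emission loop amounts to can be approximated as follows. This is a simplified sketch, not the body of FromIterableDisposable: `MiniObserver` is a hypothetical interface, and an `AtomicBoolean` flag stands in for the Disposable that the real code hands to the observer. Error handling and fusion are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class FromIterableSketch {
    /** Hypothetical observer; the flag passed to onSubscribe plays the role of a Disposable. */
    interface MiniObserver<T> {
        void onSubscribe(AtomicBoolean disposed);

        void onNext(T item);

        void onComplete();
    }

    static <T> void subscribe(Iterable<? extends T> source, MiniObserver<? super T> observer) {
        Iterator<? extends T> it = source.iterator();
        AtomicBoolean disposed = new AtomicBoolean();
        observer.onSubscribe(disposed);

        // Emit items one by one, stopping early if the observer disposed.
        while (!disposed.get() && it.hasNext()) {
            observer.onNext(it.next());
        }
        // Signal completion only if nobody cancelled in the meantime.
        if (!disposed.get()) {
            observer.onComplete();
        }
    }

    /** Collects at most {@code limit} items, then disposes. */
    static List<String> run(final int limit) {
        final List<String> out = new ArrayList<>();
        subscribe(Arrays.asList("a", "b", "c"), new MiniObserver<String>() {
            private AtomicBoolean flag;

            @Override
            public void onSubscribe(AtomicBoolean disposed) {
                flag = disposed;
            }

            @Override
            public void onNext(String item) {
                out.add(item);
                if (out.size() >= limit) {
                    flag.set(true);
                }
            }

            @Override
            public void onComplete() {
                out.add("complete");
            }
        });
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(10)); // prints [a, b, c, complete]
        System.out.println(run(2));  // prints [a, b]
    }
}
```

Handing the flag to the observer before the first item is emitted mirrors the `s.onSubscribe(d)` call that precedes `d.run()` in the listing above: the downstream needs a way to cancel before any data arrives.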