Observable
、 Flowable
、 Subject
Observer
、 Subscrption
Observable#subscribe
才开始请求上游发送数据。当下游请求 dispose()
停止通知上游停止发送。 Rxjava1
开始就有人说Rxjava可以看作流水线,上游怎么加工对于下游来说是无感知的,下游只要负责接收响应对应数据事件就行。 对于rxajva的思考,可以参考一下:Rxjava沉思录系列和 Rxjava主要负责人系列博客
一般cold Observable创建都是通过 just
、 create
、 fromXX
、 just
创建的。最简单粗暴的创建方式:
Observable.create<String> { it.onNext("") }.subscribe() 复制代码
//[仅关注点相关代码] //ObservableOnSubscribe仅一个subscribe方法 public interface ObservableOnSubscribe<T> { void subscribe(ObservableEmitter<T> e) throws Exception; } public abstract class Observable<T> implements ObservableSource<T> { public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); //RxJavaPlugins这是一个全局Hook,#onAssembly不实现默认直接返回 return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } public final void subscribe(Observer<? super T> observer) { try { ........ //真正调用subscribe的实现 subscribeActual(observer); } ...... } //整个Observable唯一的抽象方法,由子Observable实现,通过这个方法将上游和下游关联起来 protected abstract void subscribeActual(Observer<? super T> observer); } 复制代码
由 Observable#create
真正返回的是 ObservableCreate
,当调用 Observable#subscribe
才真正通知上游 Observable
开始发送数据。其实质是通过 #subscribeActual
将上下游建立联系,并调用上游 #subscribe
(在 ObservableCreate
中就是 ObservableOnSubscribe#subscribe
)方法通知上游,下游已订阅可以开始发送数据。
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { //source(上游)即Observable#create传入的ObservableOnSubscribe //这里就将上下游真正的联系了起来。 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } ..... } } 复制代码
所以实质就是下游通知上游,下游已产生订阅触发上游下发数据/事件,上游再通过下发数据/事件,最终下游通过指定方法响应上游下发的数据/事件。所以一开始说的流水线方式就可以理解了。
因为每次下游产生一次订阅都会通知到上游的 #subscribe
,所以如果上游只在 #subscribe
中去创建初始数据源就可以每个做到不同下游的数据不关联
Observable.create<String> { it.onNext("") }.subscribe()
流程图如下:
//Observable#map public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); //**ObservableMap将上游Observable和当前的转换Function建立联系 return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); } //ObservableMap.java public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { super(source); this.function = function; } @Override public void subscribeActual(Observer<? super U> t) { //将下游包装成MapObserver,并将MapObserver和上游建立联系 //这样上游下发时,先通过MapObserver处理才下发给真正的Observer source.subscribe(new MapObserver<T, U>(t, function)); } static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { .... U v; try { //通过Function获取到map后的数据 v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch .... //向下游下发数据 actual.onNext(v); } ... } } 复制代码
可以看到 map
操作符的作用就是通过将上游拦截返回 ObservableMap
提供给下游订阅,并在map上游返回数据前通过 mapper
将上游数据转化并下发给下游。
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); //emmmm,是不是点眼熟 return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); } //ObservableSubscribeOn public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); //onSubscribe()方法执行在 订阅处所在的线程 s.onSubscribe(parent); //将上游放入scheduler中调用,且立即执行 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable { private static final long serialVersionUID = 8094547886072529208L; final Observer<? super T> actual; final AtomicReference<Disposable> s; SubscribeOnObserver(Observer<? super T> actual) { this.actual = actual; this.s = new AtomicReference<Disposable>(); } @Override public void onSubscribe(Disposable s) { DisposableHelper.setOnce(this.s, s); } @Override public void onNext(T t) { actual.onNext(t); } @Override public void onError(Throwable t) { actual.onError(t); } @Override public void onComplete() { actual.onComplete(); } @Override //scheduler#scheduleDirect中执行完后 public void dispose() { DisposableHelper.dispose(s); DisposableHelper.dispose(this); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } void setDisposable(Disposable d) { DisposableHelper.setOnce(this, d); } } final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { //该方法调用已经在scheduler中调用 source.subscribe(parent); } } } 复制代码
由源码可以看出由 scheduler.scheduleDirect
-> SubscribeTask#run
-> SubscribeOnObserver#subscribe(observer)
将整个调度切换到指定线程中。
因为订阅是用下自上的,所以 subscribeOn
也总是离源最近的一个生效。因为触发源的 subscribe
是离源最近一个。
public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); } 复制代码
可以看出Rxjava的操作符套路基本是将源 Observable
通过装饰者模式封装一层再返回新的Observable
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; final boolean delayError;//默认false final int bufferSize;//一般128 public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; } @Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { //创建主线程调度器 Scheduler.Worker w = scheduler.createWorker(); //关联上下游,触发上游订阅过程 source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } } 复制代码
这里可以看出 ObservableObserveOn
还是很简单的,上游订阅过程并不用关心,下游的触发则由 ObserveOnObserver
处理。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { private static final long serialVersionUID = 6576896619930983584L; final Observer<? super T> actual; final Scheduler.Worker worker; final boolean delayError; final int bufferSize; //上游数据的缓存队列 SimpleQueue<T> queue; Disposable s; Throwable error; volatile boolean done; volatile boolean cancelled; int sourceMode; boolean outputFused; ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this.actual = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; } @Override public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; ...... //创建对接缓存数据 queue = new SpscLinkedArrayQueue<T>(bufferSize); //回调下游onSubscribe actual.onSubscribe(this); } } @Override public void onNext(T t) { if (done) {//执行过complete/error则done为true return; } if (sourceMode != QueueDisposable.ASYNC) {//非异步数据,默认同步数据 queue.offer(t);//入队列 } schedule(); } @Override public void onError(Throwable t) { if (done) { RxJavaPlugins.onError(t); return; } error = t; done = true;//标记已完成 schedule(); } @Override public void onComplete() { if (done) { return; } done = true;//标记已完成 schedule(); } @Override public void dispose() { if (!cancelled) { cancelled = true; s.dispose(); worker.dispose(); if (getAndIncrement() == 0) { queue.clear(); } } } @Override public boolean isDisposed() { return cancelled; } void schedule() { //自旋+1,!=0则表示worker.schedule已在执行无需在调度 if (getAndIncrement() == 0) { worker.schedule(this);//通过调度器处理,将数据取出下发到下游 } } @Override public void run() { if (outputFused) {//默认false drainFused(); } else { drainNormal();//取出数据下发 } } void drainNormal() { int missed = 1; final SimpleQueue<T> q = queue; final Observer<? super T> a = actual; for (; ; ) { //检测是否不用再处理 if (checkTerminated(done, q.isEmpty(), a)) { return; } for (; ; ) { boolean d = done; T v; try { v = q.poll();//取出一个数据 } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.dispose(); q.clear(); a.onError(ex); worker.dispose(); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) {//可能已经提前disposed了 return; } if (empty) {//数据为空队列无数据,退出下发循环 break; } //下发 a.onNext(v); } //可能有错过的schedule,再次循环检测 missed = addAndGet(-missed); if (missed == 0) { break; } } } //检测是否compelte/error/队列已空 boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) { if (cancelled) {//已经disposed queue.clear(); return true; } if (d) {//是否已结束 Throwable e = error; if (delayError) {//延迟error,等待队列清空 if (empty) { if (e != null) { a.onError(e); } else { a.onComplete(); } worker.dispose(); return true; } } else { if (e != null) { queue.clear(); a.onError(e);//下发error worker.dispose(); return true; } else if (empty) { a.onComplete(); worker.dispose(); return true; } } } return false; } } 复制代码
ObserveOnObserver
继承于 BasicIntQueueDisposable
继承于 AtomicInteger
,通过自身的原子性(自旋/CAS)来消除多线程对 #schedule
的调用。
可以看出 #observeOn
只对下游有影响。