看过 Rxjava2 Observable源码浅析
的你会发现其实Rxjava的实现套路都差不多,所以其实 Flowable
也差不多,只是在实现的细节上稍微有些差异而已。
Flowable
的出现其实主要是为了解决在异步模型中上下游数据发送和接收的差异性而存在的。上游发送速度大于下游接收速度时就会产生数据积压导致OOM,而 Flowable
就提供了 背压(BackPressure)
策略来处理数据积压问题。
从最原始的 Flowable#create
开始
//FlowableOnSubscribe就是最原始的数据源发生器 public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) { ObjectHelper.requireNonNull(source, "source is null"); ObjectHelper.requireNonNull(mode, "mode is null"); //将FlowableOnSubscribe转化成了FlowableCreate return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode)); } 复制代码
可以看到 create
方法也是将数据源进行了一层封装。而 subscribe
方法和 Observable#subscribe
就是差不多,最终还是调用的 Flowable#subscribeActual
,而这里就是 FlowableCreate#subscribeActual
public final class FlowableCreate<T> extends Flowable<T> { final FlowableOnSubscribe<T> source; final BackpressureStrategy backpressure; public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) { this.source = source; this.backpressure = backpressure; } @Override public void subscribeActual(Subscriber<? super T> t) { BaseEmitter<T> emitter; //根据不同的背压策略实现不同Emitter switch (backpressure) { case MISSING: { emitter = new MissingEmitter<T>(t); break; } case ERROR: { emitter = new ErrorAsyncEmitter<T>(t); break; } case DROP: { emitter = new DropAsyncEmitter<T>(t); break; } case LATEST: { emitter = new LatestAsyncEmitter<T>(t); break; } default: { emitter = new BufferAsyncEmitter<T>(t, bufferSize()); break; } } //一般来说在Subscriber#onSubscribe,调用emitter.request指定拉取上游多少数据 t.onSubscribe(emitter); try { //将上下游关联 //调用Flowable#create一开始创建的FlowableOnSubscribe#subscribe source.subscribe(emitter); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); emitter.onError(ex); } } } 复制代码
可以才看到这里核心就是根据不用背压策略实现不同的 Emitter
。一般来说在 Subscriber#onSubscribe
,调用 emitter.request
指定拉取上游多少数据,而不通过背压策略对数据下发的策略不同。
abstract static class BaseEmitter<T> extends AtomicLong implements FlowableEmitter<T>, Subscription { private static final long serialVersionUID = 7326289992464377023L; final Subscriber<? super T> actual; final SequentialDisposable serial; BaseEmitter(Subscriber<? super T> actual) { this.actual = actual; this.serial = new SequentialDisposable(); } @Override public void onComplete() { complete(); } protected void complete() { if (isCancelled()) { return; } try { actual.onComplete(); } finally { serial.dispose(); } } @Override public final void onError(Throwable e) { //尝试下发完成缓存数据 if (!tryOnError(e)) { RxJavaPlugins.onError(e); } } @Override public boolean tryOnError(Throwable e) { return error(e); } protected boolean error(Throwable e) { if (e == null) { e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (isCancelled()) { return false; } try { actual.onError(e); } finally { serial.dispose(); } return true; } @Override public final void cancel() { serial.dispose(); onUnsubscribed(); } @Override public final boolean isCancelled() { return serial.isDisposed(); } @Override public final void request(long n) { //记录请求的个数 if (SubscriptionHelper.validate(n)) { BackpressureHelper.add(this, n); onRequested(); } } void onRequested() { // default is no-op } @Override public final void setDisposable(Disposable s) { serial.update(s); } @Override public final void setCancellable(Cancellable c) { setDisposable(new CancellableDisposable(c)); } @Override public final long requested() { return get(); } ..... } 复制代码
这里可以看到 BaseEmitter
通过自身继承 AtomicLong
取记录请求个数,而不是通过锁或者 volatile
来提高性能。
不做任何处理,由下游自行处理overflow。 MissingEmitter
实现很简单。
static final class MissingEmitter<T> extends BaseEmitter<T> { private static final long serialVersionUID = 3776720187248809713L; MissingEmitter(Subscriber<? super T> actual) { super(actual); } @Override public void onNext(T t) { if (isCancelled()) { return; } //这里可以看出,对应数据下发没有任何限制 if (t != null) { actual.onNext(t); } else { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } //request减1 for (;;) { long r = get(); if (r == 0L || compareAndSet(r, r - 1)) { return; } } } } 复制代码
static final class BufferAsyncEmitter<T> extends BaseEmitter<T> { private static final long serialVersionUID = 2427151001689639875L; final SpscLinkedArrayQueue<T> queue;///数据缓存列表 Throwable error; volatile boolean done;//标记是否onComplete或onError final AtomicInteger wip;//标记调用了多少次drain BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint) { super(actual); this.queue = new SpscLinkedArrayQueue<T>(capacityHint); this.wip = new AtomicInteger(); } @Override public void onNext(T t) { if (done || isCancelled()) { return; } if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } queue.offer(t);///数据入队列 drain();//检测并下发数据 } @Override public boolean tryOnError(Throwable e) { if (done || isCancelled()) { return false; } if (e == null) { e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } error = e; done = true;//标记完成 drain();//检测并下发未完成数据 return true; } @Override public void onComplete() {//仅标记,若队列有数据继续下发完成 done = true;//标记完成 drain();//检测并下发未完成数据 } @Override void onRequested() {//#request(long n)后调用 drain();//检测并下发数据 } @Override void onUnsubscribed() { if (wip.getAndIncrement() == 0) { queue.clear(); } } void drain() { //类似于if(wip++ != 0) //所以这里多次调用#drain只有第一次调用才会通过,或者已经清空队列等待一下调用#drain if (wip.getAndIncrement() != 0) { return; } int missed = 1; final Subscriber<? super T> a = actual; final SpscLinkedArrayQueue<T> q = queue; for (; ; ) { long r = get();//数据请求数,由#request决定 long e = 0L; while (e != r) { if (isCancelled()) { q.clear(); return; } //是否已完成,调用onComplete/onError后会标记done==true boolean d = done; //获取队列第一条数据 T o = q.poll(); //用于标记队列是否为空 boolean empty = o == null; //已标记完成且队列为空,调用onComplete/onError if (d && empty) { Throwable ex = error; if (ex != null) { error(ex); } else { complete(); } return; } //队列为空,退出获取数据循环 if (empty) { break; } //下发数据 a.onNext(o); //标记已下发数据 e++; } //数据下发量和请求数相符 if (e == r) { if (isCancelled()) { q.clear(); return; } //标记是否完成 boolean d = done; //标记队列是否为空 boolean empty = q.isEmpty(); //队列为空且已完成,调用onComplete/onError if (d && empty) { Throwable ex = error; if (ex != null) { error(ex); } else { complete(); } return; } } //request数减去已经下发数 if (e != 0) { BackpressureHelper.produced(this, e); } //已处理一次drain,wip-missed避免错过多次调用drain //和Observable#observeOn时的ObserveOnObserver#drainNormal处理方式一样 missed = wip.addAndGet(-missed); if (missed == 0) { break; } } } } 复制代码
这里的 #drain
下发数据方法和 Observable#observeOn
-> ObserveOnObserver#drainNormal
的处理方式是有点相似的。通过本身记录 request
数和 wip
协调下发数据量及正确的下发。在调用 Subscriber#onSubscribe
、 Emitter#onNext
、 Emitter#onComplete
都会触发 #drain
尝试去下发缓存的数据。其中 Emitter#onNext
时先缓存数据在尝试下发,而且数据还没下发完成前调用 onComplete
和 onError
(这里重写了 tryOnError
)仅先标记完成,还要等数据完全下发才会真正调用 actual
对应方法。
其实这里我们还是可以学到一些东西的:
Atomic
包下的类代替 volatile
和锁提高性能 missed
和 wip
来协调多线程分发任务
BackpressureStrategy.LATEST
当数据背压时只会缓存最后一次下发的数据(通过 AtomicReference
来缓存)。具体实现原理和 BackpressureStrategy.BUFFER
较为类似就不贴代码了。
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> { private static final long serialVersionUID = 4127754106204442833L; NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) { super(actual); } @Override public final void onNext(T t) { if (isCancelled()) { return; } if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } //是否已达请求数 if (get() != 0) { actual.onNext(t);//未达请求数,下发 BackpressureHelper.produced(this, 1);//请求数减1 } else { onOverflow();//已超过请求,调用对应策略方法 } } // abstract void onOverflow(); } 复制代码
BackpressureStrategy.DROP
对应的 DropAsyncEmitter
和 BackpressureStrategy.ERROR
对应的 ErrorAsyncEmitter
都是继承于 NoOverflowBaseAsyncEmitter
。实现方式也是很简单,仅仅在 onNext
判断一下是否已经到达了请求数,未到达就下发,若到达了调用 onOverflow()
处理溢出方案。
BackpressureStrategy.DROP
的溢出方案为空实现即舍去溢出数据 BackpressureStrategy.ERROR
的溢出方案为调用 onError
即溢出时报错
MISS
策略需要下游自行处理背压问题
BUFFER
策略则在还有数据未下发完成时就算上游调用 onComplete
或 onError
也会等待数据下发完成
LATEST
策略则当产生背压时仅会缓存最新的数据
DROP
策略为背压时抛弃背压数据
ERROR
策略是背压时抛出异常调用 onError
在学习源码时得到的一些关于多线程的领悟:
Atomic
包下的类代替 volatile
和锁提高性能 missed
和 wip
来协调多线程分发任务