转载

RxJava2 Flowable源码浅析

看过 Rxjava2 Observable源码浅析 的你会发现其实Rxjava的实现套路都差不多,所以其实 Flowable 也差不多,只是在实现的细节上稍微有些差异而已。

背景

Flowable 的出现其实主要是为了解决在异步模型中上下游数据发送和接收的差异性而存在的。上游发送速度大于下游接收速度时就会产生数据积压导致OOM,而 Flowable 就提供了 背压(BackPressure) 策略来处理数据积压问题。

流程

从最原始的 Flowable#create 开始

//FlowableOnSubscribe就是最原始的数据源发生器
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
    ObjectHelper.requireNonNull(source, "source is null");
    ObjectHelper.requireNonNull(mode, "mode is null");
    //将FlowableOnSubscribe转化成了FlowableCreate
    return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}
复制代码

可以看到 create 方法也是将数据源进行了一层封装。而 subscribe 方法和 Observable#subscribe 就是差不多,最终还是调用的 Flowable#subscribeActual ,而这里就是 FlowableCreate#subscribeActual

public final class FlowableCreate<T> extends Flowable<T> {

    final FlowableOnSubscribe<T> source;

    final BackpressureStrategy backpressure;

    public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
        this.source = source;
        this.backpressure = backpressure;
    }

    @Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;
        //根据不同的背压策略实现不同Emitter
        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }
        //一般来说在Subscriber#onSubscribe,调用emitter.request指定拉取上游多少数据
        t.onSubscribe(emitter);
        try {
            //将上下游关联
            //调用Flowable#create一开始创建的FlowableOnSubscribe#subscribe
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
}
复制代码

可以才看到这里核心就是根据不用背压策略实现不同的 Emitter 。一般来说在 Subscriber#onSubscribe ,调用 emitter.request 指定拉取上游多少数据,而不通过背压策略对数据下发的策略不同。

BaseEmitter

abstract static class BaseEmitter<T>
    extends AtomicLong
    implements FlowableEmitter<T>, Subscription {
        private static final long serialVersionUID = 7326289992464377023L;

        final Subscriber<? super T> actual;

        final SequentialDisposable serial;

        BaseEmitter(Subscriber<? super T> actual) {
            this.actual = actual;
            this.serial = new SequentialDisposable();
        }

        @Override
        public void onComplete() {
            complete();
        }

        protected void complete() {
            if (isCancelled()) {
                return;
            }
            try {
                actual.onComplete();
            } finally {
                serial.dispose();
            }
        }

        @Override
        public final void onError(Throwable e) {
            //尝试下发完成缓存数据
            if (!tryOnError(e)) {
                RxJavaPlugins.onError(e);
            }
        }

        @Override
        public boolean tryOnError(Throwable e) {
            return error(e);
        }

        protected boolean error(Throwable e) {
            if (e == null) {
                e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (isCancelled()) {
                return false;
            }
            try {
                actual.onError(e);
            } finally {
                serial.dispose();
            }
            return true;
        }

        @Override
        public final void cancel() {
            serial.dispose();
            onUnsubscribed();
        }

        @Override
        public final boolean isCancelled() {
            return serial.isDisposed();
        }

        @Override
        public final void request(long n) {
            //记录请求的个数
            if (SubscriptionHelper.validate(n)) {
                BackpressureHelper.add(this, n);
                onRequested();
            }
        }

        void onRequested() {
            // default is no-op
        }

        @Override
        public final void setDisposable(Disposable s) {
            serial.update(s);
        }

        @Override
        public final void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public final long requested() {
            return get();
        }
        .....
    }
复制代码

这里可以看到 BaseEmitter 通过自身继承 AtomicLong 取记录请求个数,而不是通过锁或者 volatile 来提高性能。

MissingEmitter - BackpressureStrategy.MISSING

不做任何处理,由下游自行处理overflow。 MissingEmitter 实现很简单。

static final class MissingEmitter<T> extends BaseEmitter<T> {

        private static final long serialVersionUID = 3776720187248809713L;

        MissingEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        public void onNext(T t) {
            if (isCancelled()) {
                return;
            }
            //这里可以看出,对应数据下发没有任何限制
            if (t != null) {
                actual.onNext(t);
            } else {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //request减1
            for (;;) {
                long r = get();
                if (r == 0L || compareAndSet(r, r - 1)) {
                    return;
                }
            }
        }

    }
复制代码

BufferAsyncEmitter - BackpressureStrategy.BUFFER

static final class BufferAsyncEmitter<T> extends BaseEmitter<T> {

    private static final long serialVersionUID = 2427151001689639875L;

    final SpscLinkedArrayQueue<T> queue;///数据缓存列表

    Throwable error;
    volatile boolean done;//标记是否onComplete或onError

    final AtomicInteger wip;//标记调用了多少次drain

    BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint) {
        super(actual);
        this.queue = new SpscLinkedArrayQueue<T>(capacityHint);
        this.wip = new AtomicInteger();
    }

    @Override
    public void onNext(T t) {
        if (done || isCancelled()) {
            return;
        }

        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        queue.offer(t);///数据入队列
        drain();//检测并下发数据
    }

    @Override
    public boolean tryOnError(Throwable e) {
        if (done || isCancelled()) {
            return false;
        }

        if (e == null) {
            e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }

        error = e;
        done = true;//标记完成
        drain();//检测并下发未完成数据
        return true;
    }

    @Override
    public void onComplete() {//仅标记,若队列有数据继续下发完成
        done = true;//标记完成
        drain();//检测并下发未完成数据
    }

    @Override
    void onRequested() {//#request(long n)后调用
        drain();//检测并下发数据
    }

    @Override
    void onUnsubscribed() {
        if (wip.getAndIncrement() == 0) {
            queue.clear();
        }
    }

    void drain() {
        //类似于if(wip++ != 0)
        //所以这里多次调用#drain只有第一次调用才会通过,或者已经清空队列等待一下调用#drain
        if (wip.getAndIncrement() != 0) {
            return;
        }

        int missed = 1;
        final Subscriber<? super T> a = actual;
        final SpscLinkedArrayQueue<T> q = queue;

        for (; ; ) {
            long r = get();//数据请求数,由#request决定
            long e = 0L;

            while (e != r) {
                if (isCancelled()) {
                    q.clear();
                    return;
                }
                //是否已完成,调用onComplete/onError后会标记done==true
                boolean d = done;
                //获取队列第一条数据
                T o = q.poll();
                //用于标记队列是否为空
                boolean empty = o == null;
                //已标记完成且队列为空,调用onComplete/onError
                if (d && empty) {
                    Throwable ex = error;
                    if (ex != null) {
                        error(ex);
                    } else {
                        complete();
                    }
                    return;
                }
                //队列为空,退出获取数据循环
                if (empty) {
                    break;
                }
                //下发数据
                a.onNext(o);
                //标记已下发数据
                e++;
            }
            //数据下发量和请求数相符
            if (e == r) {
                if (isCancelled()) {
                    q.clear();
                    return;
                }
                //标记是否完成
                boolean d = done;
                //标记队列是否为空
                boolean empty = q.isEmpty();
                //队列为空且已完成,调用onComplete/onError
                if (d && empty) {
                    Throwable ex = error;
                    if (ex != null) {
                        error(ex);
                    } else {
                        complete();
                    }
                    return;
                }
            }
            //request数减去已经下发数
            if (e != 0) {
                BackpressureHelper.produced(this, e);
            }
            //已处理一次drain,wip-missed避免错过多次调用drain
            //和Observable#observeOn时的ObserveOnObserver#drainNormal处理方式一样
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}
复制代码

这里的 #drain 下发数据方法和 Observable#observeOn -> ObserveOnObserver#drainNormal 的处理方式是有点相似的。通过本身记录 request 数和 wip 协调下发数据量及正确的下发。在调用 Subscriber#onSubscribeEmitter#onNextEmitter#onComplete 都会触发 #drain 尝试去下发缓存的数据。其中 Emitter#onNext 时先缓存数据在尝试下发,而且数据还没下发完成前调用 onCompleteonError (这里重写了 tryOnError )仅先标记完成,还要等数据完全下发才会真正调用 actual 对应方法。

其实这里我们还是可以学到一些东西的:

  • 如果可以的话,使用 Atomic 包下的类代替 volatile 和锁提高性能
  • 使用 missedwip 来协调多线程分发任务
  • 多线程中标志位的判断最好通过临时变量存储判断并多次判断

LatestAsyncEmitter - BackpressureStrategy.LATEST

BackpressureStrategy.LATEST 当数据背压时只会缓存最后一次下发的数据(通过 AtomicReference 来缓存)。具体实现原理和 BackpressureStrategy.BUFFER 较为类似就不贴代码了。

BackpressureStrategy.DROP & BackpressureStrategy.ERROR

abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {

        private static final long serialVersionUID = 4127754106204442833L;

        NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        public final void onNext(T t) {
            if (isCancelled()) {
                return;
            }

            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //是否已达请求数
            if (get() != 0) {
                actual.onNext(t);//未达请求数,下发
                BackpressureHelper.produced(this, 1);//请求数减1
            } else {
                onOverflow();//已超过请求,调用对应策略方法
            }
        }
        //
        abstract void onOverflow();
}
复制代码

BackpressureStrategy.DROP 对应的 DropAsyncEmitterBackpressureStrategy.ERROR 对应的 ErrorAsyncEmitter 都是继承于 NoOverflowBaseAsyncEmitter 。实现方式也是很简单,仅仅在 onNext 判断一下是否已经到达了请求数,未到达就下发,若到达了调用 onOverflow() 处理溢出方案。

BackpressureStrategy.DROP 的溢出方案为空实现即舍去溢出数据 BackpressureStrategy.ERROR 的溢出方案为调用 onError 即溢出时报错

总结

MISS 策略需要下游自行处理背压问题

BUFFER 策略则在还有数据未下发完成时就算上游调用 onCompleteonError 也会等待数据下发完成

LATEST 策略则当产生背压时仅会缓存最新的数据

DROP 策略为背压时抛弃背压数据

ERROR 策略是背压时抛出异常调用 onError

在学习源码时得到的一些关于多线程的领悟:

  • 如果可以的话,使用 Atomic 包下的类代替 volatile 和锁提高性能
  • 使用 missedwip 来协调多线程分发任务
  • 多线程中标志位的判断最好通过临时变量存储判断并多次判断
原文  https://juejin.im/post/5dbba10e518825225f483254
正文到此结束
Loading...