本文作为《Java编程方法论:响应式Rxjava与代码设计实战》一书第二章 Rxjava中的Subject一节的补充解读。
首先来看一个Demo:
@Test void replay_PublishSubject_test() { PublishSubject<Object> publishSubject = PublishSubject.create(); ConnectableObservable<Object> replay = publishSubject.replay(); ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); List<Integer> integers = new ArrayList<>(); for (int i=1;i<10;i++){ integers.add(i); } Disposable subscribe1 = replay.subscribe(x -> { log("一郎神: " + x); }, Throwable::printStackTrace, () -> System.out.println("Emission completed")); Disposable subscribe2 = replay.subscribe(x -> { log("二郎神: " + x); }, Throwable::printStackTrace, () -> System.out.println("Emission completed")); Disposable subscribe3 = replay.subscribe(x -> { log("三郎神: " + x); }, Throwable::printStackTrace, () -> System.out.println("Emission completed")); AtomicInteger atomicInteger = new AtomicInteger(integers.size()); try { forkJoinPool.submit(() -> { integers.forEach(id -> { sleep(1,TimeUnit.SECONDS); publishSubject.onNext(id); if (atomicInteger.decrementAndGet() == 0) { publishSubject.onComplete(); } }); }); replay.connect(); sleep(2,TimeUnit.SECONDS); subscribe1.dispose(); sleep(1,TimeUnit.SECONDS); //replay.connect(consumer -> consumer.dispose()); publishSubject.onComplete(); System.out.println("test"); } finally { try { forkJoinPool.shutdown(); int shutdownDelaySec = 2; System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… "); forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS); } catch (Exception ex) { System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName()); } finally { System.out.println("调用 forkJoinPool.shutdownNow()结束服务..."); List<Runnable> l = forkJoinPool.shutdownNow(); System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 "); } } } 复制代码
得到的结果如下所示:
ForkJoinPool.commonPool-worker-3: 一郎神: 1 ForkJoinPool.commonPool-worker-3: 二郎神: 1 ForkJoinPool.commonPool-worker-3: 三郎神: 1 ForkJoinPool.commonPool-worker-3: 二郎神: 2 ForkJoinPool.commonPool-worker-3: 三郎神: 2 Emission completed Emission completed test ………………等待 2 秒后结束服务……………… 调用 forkJoinPool.shutdownNow()结束服务... 还剩 0 个任务等待被执行,服务已关闭 复制代码
在调用 subscribe1.dispose()
的时候,完成了订阅者自行解除订阅关系的约定,而假如后面调用的是 replay.connect(consumer -> consumer.dispose())
,依然会在发送元素的过程中强行中断,不带任何通知。而在使用 publishSubject.onComplete()
后,则可以很优雅地通知后续订阅者优雅地结束。 如图 2-3
所示,我们按照图中文字操作,并在 System.out.println("test")
这行打断点查看状态,发现其他2个订阅者并没有被移除,为什么会出现这种情况?
通过 publishSubject.replay()
,我们得到了一个 ConnectableObservable
对象,具体如下:
//io.reactivex.Observable#replay public final ConnectableObservable<T> replay() { return ObservableReplay.createFrom(this); } 复制代码
结合前面 ConnectableObservable
相关知识的学习,在调用 replay.subscribe(...)
时,会将下游的订阅者与 DEFAULT_UNBOUNDED_FACTORY
所得到的 UnboundedReplayBuffer
对象通过一个 ReplayObserver
对象建立起联系:
//ObservableReplay#createFrom public static <T> ConnectableObservable<T> createFrom(ObservableSource<? extends T> source) { return create(source, DEFAULT_UNBOUNDED_FACTORY); } //ObservableReplay#create static <T> ConnectableObservable<T> create(ObservableSource<T> source, final BufferSupplier<T> bufferFactory) { // the current connection to source needs to be shared between the operator and its onSubscribe call final AtomicReference<ReplayObserver<T>> curr = new AtomicReference<ReplayObserver<T>>(); //注意此处 ObservableSource<T> onSubscribe = new ReplaySource<T>(curr, bufferFactory); //此处这个curr会作为ObservableReplay下current字段的值,记住,它是个引用类型对象 return RxJavaPlugins.onAssembly(new ObservableReplay<T>(onSubscribe, source, curr, bufferFactory)); } //ObservableReplay#subscribeActual protected void subscribeActual(Observer<? super T> observer) { onSubscribe.subscribe(observer); } //ObservableReplay.ReplaySource#subscribe public void subscribe(Observer<? super T> child) { for (;;) { ReplayObserver<T> r = curr.get(); if (r == null) { ReplayBuffer<T> buf = bufferFactory.call(); ReplayObserver<T> u = new ReplayObserver<T>(buf); //此时ObservableReplay中current字段的值所指对象也会发生改变 if (!curr.compareAndSet(null, u)) { continue; } r = u; } InnerDisposable<T> inner = new InnerDisposable<T>(r, child); child.onSubscribe(inner); //通过ReplayObserver的observers字段将下游订阅者管理起来 r.add(inner); if (inner.isDisposed()) { r.remove(inner); return; } //此处UnboundedReplayBuffer对象与下游订阅者建立联系 r.buffer.replay(inner); break; } } } 复制代码
当调用 replay.connect(consumer -> consumer.dispose())
时,通过 current
获取上面得到的 ReplayObserver
对象,并调用该对象的 dispose()
方法(由 replay.connect(...)
中传入的 Consumer
实现可得),此时会将 ObservableReplay
中的 observers
字段设定为 TERMINATED
,同时将 ObservableReplay
自身身为 AtomicReference
角色所存储值设定为 DISPOSED
,即将 ObservableReplay
中 current
的值设定为了 DISPOSED
。
//ObservableReplay#connect public void connect(Consumer<? super Disposable> connection) { boolean doConnect; ReplayObserver<T> ps; for (;;) { ps = current.get(); if (ps == null || ps.isDisposed()) { ReplayBuffer<T> buf = bufferFactory.call(); ReplayObserver<T> u = new ReplayObserver<T>(buf); if (!current.compareAndSet(ps, u)) { continue; } ps = u; } doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true); break; } try { connection.accept(ps); } catch (Throwable ex) { if (doConnect) { ps.shouldConnect.compareAndSet(true, false); } Exceptions.throwIfFatal(ex); throw ExceptionHelper.wrapOrThrow(ex); } if (doConnect) { source.subscribe(ps); } } //ObservableReplay.ReplayObserver#dispose public void dispose() { observers.set(TERMINATED); DisposableHelper.dispose(this); } //DisposableHelper#dispose public static boolean dispose(AtomicReference<Disposable> field) { Disposable current = field.get(); Disposable d = DISPOSED; if (current != d) { current = field.getAndSet(d); if (current != d) { if (current != null) { current.dispose(); } return true; } } return false; } 复制代码
可以看到, ReplayObserver
只是解除了与下游订阅者的关系,但并没有进一步对下游订阅者进行结束的操作,这样与 UnboundedReplayBuffer
对象建立联系的订阅者,如果buffer中的元素还未消费完毕,会持续消费直至所存元素下发完毕,但要注意的是,该buffer中并未存放结束事件(即通过调用 UnboundedReplayBuffer#complete
往该队列中存放 NotificationLite.complete()
元素)。同时下游订阅者也并未调用 dispose()
方法,所以下面所示源码中的 output.isDisposed()
结果为 false
。请注意下面所示源码中 <1>
处的代码:
public void replay(InnerDisposable<T> output) { if (output.getAndIncrement() != 0) { return; } final Observer<? super T> child = output.child; int missed = 1; for (;;) { if (output.isDisposed()) { return; } int sourceIndex = size; Integer destinationIndexObject = output.index(); int destinationIndex = destinationIndexObject != null ? destinationIndexObject : 0; while (destinationIndex < sourceIndex) { Object o = get(destinationIndex); //此处很关键 if (NotificationLite.accept(o, child)) {//<1> return; } if (output.isDisposed()) { return; } destinationIndex++; } output.index = destinationIndex; missed = output.addAndGet(-missed); if (missed == 0) { break; } } } } //io.reactivex.internal.util.NotificationLite#accept public static <T> boolean accept(Object o, Observer<? super T> s) { if (o == COMPLETE) { s.onComplete(); return true; } else if (o instanceof ErrorNotification) { s.onError(((ErrorNotification)o).e); return true; } s.onNext((T)o); return false; } 复制代码
如果调用了 UnboundedReplayBuffer#complete
,那么在元素下发到最后时,就会出现 o == COMPLETE
为 true
,此时会调用下游订阅者的 onComplete()
方法。
//ObservableReplay.UnboundedReplayBuffer#complete public void onComplete() { if (!done) { done = true; buffer.complete(); replayFinal(); } } //ObservableReplay.UnboundedReplayBuffer#complete public void complete() { add(NotificationLite.complete()); size++; } //io.reactivex.internal.util.NotificationLite#complete public static Object complete() { return COMPLETE; } 复制代码
至此,关于 replay_PublishSubject_test()
示例中所展现的疑点已经解读完毕。