public CompletableFuture<Void> consume(Consumer<? super T> consumer) { if (consumer == null) throw new NullPointerException(); CompletableFuture<Void> status = new CompletableFuture<>(); //绑定Publisher、Subscriber,传入ConsumerSubscriber,包装一个Consumer subscribe(new ConsumerSubscriber<T>(status, consumer)); return status; } 复制代码
public void subscribe(Subscriber<? super T> subscriber) { if (subscriber == null) throw new NullPointerException(); //max默认取Flow.defaultBufferSize()=256,,,INITIAL_CAPACITY为2的指数倍,默认32,此时Object[32] int max = maxBufferCapacity; // allocate initial array Object[] array = new Object[max < INITIAL_CAPACITY ? max : INITIAL_CAPACITY]; //将Subscriber包装为一个BufferedSubscription BufferedSubscription<T> subscription = new BufferedSubscription<T>(subscriber, executor, onNextHandler, array, max); synchronized (this) { if (!subscribed) { //初次订阅subscribed设为true,拿到当前线程 subscribed = true; owner = Thread.currentThread(); } //下面整段for循环解读:初次订阅,clients指向上面new的BufferedSubscription,此时pred=null。后续订阅时,将上面new的BufferedSubscription放在链尾,并且会remove链表之前已经处于close状态的BufferedSubscription。之后执行subscription.onSubscribe();clients = subscription;break;退出循环。可以看到SubmissionPublisher对每一个Suscriber将其包装成一个独立的BufferedSubscription放入链表并执行其onSubscribe()方法。 for (BufferedSubscription<T> b = clients, pred = null;;) { if (b == null) { Throwable ex; subscription.onSubscribe(); if ((ex = closedException) != null) subscription.onError(ex); else if (closed) subscription.onComplete(); else if (pred == null) clients = subscription; else pred.next = subscription; break; } BufferedSubscription<T> next = b.next; if (b.isClosed()) { // remove b.next = null; // detach if (pred == null) clients = next; else pred.next = next; } else if (subscriber.equals(b.subscriber)) { b.onError(new IllegalStateException("Duplicate subscribe")); break; } else pred = b; b = next; } } } 复制代码
再来看BufferedSubscription#onSubscribe():
final void onSubscribe() { startOnSignal(RUN | ACTIVE); } //BufferedSubscription#onSubscribe()根据状态,执行BufferedSubscription#tryStart() //将BufferedSubscription包装成ConsumerTask交给ForkJoinPool线程池执行BufferedSubscription#consume()方法。 final void tryStart() { try { Executor e; ConsumerTask<T> task = new ConsumerTask<T>(this); if ((e = executor) != null) // skip if disabled on error e.execute(task); } catch (RuntimeException | Error ex) { getAndBitwiseOrCtl(ERROR | CLOSED); throw ex; } } 复制代码
再来看BufferedSubscription#consume():
final void consume() { Subscriber<? super T> s; if ((s = subscriber) != null) { // hoist checks subscribeOnOpen(s); long d = demand; //for循环,一直从BufferedSubscription的Object[] array 中获取元素消费 for (int h = head, t = tail;;) { int c, taken; boolean empty; if (((c = ctl) & ERROR) != 0) { closeOnError(s, null); break; } // else if ((taken = takeItems(s, d, h)) > 0) { head = h += taken; d = subtractDemand(taken); } else if ((d = demand) == 0L && (c & REQS) != 0) weakCasCtl(c, c & ~REQS); // exhausted demand else if (d != 0L && (c & REQS) == 0) weakCasCtl(c, c | REQS); // new demand else if (t == (t = tail)) { // stability check if ((empty = (t == h)) && (c & COMPLETE) != 0) { closeOnComplete(s); // end of stream break; } else if (empty || d == 0L) { int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN; if (weakCasCtl(c, c & ~bit) && bit == RUN) break; // un-keep-alive or exit } } } } } 复制代码
再来看BufferedSubscription#takeItems(Subscriber<? super T> s, long d, int h):
final int takeItems(Subscriber<? super T> s, long d, int h) { Object[] a; int k = 0, cap; if ((a = array) != null && (cap = a.length) > 0) { int m = cap - 1, b = (m >>> 3) + 1; // min(1, cap/8) int n = (d < (long)b) ? (int)d : b; for (; k < n; ++h, ++k) { //从array中获取索引为h的元素x Object x = QA.getAndSet(a, h & m, null); if (waiting != 0) signalWaiter(); if (x == null) break; //ConsumerSubscriber消费元素x else if (!consumeNext(s, x)) break; } } return k; } final boolean consumeNext(Subscriber<? super T> s, Object x) { try { @SuppressWarnings("unchecked") T y = (T) x; if (s != null) s.onNext(y); return true; } catch (Throwable ex) { handleOnNext(s, ex); return false; } } //调用consumer.accept(item)消费元素 public final void onNext(T item) { try { consumer.accept(item); } catch (Throwable ex) { subscription.cancel(); status.completeExceptionally(ex); } } 复制代码
接着来看往BufferedSubscription维护的数组Object[]中放入元素的submit(T item)方法:
public int submit(T item) { return doOffer(item, Long.MAX_VALUE, null); } private int doOffer(T item, long nanos, BiPredicate<Subscriber<? super T>, ? super T> onDrop) { if (item == null) throw new NullPointerException(); int lag = 0; boolean complete, unowned; synchronized (this) { Thread t = Thread.currentThread(), o; BufferedSubscription<T> b = clients; if ((unowned = ((o = owner) != t)) && o != null) owner = null; // disable bias if (b == null) complete = closed; else { complete = false; boolean cleanMe = false; BufferedSubscription<T> retries = null, rtail = null, next; do { next = b.next; // int stat = b.offer(item, unowned); if (stat == 0) { // saturated; add to retry list b.nextRetry = null; // avoid garbage on exceptions if (rtail == null) retries = b; else rtail.nextRetry = b; rtail = b; } else if (stat < 0) // closed cleanMe = true; // remove later else if (stat > lag) lag = stat; } while ((b = next) != null); //重试 if (retries != null || cleanMe) lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe); } } if (complete) throw new IllegalStateException("Closed"); else return lag; } final int offer(T item, boolean unowned) { Object[] a; int stat = 0, cap = ((a = array) == null) ? 0 : a.length; int t = tail, i = t & (cap - 1), n = t + 1 - head; if (cap > 0) { boolean added; if (n >= cap && cap < maxCapacity) // resize //将item放入Object[] a added = growAndOffer(item, a, t); else if (n >= cap || unowned) // need volatile CAS added = QA.compareAndSet(a, i, null, item); else { // can use release mode QA.setRelease(a, i, item); added = true; } if (added) { tail = t + 1; stat = n; } } return startOnOffer(stat); } 复制代码
总结:JDK9提供的响应式api SubmissionPublisher内部维护了ConsumerSubscriber、BufferedSubscription。通过submit方法产生元素并缓存在每个订阅Subscriber独立的BufferedSubscription中的缓冲buffer。通过consume(Consumer consumer)方法,将consumer包装成内部的ConsumerSubscriber,并调用subscribe(),通过创建独立的BufferedSubscription将Publisher、Subscriber建立关系消费元素。如何实现的呢?BufferedSubscription通过调用onSubscribe()将消费元素的动作(从BufferedSubscription中的缓冲buffer中获取元素消费)包装成SubmissionPublisher内部的ConsumerTask对象,并交给ForkJoinPool线程池(也可以自指定)执行。