在JDK8的阻塞队列实现中还有两个未进行说明,今天继续对其中的一个阻塞队列LinkedTransferQueue进行源码分析,如果之前的队列分析已经让你对阻塞队列有了一定的了解,相信本文要讲解的LinkedTransferQueue的源码也能很快被理解,接下来一起学习吧
JDK版本号:1.8.0_171
LinkedTransferQueue是基于链表的FIFO无界阻塞队列,在源码分析前,需要提前对源码实现整体有个印象,便于细节的理解。注释部分对于这个类进行了一些说明和介绍,如果有能力的话可以阅读理解,对于其中的部分这里进行简单说明:
LinkedTransferQueue使用了松弛型双重队列,双重的意思可以理解为两种类型的节点(请求数据的消费者和生产数据的生产者),也就是说队列中保存了这两种类型的节点,理解上要稍微复杂些,其实之前SynchronousQueue中就使用了类似的队列,队列维护了两个指针:head指向第一个匹配节点(M)(如果为空则为空);tail 指向队列中的最后一个节点(如果为空则为空)
在双重队列中为了减少CAS的开销,加入了Slack(松弛度)的处理方式,在节点被匹配(被删除)之后,不会立即更新 head/tail,而是当 head/tail 节点和最近一个未匹配的节点之间的距离超过一个阈值之后才会更新,在LinkedTransferQueue中松弛度值设置为2,这是一个经验值,不多深究。同时为了避免匹配节点在队列中的堆积,在CAS更新head时,会把已匹配的head的next引用指向自己。当我们进行遍历时,遇到这种节点,表示当前线程已经落后于其他线程,需要重新获取head来进行遍历
其与其他阻塞队列不同之处在于,LinkedTransferQueue允许消费者线程获取元素时,如果未请求到数据,则可以生成一个数据节点(节点item为null)入队,然后消费者线程在这个节点线程上等待,直到之后生产者线程入队时发现有一个item为null的数据节点,生产者线程就不再进行入队操作了,直接就将元素填充到该节点的item,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用处返回,生产者同样直接返回
实现TransferQueue接口中的方法是LinkedTransferQueue操作的核心部分
public class LinkedTransferQueue<E> extends AbstractQueue<E> implements TransferQueue<E>, java.io.Serializable
先了解其中的每个方法的含义,便于下面源码实现的理解
/** * 如果机器为多处理器则为true,MP为multiprocessor缩写 */ private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; /** * 节点自旋等待的次数 128 */ private static final int FRONT_SPINS = 1 << 7; /** * 当前驱节点在处理,当前节点自旋等待的次数 64 */ private static final int CHAINED_SPINS = FRONT_SPINS >>> 1; /** * sweepVotes的阈值,达到这个阈值上限则进行一次清理操作 */ static final int SWEEP_THRESHOLD = 32; /** 队列头节点 */ transient volatile Node head; /** 队列尾节点*/ private transient volatile Node tail; /** 解除删除节点关联失败的次数 */ private transient volatile int sweepVotes; /* * xfer方法how参数可能的取值类型 * 队列操作统一方法根据类型进行不同的处理 */ /** poll和tryTransfer使用 */ private static final int NOW = 0; // for untimed poll, tryTransfer /** offer, put, add方法使用 */ private static final int ASYNC = 1; // for offer, put, add /** transfer, take方法使用 */ private static final int SYNC = 2; // for transfer, take /** 超时等待的poll和tryTransfer使用 */ private static final int TIMED = 3; // for timed poll, tryTransfer
上述参数中对xfer参数类型进行详细说明:
LinkedTransferQueue中链表的节点实现Node与SynchronousQueue中的实现类似,需要注意的是当节点已经匹配或被取消时我们必然需要将节点离队,通过forgetNext和forgetContents来将节点排除队列匹配操作
/** * 队列Node实现 * CAS更新Node成员变量 */ static final class Node { // 数据节点和请求节点类型区分标识 final boolean isData; // false if this is a request node // 数据节点保存数据,请求节点为null volatile Object item; // initially non-null if isData; CASed to match // 指向队列中下一个节点 volatile Node next; // 当前节点对应的等待线程 volatile Thread waiter; // null until waiting // CAS methods for fields final boolean casNext(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } final boolean casItem(Object cmp, Object val) { // assert cmp == null || cmp.getClass() != Node.class; return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } /** * 构造方法需传入的参数 */ Node(Object item, boolean isData) { UNSAFE.putObject(this, itemOffset, item); // relaxed write this.isData = isData; } /** * 将next指向自己,避免无用Node过长影响垃圾回收 * 在cas更新head后调用 */ final void forgetNext() { UNSAFE.putObject(this, nextOffset, this); } /** * 匹配或者节点被取消的时候被调用,设置item指向自己,waiter为null */ final void forgetContents() { UNSAFE.putObject(this, itemOffset, this); UNSAFE.putObject(this, waiterOffset, null); } /** * 如果此节点已匹配或者是被取消匹配的节点,则返回true * x == this 调用了forgetContents * (x == null) == isData 表示请求节点匹配了数据节点(请求节点的item更新为数据节点的数据) * 或者数据节点匹配了请求节点(数据节点的item更新为null) */ final boolean isMatched() { Object x = item; return (x == this) || ((x == null) == isData); } /** * 当前节点是否是未匹配的请求节点 * !isData 请求节点 * item == null 还未匹配被更新 */ final boolean isUnmatchedRequest() { return !isData && item == null; } /** * 能否将指定的节点node(haveData类型)追加到当前节点后。如果node节点属性与当前节点相反,且当前节点还未进行匹配则不能追加 */ final boolean cannotPrecede(boolean haveData) { boolean d = isData; Object x; return d != haveData && (x = item) != this && (x != null) == d; } /** * 尝试人为匹配数据节点,匹配成功返回true,设置item为null(不用再匹配了) * 相当于移除当前数据节点,用在remove方法中 */ final boolean tryMatchData() { // assert isData; Object x = item; // item非空且未指向自己则表示当前节点为还未匹配的数据节点 // 之后尝试将item置为null同时唤醒等待的线程 if (x != null && x != this && casItem(x, null)) { LockSupport.unpark(waiter); return true; } return false; } private static final long serialVersionUID = -3375979862319811754L; // Unsafe mechanics // CAS操作 private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; private static final long waiterOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); waiterOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("waiter")); } catch (Exception e) { throw new Error(e); } } }
构造方法比较容易理解,addAll最终循环调用add方法一个一个进行添加
public LinkedTransferQueue() { } public LinkedTransferQueue(Collection<? extends E> c) { this(); addAll(c); }
put,offer,add 都是调用 xfer(e, true, ASYNC, 0)
,需要注意,offer设置超时的那个方法没用,使用时需要注意!ASYNC表示异步操作,相当于这些方法执行后直接入队元素然后结束不会像SynchronousQueue队列那样阻塞等待匹配元素出现
public void put(E e) { xfer(e, true, ASYNC, 0); } public boolean offer(E e, long timeout, TimeUnit unit) { xfer(e, true, ASYNC, 0); return true; } public boolean offer(E e) { xfer(e, true, ASYNC, 0); return true; } public boolean add(E e) { xfer(e, true, ASYNC, 0); return true; }
当我们需要不同的队列入队操作时,根据需要使用下列方法
// 由于中断操作导致失败会抛错 public void transfer(E e) throws InterruptedException { if (xfer(e, true, SYNC, 0) != null) { Thread.interrupted(); // failure possible only due to interrupt throw new InterruptedException(); } } // 立刻尝试匹配返回,不进行任何等待操作,xfer源码部分有判断这个标识 public boolean tryTransfer(E e) { return xfer(e, true, NOW, 0) == null; } // 尝试匹配未匹配等待超时时间才返回,如被中断则抛错 public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException(); }
出队操作,请求数据节点,这里xfer方法的参数也能看出其使用方法的不同,take方法获取不到对应的匹配节点会阻塞操作,而poll方法在未设置超时时间时以NOW模式,相当于直接获取数据,不管有没有都会直接返回结果,不会进行阻塞等待
public E take() throws InterruptedException { E e = xfer(null, false, SYNC, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { E e = xfer(null, false, TIMED, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) return e; throw new InterruptedException(); } public E poll() { return xfer(null, false, NOW, 0); }
LinkedTransferQueue核心方法,所有的操作最终都通过xfer实现,通过how参数的不同进行不同的处理,在匹配上时判断当前head的slack阈值,如果达到上限则进行head更新
/** * @param e 数据节点(e非空)或者请求节点(e为null) * @param haveData 数据节点为true,请求节点为false * @param how NOW, ASYNC, SYNC, or TIMED 4种类型,上面已经介绍过 * @param nanos TIMED模式下设置的超时时间 * @return 节点匹配上则返回对应的匹配项否则传入的参数e * @throws */ private E xfer(E e, boolean haveData, int how, long nanos) { // 校验节点 if (haveData && (e == null)) throw new NullPointerException(); // 假如需要的话,这里先声明待添加的节点 Node s = null; // the node to append, if needed // 标签 continue跳转 retry: for (;;) { // restart on append race // 从头节点开始尝试匹配 for (Node h = head, p = h; p != null;) { // find & match first node boolean isData = p.isData; Object item = p.item; // item != p 表示p节点item未指向自己,未执行forgetContents,未被取消或匹配 // (item != null) == isData 表示 p 是一个还未匹配的数据节点或请求节点 // 不满足条件可能需要执行后面逻辑 if (item != p && (item != null) == isData) { // unmatched // 相同节点类型,说明和队列中所有节点相同类型,无需匹配,跳出这个循环根据类型继续接下来的操作 if (isData == haveData) // can't match break; // 执行到这说明p节点还未匹配上且与当前节点是相异类型,cas更新item成功则表示匹配上了 // 注意这里只更新了head指向的节点,因为本次线程的e节点到这里还未入队 // 这里将p的item指向为对应操作的节点e,表示p对应的节点已经与此次的e匹配上了 // 如果未更新成功,说明p已经被其他人匹配上,执行后面逻辑继续循环 if (p.casItem(item, e)) { // match // p当前已经不是指向h了,说明p已经被循环next更新过了 for (Node q = p; q != h;) { Node n = q.next; // update by 2 unless singleton // h依旧指向头节点尝试更新head指向 // 松弛度等于2则更新head,h->q->n if (head == h && casHead(h, n == null ? q : n)) { // 将h的next更新方便回收 h.forgetNext(); break; } // advance and retry // head为空或者head的next节为空或者head的next节点未被匹配或取消 // 此时跳出循环,slack较小不需要更新head if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } // p已经与e匹配上了,唤醒p节点对应的等待线程 LockSupport.unpark(p.waiter); // 转换类型返回 return LinkedTransferQueue.<E>cast(item); } } // 已被其他线程匹配则遍历下一个节点 Node n = p.next; // p == n 即 p == p.next 执行了forgetNext // 说明头节点指向已经更新了,p节点已经离队需要重新从头开始匹配 p = (p != n) ? n : (h = head); // Use head if p offlist } // 循环队列未匹配上同时为非NOW模式,NOW则直接返回入参e if (how != NOW) { // No matches available // s为空则初始化s节点 if (s == null) s = new Node(e, haveData); // 尝试添加节点s到队列尾部 Node pred = tryAppend(s, haveData); // null表示当前有匹配的节点了,从retry开始重新开始判断处理 // 在后面的方法中会分析tryAppend if (pred == null) continue retry; // lost race vs opposite mode // 执行到这里说明pred非null,s添加到队列中了,pred代表的是s的前驱节点或者s本身 // 处理SYNC/TIMED模式 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting } }
为了方便理解,这里通过流程图显示xfer的操作流程:
尝试将s节点添加到队列尾部,同时当tail的slack达到阈值时则更新tail指向,不同的返回值对应不同的处理过程,查看xfer源码上的处理,返回值含义如下:
/** * * @param s 添加到队列的节点元素 * @param haveData 数据节点入队为true,请求节点入队为false */ private Node tryAppend(Node s, boolean haveData) { // p指向尾结点 for (Node t = tail, p = t;;) { // move p to last node and append Node n, u; // temps for reads of next & tail // 头尾节点为空则表示队列为空 if (p == null && (p = head) == null) { // 队列为空时尝试更新头节点为s即可,失败重新循环处理 if (casHead(null, s)) return s; // initialize } // 队列非空同时添加的s节点与p节点数据类型不同表示两者可以匹配则返回null进行标记处理(xfer中使用到了) else if (p.cannotPrecede(haveData)) return null; // lost race vs opposite mode // p节点非当前尾节点(可能被其他线程更新了tail) else if ((n = p.next) != null) // not last; keep traversing // 满足条件更新t指向tail,p指向t重新循环开始 p = p != t && t != (u = tail) ? (t = u) : // stale tail // 在队列更新p = p.next 重新开始循环或离队状态则置p为null循环从头节点开始 (p != n) ? n : null; // restart if off list // p为当前尾结点,尝试更新p的next指向s失败则更新p指向p的next // 失败说明别的线程更新了p的next,此时更新p重新循环执行 else if (!p.casNext(null, s)) p = p.next; // re-read on CAS failure else { // 执行到这表明更新p的next为s成功 // p和t已经不同了,p可能循环了几次才成功更新next,t还是之前的指向,需要更新 // p != t 为真时 slack >=2 // 如果t = p 更新next为s成功,则slack = 1,这个条件不会进去 // t != p,整个节点关联为...->t->...->p->s,t到s距离 >= 2 if (p != t) { // update if slack now >= 2 // 更新tail,不停的判断是否是tail,不是则持续向前直到尾部节点,然后更新tail退出while while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t); } // 返回添加节点的前驱节点p return p; } } }
xfer中处理SYNC/TIMED模式时调用,处理阻塞等待和超时等待匹配节点方式,参考上面xfer中调用的地方,其中的入参说明如下:
注意返回值的含义,匹配上返回对应匹配节点的item,如未匹配上,中断或者超时则返回入参e
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { // 计算过期时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 初始化 int spins = -1; // initialized after first item and cancel checks // 使用ThreadLocalRandom产生并发随机数 ThreadLocalRandom randomYields = null; // bound if needed for (;;) { Object item = s.item; // item != e 说明s的item已经被更新了,表示已经与其他节点匹配上了 // item更新成对应匹配节点的item,参考xfer匹配节点过程理解 if (item != e) { // matched // assert item != s; // 已经被匹配上了,将当前节点forgetContents,避免垃圾堆积 s.forgetContents(); // avoid garbage // 类型转化返回结束 return LinkedTransferQueue.<E>cast(item); } // 还未被匹配,先判断当前线程是否被中断或者超时 // 第一个条件为true时,s节点尝试更新item指向自己(取消操作,这里s是本次操作的节点,取消了就不用再继续处理了) if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // cancel // 将s和其前驱节点解除关联 unsplice(pred, s); return e; } // 到这里表明没匹配上同时也没被中断或超时 // 自旋次数设置,单核机器不会进行自旋 if (spins < 0) { // establish spins at/near front // 计算spins if ((spins = spinsFor(pred, s.isData)) > 0) // 调用current获取并发随机数产生类 randomYields = ThreadLocalRandom.current(); } // 自旋次数循环递减 else if (spins > 0) { // spin --spins; if (randomYields.nextInt(CHAINED_SPINS) == 0) // 随机等于0时,当前线程让步,给其他线程执行机会 Thread.yield(); // occasionally yield } // 执行到这已经进行过自旋了spins = 0,说明暂时无匹配节点先保存当前线程 // 这里设置完了还继续循环处理 else if (s.waiter == null) { s.waiter = w; // request unpark then recheck } // 设置超时则超时等待 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos > 0L) LockSupport.parkNanos(this, nanos); } // 阻塞等待当前线程直到被其他线程唤醒 else { LockSupport.park(this); } } }
根据当前节点的前驱节点和当前节点的数据类型返回对应的自旋值,不同情况下返回不同的数值
private static int spinsFor(Node pred, boolean haveData) { // 多CPU,前驱节点非空才能自旋操作 if (MP && pred != null) { // 前驱和当前节点类型不同则自旋FRONT_SPINS + CHAINED_SPINS if (pred.isData != haveData) // phase change return FRONT_SPINS + CHAINED_SPINS; // 前驱节点已经被匹配了,返回自旋FRONT_SPINS次数 if (pred.isMatched()) // probably at front return FRONT_SPINS; // 前驱等待线程为空,还没更新waiter,说明前驱节点在自旋操作,返回CHAINED_SPINS if (pred.waiter == null) // pred apparently spinning return CHAINED_SPINS; } return 0; }
返回p的后继节点,如果p.next指向p(p节点已经离队),则返回head头节点
final Node succ(Node p) { Node next = p.next; return (p == next) ? head : next; }
找到第一个未匹配节点,数据类型一致则返回节点,不一致则返回null。hasWaitingConsumer使用firstOfMode来进行了判断,firstDataNode,firstDataItem(peek使用了)类似不详细进行说明了,countOfMode计算对应类型节点的数量,源码也比较简单
private Node firstOfMode(boolean isData) { // 从头开始进行循环判断 for (Node p = head; p != null; p = succ(p)) { // 未匹配节点 if (!p.isMatched()) // 数据类型一致则返回p,否则返回null return (p.isData == isData) ? p : null; } return null; } final Node firstDataNode() { for (Node p = head; p != null;) { Object item = p.item; if (p.isData) { if (item != null && item != p) return p; } // 头节点未被匹配同时非数据节点则队列中此刻应该只有请求节点不需要再循环判断下去了 else if (item == null) break; if (p == (p = p.next)) p = head; } return null; } private E firstDataItem() { for (Node p = head; p != null; p = succ(p)) { Object item = p.item; if (p.isData) { if (item != null && item != p) return LinkedTransferQueue.<E>cast(item); } else if (item == null) return null; } return null; } private int countOfMode(boolean data) { int count = 0; for (Node p = head; p != null; ) { if (!p.isMatched()) { if (p.isData != data) return 0; if (++count == Integer.MAX_VALUE) // saturated break; } Node n = p.next; if (n != p) p = n; else { count = 0; p = head; } } return count; }
前驱节点与已删除或者取消状态的s节点取消连接,将两个节点取消关联
/** * * @param pred s的前驱节点或者为null或者为s自己(当s为头节点时) * @param s 取消或删除的节点 */ final void unsplice(Node pred, Node s) { // 清理s节点变量 s.forgetContents(); // forget unneeded fields // 确认pred的next指向s即两者之间还有关联才处理 if (pred != null && pred != s && pred.next == s) { Node n = s.next; // s的next为空表示s为尾结点 // s的后继非s且pred更新next成功且pred已被匹配,尝试解除s节点 if (n == null || (n != s && pred.casNext(s, n) && pred.isMatched())) { // 检查是否是头节点并更新 for (;;) { // check if at, or could be, head Node h = head; // 头节点为前驱节点 // 头节点为s节点 // 头节点为空,则表示为空队列 if (h == pred || h == s || h == null) return; // at head or list empty // 头节点未被匹配则跳出循环 if (!h.isMatched()) break; // 到这说明h已经被匹配,需要更新head Node hn = h.next; // 头节点后继节点为空,验证队列为空 if (hn == null) return; // now empty // 头节点后继节点非头节点并且尝试更新头节点为后继节点 if (hn != h && casHead(h, hn)) // 清理原头节点 h.forgetNext(); // advance head } // 解除前后节点链接失败则统计阈值处理 // 再次检查是否离队 if (pred.next != pred && s.next != s) { // recheck if offlist // 根据SWEEP_THRESHOLD阈值进行判断处理 for (;;) { // sweep now if enough votes int v = sweepVotes; // 小于阈值则尝试将阈值加1 if (v < SWEEP_THRESHOLD) { if (casSweepVotes(v, v + 1)) break; } // 大于等于阈值则将阈值归0同时通过sweep方法进行清理 else if (casSweepVotes(v, 0)) { sweep(); break; } } } } } }
从头节点开始遍历清理匹配节点(取消的节点)的节点关联关系
private void sweep() { // 从头节点开始,p开始为头节点,s为p的后继节点 for (Node p = head, s, n; p != null && (s = p.next) != null; ) { // s为未匹配的节点,开始遍历下一个 if (!s.isMatched()) // Unmatched nodes are never self-linked p = s; // s已经被匹配了,如果s为尾节点,遍历完了,终止 else if ((n = s.next) == null) // trailing node is pinned break; // s的next指向自己,说明s已经离队 else if (s == n) // stale // No need to also check for p == s, since that implies s == n // 从头重新开始 p = head; else // 更新p的next p.casNext(s, n); } }
移除对应的节点
private boolean findAndRemove(Object e) { if (e != null) { // 循环 for (Node pred = null, p = head; p != null; ) { // 匹配item Object item = p.item; // 数据节点比较item是否相等,相等则通过tryMatchData自我匹配,然后unsplice取消前后节点关系 if (p.isData) { if (item != null && item != p && e.equals(item) && p.tryMatchData()) { unsplice(pred, p); return true; } } // 请求节点同时还未被匹配,队列中没有数据节点,直接跳出 else if (item == null) break; pred = p; // p已经是旧的数据,需要更新p指向head重新循环处理 if ((p = p.next) == pred) { // stale pred = null; p = head; } } } return false; }
主要的源码部分基本已分析完毕,关于迭代器的部分不再详述,读者可自行阅读理解
LinkedTransferQueue作为一个基于链表的FIFO无界阻塞队列,使用了一些复杂的概念,双重队列,松弛度都是需要好好理解的部分,应该先从整体了解其流程处理,再细看其内部实现,其核心方法在于xfer,可以参考流程图进行梳理,作为阻塞队列,使用好LinkedTransferQueue是不容易的,方法的使用需要参考源码,否则用错地方导致线上事故得不偿失,希望本文对各位有所帮助
以上内容如有问题欢迎指出,笔者验证后将及时修正,谢谢