非阻塞队列ConcurrentLinkedQueue我们已经了解过了,既然是Queue,那么是否有其双端队列实现呢?答案是肯定的,今天就继续说一说非阻塞双端队列实现ConcurrentLinkedDeque
JDK版本号:1.8.0_171
ConcurrentLinkedDeque是一个基于链表实现的无界的线程安全的同时支持FIFO、LIFO非阻塞双端队列。操作上可类比ConcurrentLinkedQueue,利用CAS进行无锁操作,同时通过松弛度阈值设置来减少CAS操作,在理解这个类前可先去参考理解我之前对ConcurrentLinkedQueue的源码分析
为了说明的方便,我们区分下,活动结点是item非null的结点,有效结点是保持着前后关系的结点,作者在注释中解释了将一个结点删除分为3个步骤:
区分定义是便于源码分析时的说明,参考下图所示理解:
public class ConcurrentLinkedDeque<E> extends AbstractCollection<E> implements Deque<E>, java.io.Serializable
为了方便理解,这里将ConcurrentLinkedDeque操作过程进行图示,让各位先有个了解,便于后面源码的分析
1.new实例化操作
2.offer("1")
3.offer("2")
4.offerFirst("3")
5.offerFirst("4")
6.pollLast
7.poll
8.poll
9.pollLast
head结点(p.prev == null && p.next != p):
tail结点(p.next == null && p.prev != p):
由于head结点和tail结点不是实时更新(同ConcurrentLinkedQueue),达到松弛度阈值才进行更新,减少CAS操作,有可能导致head结点在tail结点之后的现象
/** * A node from which the first node on list (that is, the unique node p * with p.prev == null && p.next != p) can be reached in O(1) time. * Invariants: * - the first node is always O(1) reachable from head via prev links * - all live nodes are reachable from the first node via succ() * - head != null * - (tmp = head).next != tmp || tmp != head * - head is never gc-unlinked (but may be unlinked) * Non-invariants: * - head.item may or may not be null * - head may not be reachable from the first or last node, or from tail */ private transient volatile Node<E> head; /** * A node from which the last node on list (that is, the unique node p * with p.next == null && p.prev != p) can be reached in O(1) time. * Invariants: * - the last node is always O(1) reachable from tail via next links * - all live nodes are reachable from the last node via pred() * - tail != null * - tail is never gc-unlinked (but may be unlinked) * Non-invariants: * - tail.item may or may not be null * - tail may not be reachable from the first or last node, or from head */ private transient volatile Node<E> tail; // 终止结点,在gc-unlinking阶段将无用结点链接到这两个结点上,自行处理减少内内存滞留风险 private static final Node<Object> PREV_TERMINATOR, NEXT_TERMINATOR; // 删除结点执行unlinking/gc-unlinking的阈值,当逻辑删除结点达到阈值才触发,算是性能优化 private static final int HOPS = 2; // CAS private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; static { PREV_TERMINATOR = new Node<Object>(); PREV_TERMINATOR.next = PREV_TERMINATOR; NEXT_TERMINATOR = new Node<Object>(); NEXT_TERMINATOR.prev = NEXT_TERMINATOR; try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentLinkedDeque.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); } catch (Exception e) { throw new Error(e); } }
Node实现与ConcurrentLinkedQueue不同之处也就在于多了变量prev指向结点的前驱
static final class Node<E> { volatile Node<E> prev; volatile E item; volatile Node<E> next; Node() { // default constructor for NEXT_TERMINATOR, PREV_TERMINATOR } /** * Constructs a new node. Uses relaxed write because item can * only be seen after publication via casNext or casPrev. */ Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } void lazySetPrev(Node<E> val) { UNSAFE.putOrderedObject(this, prevOffset, val); } boolean casPrev(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, prevOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long prevOffset; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; prevOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("prev")); itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
无参构造方法创建了空结点同时头尾结点指向这个空结点,集合参数构造时先将所有集合结点构成链表,最后通过initHeadTail更新链表head,tail即可
public ConcurrentLinkedDeque() { head = tail = new Node<E>(null); } public ConcurrentLinkedDeque(Collection<? extends E> c) { // Copy c into a private chain of Nodes Node<E> h = null, t = null; for (E e : c) { checkNotNull(e); Node<E> newNode = new Node<E>(e); if (h == null) h = t = newNode; else { t.lazySetNext(newNode); newNode.lazySetPrev(t); t = newNode; } } initHeadTail(h, t); } /** * Initializes head and tail, ensuring invariants hold. */ private void initHeadTail(Node<E> h, Node<E> t) { if (h == t) { if (h == null) h = t = new Node<E>(null); else { // Avoid edge case of a single Node with non-null item. Node<E> newNode = new Node<E>(null); t.lazySetNext(newNode); newNode.lazySetPrev(t); t = newNode; } } head = h; tail = t; }
被addFirst和offerFirst所使用,将元素e添加到队列头部,即从头部入队操作。linkLast被addLast和offerLast所使用,offer,add同样最终调用此方法完成操作,将元素e添加到队列尾部,即从尾部入队操作,没什么好说的,类比linkFirst源码理解
private void linkFirst(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); restartFromHead: for (;;) for (Node<E> h = head, p = h, q;;) { // 前驱节点不为null,前驱的前驱节点不为null if ((q = p.prev) != null && (q = (p = q).prev) != null) // head应该被更新了(已经超过了松弛度阈值)判断是否已经更新了则p更新为head // 未更新则直接更新为前驱的前驱结点 p = (h != (h = head)) ? h : q; // p已经出队,没办法从p再继续判断了,无法到达其他结点,需要重新开始循环 else if (p.next == p) // PREV_TERMINATOR continue restartFromHead; else { // p为第一个结点,更新新结点next指向p newNode.lazySetNext(p); // CAS piggyback // 尝试更新p的前驱指向新结点,更新失败则重新循环更新 if (p.casPrev(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this deque, // and for newNode to become "live". // 新结点入队成功,头结点已经更新了(此时的新结点距离h已经 >= 2个结点距离),尝试更新head if (p != h) // hop two nodes at a time casHead(h, newNode); // Failure is OK. return; } // Lost CAS race to another thread; re-read prev } } }
被pollFirst,pollLast,removeFirstOccurrence,removeLastOccurrence和迭代器的remove所使用,移除非空结点,出队操作和删除时使用,主要处理处于队列中间的结点
void unlink(Node<E> x) { // assert x != null; // assert x.item == null; // assert x != PREV_TERMINATOR; // assert x != NEXT_TERMINATOR; final Node<E> prev = x.prev; final Node<E> next = x.next; // 前驱为null表示x为头结点 if (prev == null) { unlinkFirst(x, next); // 后继为null表示x为尾结点 } else if (next == null) { unlinkLast(x, prev); // 非头尾结点表示x处于中间位置需要特殊处理 } else { Node<E> activePred, activeSucc; boolean isFirst, isLast; // 记录逻辑删除结点数 int hops = 1; // Find active predecessor // 找到有效的前驱结点 for (Node<E> p = prev; ; ++hops) { // 有效前驱结点设置 if (p.item != null) { activePred = p; isFirst = false; break; } Node<E> q = p.prev; // p是第一个结点 if (q == null) { // p已经出队 if (p.next == p) return; // p的item为null,next还未更新,变量设置 activePred = p; isFirst = true; break; } // p == p.prev表示p已经出队 else if (p == q) return; // 继续循环向前查找 else p = q; } // Find active successor // 找到有效的后继结点 for (Node<E> p = next; ; ++hops) { // 有效后继结点设置 if (p.item != null) { activeSucc = p; isLast = false; break; } Node<E> q = p.next; // p是最后一个结点 if (q == null) { // p已经出队 if (p.prev == p) return; // p的item为null,prev还未更新,变量设置 activeSucc = p; isLast = true; break; } // p == p.next表示p已经出队 else if (p == q) return; // 继续循环向后查找 else p = q; } // TODO: better HOP heuristics // 达到逻辑删除结点阈值或者是内部删除结点则需要进行额外处理unlink/gc-unlink if (hops < HOPS // always squeeze out interior deleted nodes && (isFirst | isLast)) return; // Squeeze out deleted nodes between activePred and // activeSucc, including x. // 移除有效前驱和后继结点之间的有效结点,包括x,使得前驱和后继互连 skipDeletedSuccessors(activePred); skipDeletedPredecessors(activeSucc); // Try to gc-unlink, if possible // 有效前驱和后继是队头或队尾,尝试gc-unlink if ((isFirst | isLast) && // Recheck expected state of predecessor and successor // 检查前驱后继状态,确保未改变 (activePred.next == activeSucc) && (activeSucc.prev == activePred) && (isFirst ? activePred.prev == null : activePred.item != null) && (isLast ? activeSucc.next == null : activeSucc.item != null)) { // 更新head和tail 确保x不可达 updateHead(); // Ensure x is not reachable from head updateTail(); // Ensure x is not reachable from tail // Finally, actually gc-unlink // 最后更新x,使得从x到活动节点不可达 x.lazySetPrev(isFirst ? prevTerminator() : x); x.lazySetNext(isLast ? nextTerminator() : x); } } }
unlink中调用,从队列头将第一个非空结点出队。unlinkLast从队列尾将第一个非空结点出队,代码实现与unlinkFirst类似,可参考理解
private void unlinkFirst(Node<E> first, Node<E> next) { // assert first != null; // assert next != null; // assert first.item == null; for (Node<E> o = null, p = next, q;;) { // p为活动节点或p为最后一个节点 if (p.item != null || (q = p.next) == null) { // 如果第一次循环就执行到此则不需要进行操作直接返回,p本来就是first的后继 // p的前驱不能指向自己,first的后继更新成p if (o != null && p.prev != p && first.casNext(next, p)) { // unlink阶段 skipDeletedPredecessors(p); // 检查first和p,确保没被更新修改才进行gc-unlink操作 if (first.prev == null && (p.next == null || p.item != null) && p.prev == first) { updateHead(); // Ensure o is not reachable from head updateTail(); // Ensure o is not reachable from tail // Finally, actually gc-unlink o.lazySetNext(o); o.lazySetPrev(prevTerminator()); } } return; } // p == p.next // p非活动结点同时p后继已经指向自己则直接返回 else if (p == q) return; // p非活动结点,p还有后继结点,重新赋值循环处理,注意这里o才被赋值 else { o = p; p = q; } } }
更新head结点,确保在调用此方法之前unlinked的任何结点在该方法返回之后都不能从head访问,不保证消除松弛度,仅仅是head将指向处于活动状态的结点。updateTail更新tail结点,同updateHead,基本操作一致,只是方向不同而已
private final void updateHead() { // Either head already points to an active node, or we keep // trying to cas it to the first node until it does. // head要么指向一个活动结点要么尝试指向第一个结点直到成功 Node<E> h, p, q; restartFromHead: // head指向非活动结点同时head非第一个结点 while ((h = head).item == null && (p = h.prev) != null) { for (;;) { // head前驱的前驱为空或head前驱的前驱的前驱为空 // 即head前有1个或2个结点 if ((q = p.prev) == null || (q = (p = q).prev) == null) { // It is possible that p is PREV_TERMINATOR, // but if so, the CAS is guaranteed to fail. // 将head更新指向为第一个结点 if (casHead(h, p)) return; else // 未成功更新说明已经被其他线程更新了,重新循环判断 continue restartFromHead; } // h前有超过2个的结点,表明当前h指向的结点已经与第一个结点距离超过2,同时h已经不指向head了,重新循环 else if (h != head) continue restartFromHead; // h前有超过2个的结点,同时h还指向head,则更新p为q再次判断,相当于p向前跳了1或2个结点位置 else p = q; } } }
这里有个java语法需要注意:continue lable和break lable的作用,可下列参考代码理解:
System.out.println("continue lable start "); aaa: for (int i = 0; i < 3; i++) { for (int j = 0; j < 3; j++) { System.out.println(j); if(j == 1){ continue aaa; } } } System.out.println("continue lable end "); System.out.println("break lable start "); bbb: for (int i = 0; i < 3; i++) { for (int j = 0; j < 3; j++) { System.out.println(j); if(j == 1){ break bbb; } } } System.out.println("break lable end ");
skipDeletedPredecessors实现将刚刚找到的后继结点的前驱指向结点p,即完成它们的互联,这一步就是所谓的unlinking,使队列的活动结点无法访问被删除的结点。skipDeletedSuccessors代码逻辑同skipDeletedPredecessors,可参考理解
private void skipDeletedPredecessors(Node<E> x) { whileActive: do { Node<E> prev = x.prev; // assert prev != null; // assert x != NEXT_TERMINATOR; // assert x != PREV_TERMINATOR; Node<E> p = prev; findActive: for (;;) { // p的item非空,说明p为活动结点,退出循环进行关联更新操作 if (p.item != null) break findActive; // p的item为空,再继续向前查找其前驱 Node<E> q = p.prev; // p的前驱结点为空 // 若p结点处于gc-unlinking状态,即通过p已经无法到达其他活动结点,则需重头开始继续循环判断 // 上面条件不满足,则表示p结点处于unlinking状态,还可以到达其他活动结点,可以继续被使用 // 表明找到了有效结点,退出循环 if (q == null) { if (p.next == p) continue whileActive; break findActive; } // p的前驱结点非空,p.prev == p // 相等则表明p已经此刻的p结点处于gc-unlinking状态,即通过p已经无法到达其他有效结点 // 无法再向前遍历,只能重头开始循环判断 else if (p == q) continue whileActive; // 到此表示p的item为空,p的前驱非空且不处于gc-unlinking状态 // 循环向前继续判断前驱结点 else p = q; } // found active CAS target // 找到活动或有效的前驱节点,前驱CAS更新成功返回否则继续循环判断更新 if (prev == p || x.casPrev(prev, p)) return; } while (x.item != null || x.next == null); }
找到结点的前驱或者后继,假如当前结点已经无效结点时,则返回第一个结点或最后一个结点
/** * Returns the successor of p, or the first node if p.next has been * linked to self, which will only be true if traversing with a * stale pointer that is now off the list. */ final Node<E> succ(Node<E> p) { // TODO: should we skip deleted nodes here? Node<E> q = p.next; return (p == q) ? first() : q; } /** * Returns the predecessor of p, or the last node if p.prev has been * linked to self, which will only be true if traversing with a * stale pointer that is now off the list. */ final Node<E> pred(Node<E> p) { Node<E> q = p.prev; return (p == q) ? last() : q; }
返回第一个结点,有可能是逻辑删除结点,last操作类似
/** * Returns the first node, the unique node p for which: * p.prev == null && p.next != p * The returned node may or may not be logically deleted. * Guarantees that head is set to the returned node. */ Node<E> first() { restartFromHead: for (;;) for (Node<E> h = head, p = h, q;;) { // p的前驱和前驱的前驱都非空 // 表示p结点之前有2个以上的活动结点 if ((q = p.prev) != null && (q = (p = q).prev) != null) // Check for head updates every other hop. // If p == q, we are sure to follow head instead. // 可能head已经被更新了则判断下更新h同时更新p // 或者head还未更新则直接将p指向q p = (h != (h = head)) ? h : q; // p的前驱为空或者前驱的前驱为空 // p == h 表明p的前驱为空(第一个条件里判断),p就是第一个结点 // p == h 不满足则p的前驱非空,前驱的前驱为空,则p的前驱为第一个结点,此时尝试更新head并返回第一个结点 else if (p == h // It is possible that p is PREV_TERMINATOR, // but if so, the CAS is guaranteed to fail. || casHead(h, p)) return p; // 第二个条件中尝试更新head失败,则说明其他线程更新了head,重新开始循环处理 else continue restartFromHead; } }
通过first()/last()方法返回第一个或最后一个结点的值
public E peekFirst() { for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; if (item != null) return item; } return null; } public E peekLast() { for (Node<E> p = last(); p != null; p = pred(p)) { E item = p.item; if (item != null) return item; } return null; }
removeFirst/removeLast同pollFirst/pollLast操作,最终调用unlink,可参考上面源码分析
public E pollFirst() { for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; if (item != null && p.casItem(item, null)) { unlink(p); return item; } } return null; } public E pollLast() { for (Node<E> p = last(); p != null; p = pred(p)) { E item = p.item; if (item != null && p.casItem(item, null)) { unlink(p); return item; } } return null; }
其他操作都是基于上面的方法进行实现的,就不再一一列举了,可自行参考源码理解
迭代器和之前队列讲解的迭代器ConcurrentLinkedQueue类似,不过由于其双向链表的实现,迭代器可分为升序迭代器(Itr)和倒序迭代器(DescendingItr),通过AbstractItr封装公共操作方法,Itr和DescendingItr分别实现对应不同的方法,一个从头节点开始向后进行遍历,一个从尾节点向后进行遍历,这部分和之前讲解过的LinkedBlockingDeque是类似的
public Iterator<E> iterator() { return new Itr(); } public Iterator<E> descendingIterator() { return new DescendingItr(); }
主要区别方法在于两个,通过这两个方法来完成不同方向的遍历
private class Itr extends AbstractItr { Node<E> startNode() { return first(); } Node<E> nextNode(Node<E> p) { return succ(p); } } /** Descending iterator */ private class DescendingItr extends AbstractItr { Node<E> startNode() { return last(); } Node<E> nextNode(Node<E> p) { return pred(p); } }
抽象类AbstractItr涉及到的方法比较简单,用到了前面所讲解过的方法,可参考之前的分析
private abstract class AbstractItr implements Iterator<E> { /** * Next node to return item for. */ private Node<E> nextNode; /** * nextItem holds on to item fields because once we claim * that an element exists in hasNext(), we must return it in * the following next() call even if it was in the process of * being removed when hasNext() was called. */ private E nextItem; /** * Node returned by most recent call to next. Needed by remove. * Reset to null if this element is deleted by a call to remove. */ private Node<E> lastRet; abstract Node<E> startNode(); abstract Node<E> nextNode(Node<E> p); AbstractItr() { advance(); } /** * Sets nextNode and nextItem to next valid node, or to null * if no such. */ private void advance() { lastRet = nextNode; Node<E> p = (nextNode == null) ? startNode() : nextNode(nextNode); for (;; p = nextNode(p)) { if (p == null) { // p might be active end or TERMINATOR node; both are OK nextNode = null; nextItem = null; break; } E item = p.item; if (item != null) { nextNode = p; nextItem = item; break; } } } public boolean hasNext() { return nextItem != null; } public E next() { E item = nextItem; if (item == null) throw new NoSuchElementException(); advance(); return item; } public void remove() { Node<E> l = lastRet; if (l == null) throw new IllegalStateException(); l.item = null; unlink(l); lastRet = null; } }
至此,队列部分已基本分析完毕,除了jdk本身的队列,还有一些比较有名的队列实现,比如Disruptor,可以参考美团的这篇文章进行一些深入了解,对于队列进行了一些底层的分析总结,比较有帮助
https://tech.meituan.com/2016...
源码已经分析完毕,我们以pollFirst出队操作为例进行一个总结说明:
在unlinking阶段根据结点位置进行不同情况的处理:
1.如果出队的结点是队列的第一个结点p,则执行unlinkFirst,其过程如下:
2.如果出队的结点是队列的最后一个结点p,则执行unlinkLast,其过程与第1种情况类似,只是方向不同
3.如果出队的结点时队列的中间位置,则执行unlink中的一个分支代码:
整体处理流程已经分析完毕,其他操作相对来说比较简单了,需要多理解
ConcurrentLinkedDeque是ConcurrentLinkedQueue的双端队列实现,在删除中涉及到了3个阶段,并且由于其无锁CAS操作和减少CAS次数的操作,导致其实现的复杂性,需要多写些例子理解下
以上内容如有问题欢迎指出,笔者验证后将及时修正,谢谢