正如上篇文章 聊聊 JDK 阻塞队列源码(ReentrantLock实现) 所说,队列在我们现实生活中队列随处可见,最经典的就是去银行办理业务,超市买东西排队等。今天楼主要讲的就是JDK中安全队列的另一种实现使用CAS算法实现的安全队列。
在 JDK 中的队列都实现了 java.util.Queue 接口,下面就是楼主要说的无锁版本的队列实现:
队列名字 | 是否加锁 | 数据结构 | 关键技术点 | 是否有锁 | 是否有界 |
---|---|---|---|---|---|
LinkedTransferQueue | 否 | 链表 | CAS | 无锁 | 无界 |
ConcurrentLinkedQueue | 否 | 链表 | CAS | 无锁 | 无界 |
LinkedTransferQueue 的原理就是通过使用原子变量compare and swap(简称“CAS”)这种不加锁的方式来实现的进行并发控制,LinkedTransferQueue是一个无界的安全队列,其长度可以无限延伸,当然其带来的问题也是显而易见的。
add
方法:
public boolean add(E e) { xfer(e, true, ASYNC, 0); return true; }
offer
方法:
public boolean offer(E e) { xfer(e, true, ASYNC, 0); return true; }
poll
方法:
public E poll() { return xfer(null, false, NOW, 0); }
take
方法:
public E take() throws InterruptedException { E e = xfer(null, false, SYNC, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }
从上面代码中可以看出,这些方法最终都指向了 xfer
方法,只不过传入的不同的参数。
/** * Implements all queuing methods. See above for explanation. * * @param e the item or null for take * @param haveData true if this is a put, else a take * @param how NOW, ASYNC, SYNC, or TIMED * @param nanos timeout in nanosecs, used only if mode is TIMED * @return an item if matched, else e * @throws NullPointerException if haveData mode but e is null */
从源码的 doc 注释中可以知道
第一个参数,如果是 put 类型,就是实际的值,反之就是 null。
第二个参数,是否包含数据,put 类型就是 true,take 就是 false。
第三个参数,执行类型,有立即返回的NOW,有异步的ASYNC,有阻塞的SYNC, 有带超时的 TIMED。
第四个参数,只有在 TIMED类型才有作用。
接下来我们来看看 xfer
到底是何方神圣
private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) throw new NullPointerException(); Node s = null; // the node to append, if needed retry: for (;;) { // restart on append race // 从 head 开始 for (Node h = head, p = h; p != null;) { // find & match first node // head 的类型。 boolean isData = p.isData; // head 的数据 Object item = p.item; // item != null 有 2 种情况,一是 put 操作, 二是 take 的 itme 被修改了(匹配成功) // (itme != null) == isData 要么表示 p 是一个 put 操作, 要么表示 p 是一个还没匹配成功的 take 操作 if (item != p && (item != null) == isData) { // 如果当前操作和 head 操作相同,就没有匹配上,结束循环,进入下面的 if 块。 if (isData == haveData) // can't match break; // 如果操作不同,匹配成功, 尝试替换 item 成功, if (p.casItem(item, e)) { // match // 更新 head for (Node q = p; q != h;) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } // 唤醒原 head 线程. LockSupport.unpark(p.waiter); return LinkedTransferQueue.<E>cast(item); } } // 找下一个 Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } // 如果这个操作不是立刻就返回的类型 if (how != NOW) { // No matches available // 且是第一次进入这里 if (s == null) // 创建一个 node s = new Node(e, haveData); // 尝试将 node 追加对队列尾部,并返回他的上一个节点。 Node pred = tryAppend(s, haveData); // 如果返回的是 null, 表示不能追加到 tail 节点,因为 tail 节点的模式和当前模式相反. if (pred == null) // 重来 continue retry; // lost race vs opposite mode // 如果不是异步操作(即立刻返回结果) if (how != ASYNC) // 阻塞等待匹配值 return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting } }
代码有点长,其实逻辑很简单。就是找到 head
节点,如果 head
节点是匹配的操作,就直接赋值,如果不是,添加到队列中。
注意:队列中永远只有一种类型的操作,要么是 put
类型, 要么是 take
类型.
与 LinkedTransferQueue
一样,ConcurrentLinkedQueue 一样是采用原子变量实现的并发控制, ConcurrentLinkedQueue
是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法来实现。
add
方法:
public boolean add(E e) { return offer(e); }
offer
方法:
ConcurrentLinkedQueue
是无界的,所以 offer
永远返回true,不能通过返回值来判断是否入队成功,
public boolean offer(E e) { // 校验是否为空 checkNotNull(e); //入队前,创建一个入队节点 final Node<E> newNode = new Node<E>(e); //循环CAS直到入队成功。 // 1、根据tail节点定位出尾节点(last node); // 2、将新节点置为尾节点的下一个节点, // 3、更新尾节点casTail。 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; //判断p是不是尾节点,tail节点不一定是尾节点,判断是不是尾节点的依据是该节点的next是不是null if (q == null) { // p is last node if (p.casNext(null, newNode)) { //设置P节点的下一个节点为新节点,如果p的next为null,说明p是尾节点,casNext返回true; // 如果p的next不为null,说明有其他线程更新过队列的尾节点,casNext返回false。 // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q) //p节点是null的head节点刚好被出队,更新head节点时h.lazySetNext(h)把旧的head节点指向自己 // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. //判断tail节点有没有被更新,如果没被更新,1)p=q:p指向p.next继续寻找尾节点; //如果被更新了,2)p=t:P赋值为新的tail节点 p = (p != t && t != (t = tail)) ? t : q; } }
poll
方法:
public E poll() { restartFromHead: //两层循环 for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } //队列为空,更新head节点 else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) //p节点是null的head节点刚好被出队,更新head节点时h.lazySetNext(h);把旧的head节点指向自己。 //重新从head节点开始 continue restartFromHead; else p = q; //将p执行p的下一个节点 } } } //更新head节点 final void updateHead(Node<E> h, Node<E> p) { //通过CAS将head更新为P if (h != p && casHead(h, p)) h.lazySetNext(h);//把旧的head节点指向自己 } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); }
remove
方法:
public boolean remove(Object o) { if (o != null) { Node<E> next, pred = null; // 循环CAS直到删除节点 for (Node<E> p = first(); p != null; pred = p, p = next) { boolean removed = false; E item = p.item; if (item != null) { if (!o.equals(item)) { next = succ(p); continue; } // 通过CAS删除节点 removed = p.casItem(item, null); } next = succ(p); if (pred != null && next != null) // unlink pred.casNext(p, next); if (removed) return true; } } return false; }
本文主要介绍了两种CAS算法实现的安全队列,然而稳定性要较高的系统中,为了防止生产者速度过快,导致内存溢出,通常是不建议选择无界队列的。当然楼主水平有限,文章中不免有纰漏,望小伙伴谅解并指出,在技术的道路上一起成长。