在Java并发体系中,很多并发场景都离不开队列。比如Java中的定时任务框架、线程池框架、MQ等。本篇文章将记录我的队列学习之旅中的无阻塞队列源码学习。
首先,队列必须是线程安全的,否则,在并发场编程中,就失去了使用队列的意义了。队列实现线程安全的方式有两种: 非阻塞队列和阻塞队列 。本篇文章我们先研究非阻塞队列。
在Java中要实现非阻塞的线程安全,一定绕不开自旋和CAS这一对好搭配。基于这个认知,我们来深入剖析一波Doug Lea大神笔下的ConcurrentLinkedQueue的源码。
ConcurrentLinkedQueue是一个基于链表的无界线程安全队列。采用FIFO的方式。它的结构如下:
我们以两个线程来模拟它的入队过程,这个过程很艰难,但是弄懂了之后,将发现不过如此! 源码中的one表示第一次循环,two表示第二次循环,three表示第三次循环。
public boolean offer(E e) { // 线程1、线程2同时进入, // 首先,校验不为空,如果为空,则抛出NPE checkNotNull(e); // 线程1、线程2同时将当前元素构建一个新的Node节点 final Node<E> newNode = new Node<E>(e); // 这是一个死循环,以保证元素最终一定入队成功。 // one:初始化时,t变量指向tail,p变量指向t,即p->t->tail. // two:线程2由于第一次循环CAS操作失败了,因此将进行第二次循环, // two:此时,依然是p->t->tail。因为线程1并没有改变tail的值,所以tail依然是没有改变的。 // three:线程2的第三次循环来咯,此时p节点已经指向了真正的尾节点了。 for (Node<E> t = tail, p = t;;) { // one:一开始时,q变量指向p节点的next节点,即:指向tail的next节点,此时next节点为空。 // two:此时q = p.next,虽然tail没有变化,但是tail的next节点已经不为空了! // two:因为线程1已经通过CAS操作设置成功了!因此,此时线程2将跳转到else分支 // three:由于p已经指向了真正的尾节点,因此p.next == null成立,因此,此时线程2将进入if分支。 Node<E> q = p.next; // one:此时,线程1和线程2都会执行到这里,进入if分支。 if (q == null) { // one:线程1和线程2同时通过CAS操作设置tail节点的next节点 // one:此时必然只有一个线程CAS操作成功。假设线程1执行成功,线程2执行失败。 // three:线程2通过CAS操作设置尾节点的next节点。 if (p.casNext(null, newNode)) { // one:线程1执行成功后,此时p == t 这个条件是成立的,因此不会通过CAS操作更新tail节点指向尾节点,退出循环。 // one:而线程2执行失败了,因此,将指向第二轮循环。 // three:线程2如果执行成功后,此时p指向的是真正的尾节点,而t节点依然指向的是tail节点,由于线程1并没有更新tail节点为尾节点,因此p!=t成立! if (p != t) // three: 线程2通过CAS操作更新tail节点指向尾部节点, // 就算更新失败了也没有关系,说明又有其他线程更新成功了, // 线程2退出循环。 casTail(t, newNode); // Failure is OK. return true; } } else if (p == q) // 这个分支的成立条件只有一种情况,那就是初始化时的情况,此时p=q=null. p = (t != (t = tail)) ? t : head; else // two: 此时将通过for循环寻找到真正的尾节点并赋值给p变量。 // 找到后,进入第三次循环three. p = (p != t && t != (t = tail)) ? t : q; } } 复制代码
通过以上的源码分析,我们知道了元素是如何入队的,并且能够理解为什么tail节点并不总是尾节点了。 以图片的形式展示入队的过程如下:
以上的入队过程为什么要设计的这么复杂呢?tail节点能不能必须是尾节点呢?我们带着这样的问题来尝试进行优化:
public boolean offer(E e) { checkNotNull(e); Node<E> n = new Node<>(e); for(;;) { Node<E> t = tail; if(t.casNext(null, t) && casTail(t, n)) { return true; } } } 复制代码
让tail节点永远指向队列的尾节点,这样实现的代码量很少,并且语义更加清晰。但是Doug Lea大神却偏偏不按照我们普通人的思路实现。因为这样做有一个明显的缺陷:每次都要通过循环CAS更新tail节点。如果能够减少循环次数,就能够提高入队效率。
所以Doug Lea大神采用了“间隔式”的方式来更新tail节点。即:当tail节点和尾节点超过一定距离才更新tail节点。JDK1.7之前,用的是HOPS变量来控制距离,而JDK1.8则通过p == t来判断,其本质是和HOPS变量的作用一样的。其实这种处理方式从本质上来说就是通过增加对volatile变量的读操作来减少volatile变量的写操作,而读操作比写操作的效率要高很多,所以入队的效率才会有所提升!
通过以上解析,入队其实就是解决两个问题:1.更新tail节点;2.找到尾节点。其中对于更新tail节点,Doug Lea大神做了特殊优化,优化的方式就是“间隔式”的更新tail变量。
同样的,我们以两个线程来模拟队列出队的过程。有了入队的解析过程,相信出队的解析会轻松一些,因为出队和入队的一些思想是一样的。源码如下:
public E poll() { restartFromHead: // 死循环 // 线程1和纤程2同时进入循环。 for (;;) { // one: 一开始,p->h->head,即指向头节点 // two: 线程2由于cas操作失败,因此将进行第二次循环,此时依然是p->h->head // three: 线程2将进行第三笔循环执行,此时h = head,然而,p = head.next,即下个节点了。 for (Node<E> h = head, p = h, q;;) { // one: 线程1、线程2同时获取到head的元素item // two: 线程2执行获取head的item,由于线程1已经将head节点的item设置成null了,因此线程2获取的item = null,因此进入第一个else if分支。 // three: 线程2将获取到下个节点的元素, E item = p.item; // one: 线程1、线程2同时通过CAS操作来设置头节点的item为null, // 此时,必然只有一个线程设置成功,另一个线程设置失败,假设是线程1设置成功。线程2设置失败,此时线程2进行第二次循环。 // three: 线程2通过CAS操作成功。 if (item != null && p.casItem(item, null)) { // one: 线程1执行到这里,并且此时p = h = head,因此判断条件不成立,返回item. // three: 线程2执行到这里,p != h 判断条件成立。因此cas更新hdead节点。如果p.next为空,则就为p节点自己,否则就是p.next节点。 if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } // two: 线程2执行到这个判断分支,目的是设置q = head节点的next节点。 // 如果q == null成立,则说明已经到了队列尾部,此时直接更新头结点并返回null。 // 如果q != null,线程2将继续执行第二个else if分支。 else if ((q = p.next) == null) { updateHead(h, p); return null; } // two: 线程2继续判断p == q,如果条件成立,则说明又有其他线程更新了head节点,则使用新的head重新循环。 // 如果条件不成立,则继续执行else分支。为了当前例子的分析,假设这里条件不成立。 else if (p == q) continue restartFromHead; // two: 线程2执行到此处,执行p = q,实际上就是将head节点的下个节点赋值给p变量,即:p = head.next.此时线程2将继续进行第3次循环。 else p = q; } } } 复制代码
通过以上的分析,我们发现,出队和入队的处理思想居然是一致的。即head节点不总是指向头节点。也是采用“间隔式”的方式更新。更具体一点就是:如果head节点的元素不为空,则直接取head节点的元素且不会更新head节点,当head节点的元素为空时,才会更新head节点指向头节点。
以图片的形式展示出队的过程如下:
方法名 | 说明 |
---|---|
peek() | 获取元素而不删除元素,但是也会像poll那样更新head节点 |
size() | 当前队列中的有效元素个数,这个方法得到的结果不一定精确,因为它没有使用同步锁,在统计的过程中,队列可以进行入队、出队和删除元素操作。因此不是线程安全的 |
remove(Obj o) | 删除元素 |
ConcurrentLinkedQueue是非阻塞队列的经典实现。非阻塞队列的应用场景大多是多端消费的场景。