这篇文章理解起来不难,相比于 ConcurrentHashMap 比较简单,因为不涉及扩容以及数据迁移等操作,相信你读完一定会有收获的。
本文是死磕Java并发编程系列文章的 第 9 篇 ,主角就是 java 并发包中提供的 ConcurrentLinkedQueue
这是一个 线程安全且无界 的队列(因为是基于链表实现的,所以无界) ,在并发编程中经常需要用到线程安全的队列,面试线程池时,其中的队列也可以采用这个队列来实现,它线程安全,且采用先进先出的规则排序。
通过整个并发系列文章的学习,我们能想到如果要实现一个线程安全的队列那么有两种方式:一种是使用 阻塞方式 ,即 锁的形式 ,在出队、入队方法加上 synchronized 或者获取 lock 锁 等方式实现。另一种是 非阻塞方式 ,使用自旋CAS的方式来实现。 而 Java 并发包中你会看到 Concurrent 开头的类都是支持并发的,也就是非阻塞的。
这篇文章我们一起来从源码分析下 并发大师 Doug Lea 是如何使用非阻塞的方式实现线程安全队列 ConcurrentLinkedQueue 的,相信从大师身上我们能学到不少并发技巧。
通过 ConcurrentLinkedQueue 的类图来分析一下它的结构:
看以看出,ConcurrentLinkedQueue 由 head 节点和 tail 节点组成,每个节点即子类 Node 由 节点属性 item 和 next(指向下一个节点Node的引用)组成。 即节点之间通过 next 关联,从而组成一张链表。
入队即每次将元素包装成节点放到链表结尾,出队从链表头删除一个元素返回。
了解了整体结构,你应该也看出来了,并发队列 ConcurrentLinkedQueue 最重要的就是俩操作,入队和出队,接下来我们直接从源码层面学习。
入队过程其实就是将入队节点添加到队列尾部,这个入队操作,其实 Doug Lea 大师做了一些优化,为了一会看源码更加清晰,这里先看一组入队的过程图,直观的了解一下优化点。假设现在要插入四个节点:
通过上面入队的操作,观察到 head 和 tail 节点的变化,总结其实就是干了两件事: 第一是将入队节点设置到当前队尾节点的 next 节点上;第二就是更新 tail 节点,如果 tail 节点的 next 节点不为空,则将入队节点设置为 tail 节点,如果 tail 节点的 next 节点为空,则将入队节点设为 tail 节点的 next 节点 。也就是说 tail 节点并不一定是尾结点,一定要清楚记着这一点,这对理解下面的入队源码非常有用。
下面我就不多说了,直接看代码,理解上面的描述,在结合代码注释,相信你一定能看懂:
// 将指定的元素插入到此队列的末尾,因为队列是无界的,所以这个方法永远不会返回 false 。 public boolean offer(E e) { checkNotNull(e); // 入队前,创建一个入队节点 final Node<E> newNode = new Node<E>(e); // 死循环,入队不成功反复入队 // 创建一个指向tail节点的引用,p用来表示队列的尾节点,默认情况下等于tail节点 for (Node<E> t = tail, p = t;;) { // 获得p节点的下一个节点 Node<E> q = p.next; // next节点为空,说明p是尾节点 if (q == null) { // p is last node // p是尾结点,则设置p节点的next节点为入队节点 if (p.casNext(null, newNode)) { // 首先要知道入队操作不是每次都设置tail节点为尾结点,为了减少CAS操作提高性能,也就是说tail节点不总是尾节点 // 如果tail节点有大于等于1个next节点,则将入队节点设置成tail节点, // p != t 这个条件者结合下面 else 分支看,下面在冲突的时候会修改 p 指向 p.next,所以导致p不等于 tail, // 即tail节点有大于等于1个的next节点 if (p != t) // hop two nodes at a time // 如果tail节点有大于等于1个next节点,则将入队节点设置成tail节点, // 这里允许失败,更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点 casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } // 这个分支要想进去,即 p 节点等于 p.next 节点,只有一种可能,就是 p 节点和 p.next 节点都为空 // 表示这队列刚初始化,正准备添加节点,所以需要返回head else if (p == q) // 如果tail变了,说明被其他线程添加成功了,则 p 取新的 tail,否则 p 从 head 开始 p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. // 进行这个分支说明next节点不为空,说明p不是尾节点,需要更新p后在将它指向next节点 // 执行 p != t 说明尾结点不等于tail,t != (t = tail)) 说明tail做了变动, // 同时满足说明tail已经重新设置了,即结尾就是tail,否则尾结点取tail.next p = (p != t && t != (t = tail)) ? t : q; } } 复制代码
从源代码角度来看,整个入队过程主要做两件事情:第一是定位出尾节点;第二是使用 CAS 算法将入队节点设置成尾节点的 next 节点,如不成功则重试。
这里我们思考一下,上面我们分析出入队操作在先进先出队列中就是将其设置为尾结点, Doug Lea 大师的代码写的有点复杂,我们可以不可以用下面的代码来替代呢?
public boolean offer(E e) { if (e == null) throw new NullPointerException(); Node<E> n = new Node<E>(e); for (;;) { Node<E> t = tail; if (t.casNext(null, n) && casTail(t, n)) { return true; } } } 复制代码
上面的代码每次入队都将 tail 设置为尾结点,这样能节省很多的代码量,并且更加容易理解。 但是这样做的缺点就是每次都要使用 循环 CAS 来进行设置 tail 节点。如果能减少 CAS 更新 tail 节点则能提高入队的效率。 但是我们同样要考虑到,由于 并不是 tail 一定等于尾结点,所以在入队定位末尾节点时就要多一次循环操作。
但是这样效率还是高的,因为 tail 节点它是 volatile 变量,本质上来看是通 过增加 volatile 变量的读来减少 volatile 变量的写 ,而 对于 volatile 写作的开销是远远大于读操作的,所以入队效率会提升。大神对于性能的追求真实到了极致,源码读起来还是有用的吧!!
说到出队操作,你肯定会想到,出队是不是也要减少更新 head 节点,而直接弹出 队首元素 从而减少 CAS 更新操作以提升性能呢?
带着这个问题,我们一起往下看,实现为了便于读者理解,这里还是先放一组出队操作的快照图。
从图中得知,并不是每次出队都会 更新 head 节点,如果 head 节点的有元素时,则直接弹出 head 中的元素,清空该节点对元素的引用。如果 head 节点中元素为空,才会更新 head 节点。
有了大概的理解,然后就可以去读源码来分析了:
// 从队头出队 public E poll() { restartFromHead: for (;;) { // p 表示头节点,需要出队的节点 for (Node<E> h = head, p = h, q;;) { // 获取p节点的元素 E item = p.item; // 如果头节点p中元素不为空,则进行CAS清空p节点对元素的应用,返回p节点的元素 if (item != null && p.casItem(item, null)) { // CAS 设为成功后进入到这里,需要判断头节点p是否和head节点不是同一个了,即头节点p已经变更了 if (p != h) // hop two nodes at a time // 更新head节点,将p节点的next节点设为head节点,如果p.next不为空则设置p.next,否则设置p本身 updateHead(h, ((q = p.next) != null) ? q : p); return item; } // 到这说明头节点p中的元素为null或者发生冲突CAS失败,CAS失败也说明被别的线程取走了当前元素,所以就该取一个节点即next节点 // 如果队列头节点p的next节点为空,说明队列已空,将则head设为p,返回null else if ((q = p.next) == null) { updateHead(h, p); return null; } // 进到这里说明 (q = p.next) == null 返回false,即p的next不为空 // 同时q=p.next,而这个分支是说p=p.next,只有队列初始化时满足条件,两者都为空,则返回head,重头开始赋值 else if (p == q) continue restartFromHead; else // 说明p节点的next不为空,且队列不是初始化状态,所以头节点p指向p.next p = q; } } } 复制代码
首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。
出队操作时,也是当 head 节点不等于 头节点 p 时,再次出队,才会将 head 设置为 最新的队头节点,减少了 CAS 操作,提升了效率。
ConcurrentLinkedQueue 无界是因为结构是用链表组成的,天生无界,当然受到系统资源大小限制;
ConcurrentLinkedQueue 在入队和出队时,均采用了减少 CAS 更新 head 和 tail 的操作,提升了性能;
ConcurrentLinkedQueue 采用非阻塞模式实现,即无锁,通过自旋和 CAS 实现线程安全;
今天学习的并发包中线程安全的无界队列 ConcurrentLinkedQueue 源码也不难,相信会让你有更深的了解,方便以后在工作中使用和应付面试。
笔者水平有限,文章难免会有纰漏,如有错误欢迎一起交流探讨,我会第一时间更正的。都看到这里了,码字不易,可爱的你记得 "点赞" 哦,我需要你的正向反馈。
(全文完)fighting!