阻塞队列中目前还剩下一个比较特殊的队列实现,相比较前面讲解过的队列,本文中要讲的LinkedBlockingDeque比较容易理解了,但是与之前讲解过的阻塞队列又有些不同,从命名上你应该能看出一些端倪,接下来就一起看看这个特殊的阻塞队列
JDK版本号:1.8.0_171
LinkedBlockingDeque在结构上有别于之前讲解过的阻塞队列,它不是Queue而是Deque,中文翻译成双端队列,双端队列指可以从任意一端入队或者出队元素的队列,实现了在队列头和队列尾的高效插入和移除
LinkedBlockingDeque是链表实现的线程安全的无界的同时支持FIFO、LIFO的双端阻塞队列,可以回顾下之前的LinkedBlockingQueue阻塞队列特点,本质上是类似的,但是又有些不同:
Queue和Deque的关系有点类似于单链表和双向链表,LinkedBlockingQueue和LinkedBlockingDeque的内部结点实现就是单链表和双向链表的区别,具体可参考源码
在第二点中可能有些人有些疑问,两个互斥锁和一个互斥锁的区别在哪里?我们可以考虑以下场景:
A线程先进行入队操作,B线程随后进行出队操作,如果是LinkedBlockingQueue,A线程入队过程还未结束(已获得锁还未释放),B线程出队操作不会被阻塞等待(锁不同),如果是LinkedBlockingDeque则B线程会被阻塞等待(同一把锁)A线程完成操作才继续执行
LinkedBlockingQueue一般的操作是获取一把锁就可以,但有些操作例如remove操作,则需要同时获取两把锁,之前的LinkedBlockingQueue讲解曾经说明过,这里就不详细讲解了
实现BlockingDeque接口,其中定义了双端队列应该实现的方法,具体方法不说明了,主要是每个方法都分为了First和Last两种方式,从头部或者尾部进行队列操作
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable
/** * 头结点 */ transient Node<E> first; /** * 尾结点 */ transient Node<E> last; /** 双端队列实际结点个数 */ private transient int count; /** 双端队列容量 */ private final int capacity; /** 互斥重入锁 */ final ReentrantLock lock = new ReentrantLock(); /** 非空条件对象 */ private final Condition notEmpty = lock.newCondition(); /** 非满条件对象 */ private final Condition notFull = lock.newCondition();
为了实现双端队列,内部使用了双向链表,不像LinkedBlockingQueue使用的是单链表,前驱和后继指针的特殊情况需要注意
/** 双向链表Node */ static final class Node<E> { /** * 节点数据值,移除则为null */ E item; /** * 下列情况之一: * - 前驱节点 * - 前驱是尾节点,则prev指向当前节点 * - 无前驱节点则为null */ Node<E> prev; /** * 下列情况之一: * - 后继节点 * - 后继是头节点,则next指向当前节点 * - 无后继节点则为null */ Node<E> next; Node(E x) { item = x; } }
构造方法比较简单,可设置容量上限,不设置默认Integer.MAX_VALUE,类似之前的LinkedBlockingQueue
public LinkedBlockingDeque() { this(Integer.MAX_VALUE); } public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; } public LinkedBlockingDeque(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock lock = this.lock; // 确保可见性 lock.lock(); // Never contended, but necessary for visibility try { for (E e : c) { if (e == null) throw new NullPointerException(); if (!linkLast(new Node<E>(e))) throw new IllegalStateException("Deque full"); } } finally { lock.unlock(); } }
在入队操作中,将每种方法都分成了队首入队和队尾入队两种,在获取锁之后最终调用方法是linkFirst/linkLast。在putFirst中如果队列已满则通过notFull.await()阻塞操作,比较容易理解
入队方法如下:
类别 | 失败返回特殊值 | 失败抛异常 | 阻塞等待 | 超时阻塞等待 |
---|---|---|---|---|
队首 | offerFirst | push/addFirst | putFirst | offerFirst(E e, long timeout, TimeUnit unit) |
队尾 | offer/offerLast | add/addLast | put/putLast | offer/offerLast(E e, long timeout, TimeUnit unit) |
public boolean offerFirst(E e) { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { return linkFirst(node); } finally { lock.unlock(); } } public boolean offerLast(E e) { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { return linkLast(node); } finally { lock.unlock(); } }
将结点添加到头部或者尾部,如果队列已满则返回false,在调用方法之前已经获取互斥锁才进行操作
/** * Links node as first element, or returns false if full. */ private boolean linkFirst(Node<E> node) { // assert lock.isHeldByCurrentThread(); // 队列已满 if (count >= capacity) return false; // 更新first Node<E> f = first; node.next = f; first = node; // last为空则表示原队列为空,当前队列仅有node则last更新指向node即可 if (last == null) last = node; else // 原队列非空,更新原first的前驱指针 f.prev = node; ++count; // 通过条件对象唤醒出队操作阻塞线程 notEmpty.signal(); return true; } /** * Links node as last element, or returns false if full. */ private boolean linkLast(Node<E> node) { // assert lock.isHeldByCurrentThread(); if (count >= capacity) return false; // 更新last Node<E> l = last; node.prev = l; last = node; // first为空则表示原队列为空,当前队列仅有node则first更新指向node即可 if (first == null) first = node; else // 原队列非空,更新原last的后继指针 l.next = node; // 实际节点数量 + 1 ++count; // 通过条件对象唤醒出队操作阻塞线程 notEmpty.signal(); return true; }
在出队操作中,同样将每种方法都分成了队首出队和队尾出队两种,在获取锁之后最终调用方法是unlinkFirst/unlinkLast。在takeFirst中如果队列为空则通过notEmpty.await()阻塞操作,比较容易理解
出队方法如下:
类别 | 失败返回特殊值 | 失败抛异常 | 阻塞等待 | 超时阻塞等待 |
---|---|---|---|---|
队首 | poll/pollFirst | pop/remove/removeFirst | take/takeFirst | poll/pollFirst(long timeout, TimeUnit unit) |
队尾 | pollLast | removeLast | takeLast | pollLast(long timeout, TimeUnit unit) |
public E pollFirst() { final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkFirst(); } finally { lock.unlock(); } } public E pollLast() { final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkLast(); } finally { lock.unlock(); } }
移除返回队头元素或队尾元素,假如队列为空则返回null
/** * Removes and returns first element, or null if empty. */ private E unlinkFirst() { // assert lock.isHeldByCurrentThread(); Node<E> f = first; // 判空 if (f == null) return null; // 更新first Node<E> n = f.next; E item = f.item; // item置空,next指向自己,方便gc回收 f.item = null; f.next = f; // help GC first = n; // 当前队列为空,last指向置空 if (n == null) last = null; else // 当前队列非空,新的头结点前驱置空 n.prev = null; // 实际结点数量 - 1 --count; // 通过条件对象唤醒入队操作阻塞线程 notFull.signal(); return item; } /** * Removes and returns last element, or null if empty. */ private E unlinkLast() { // assert lock.isHeldByCurrentThread(); Node<E> l = last; if (l == null) return null; // 更新last Node<E> p = l.prev; // item置空,prev指向自己,方便gc回收 E item = l.item; l.item = null; l.prev = l; // help GC last = p; // 当前队列为空,last指向置空 if (p == null) first = null; else // 当前队列非空,新的尾结点后继置空 p.next = null; --count; // 通过条件对象唤醒入队操作阻塞线程 notFull.signal(); return item; }
将队列中匹配的结点删除,链表中进行解除前后关联即可,注意下如果x处于队列中间(非头和尾结点),则x本身的前驱和后继指针不会被更新修改,为的是防止迭代器循环到x找不到前后结点,避免迭代器异常
/** * Unlinks x. */ void unlink(Node<E> x) { // assert lock.isHeldByCurrentThread(); // 前驱结点和后继结点 Node<E> p = x.prev; Node<E> n = x.next; // 前驱为空,相当于删除头结点 if (p == null) { unlinkFirst(); } else if (n == null) { // 后继为空,相当于删除尾结点 unlinkLast(); } else { // 前驱后继都不为空,解除删除结点与前后结点的关系,item置空 // 注意,这里x本身的前驱和后继没有被更新修改,为的是防止迭代器循环到x找不到前后结点,避免迭代器异常 p.next = n; n.prev = p; x.item = null; --count; // 通过条件对象唤醒入队操作阻塞线程 notFull.signal(); } }
peek操作直接通过首尾节点指向获得对应的item即可,不会删除节点
public E peekFirst() { final ReentrantLock lock = this.lock; lock.lock(); try { return (first == null) ? null : first.item; } finally { lock.unlock(); } } public E peekLast() { final ReentrantLock lock = this.lock; lock.lock(); try { return (last == null) ? null : last.item; } finally { lock.unlock(); } }
删除第一个/最后一个满足条件的队列结点,removeFirstOccurrence从前向后进行匹配,removeLastOccurrence从后向前进行匹配,找到第一个满足条件的结点进行删除操作
public boolean removeFirstOccurrence(Object o) { if (o == null) return false; final ReentrantLock lock = this.lock; lock.lock(); try { // 从前向后循环判断相等则通过unlink移除 for (Node<E> p = first; p != null; p = p.next) { if (o.equals(p.item)) { unlink(p); return true; } } return false; } finally { lock.unlock(); } } public boolean removeLastOccurrence(Object o) { if (o == null) return false; final ReentrantLock lock = this.lock; lock.lock(); try { // 从后向前循环判断相等则通过unlink移除 for (Node<E> p = last; p != null; p = p.prev) { if (o.equals(p.item)) { unlink(p); return true; } } return false; } finally { lock.unlock(); } }
转移队列操作
public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { int n = Math.min(maxElements, count); for (int i = 0; i < n; i++) { // 先添加防止未添加到新队列中时原队列结点出队 c.add(first.item); // In this order, in case add() throws. // 解除关联 unlinkFirst(); } return n; } finally { lock.unlock(); } }
清空队列操作,比较简单,很好理解
public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { for (Node<E> f = first; f != null; ) { f.item = null; Node<E> n = f.next; f.prev = null; f.next = null; f = n; } first = last = null; count = 0; notFull.signalAll(); } finally { lock.unlock(); } }
由于其双向链表的实现,迭代器可分为升序迭代器(Itr)和倒序迭代器(DescendingItr),通过AbstractItr封装公共操作方法,Itr和DescendingItr分别实现对应不同的方法,模板方法设计模式写法可借鉴,一个从头节点开始向后进行遍历,一个从尾节点向后进行遍历
private abstract class AbstractItr implements Iterator<E> { /** * next方法返回的node */ Node<E> next; /** * 保存next的item,防止hasNext为true后节点被删除再调用next获取不到值的情况 */ E nextItem; /** * 最近一次调用next返回的节点,如果通过调用remove删除了此元素,则重置为null */ private Node<E> lastRet; // 两个不同迭代器实现不同 abstract Node<E> firstNode(); abstract Node<E> nextNode(Node<E> n); // 构造方法初始化,设置next和nextItem AbstractItr() { // set to initial position final ReentrantLock lock = LinkedBlockingDeque.this.lock; lock.lock(); try { next = firstNode(); nextItem = (next == null) ? null : next.item; } finally { lock.unlock(); } } /** * 返回后继节点 */ private Node<E> succ(Node<E> n) { for (;;) { Node<E> s = nextNode(n); if (s == null) return null; else if (s.item != null) return s; else if (s == n) return firstNode(); else n = s; } } /** * 设置下一次执行next应该返回的值 */ void advance() { final ReentrantLock lock = LinkedBlockingDeque.this.lock; lock.lock(); try { // assert next != null; next = succ(next); nextItem = (next == null) ? null : next.item; } finally { lock.unlock(); } } public boolean hasNext() { return next != null; } public E next() { if (next == null) throw new NoSuchElementException(); lastRet = next; E x = nextItem; advance(); return x; } // 移除操作,注意,这里直接在原队列中移除了lastRet对应的节点 public void remove() { Node<E> n = lastRet; if (n == null) throw new IllegalStateException(); lastRet = null; final ReentrantLock lock = LinkedBlockingDeque.this.lock; lock.lock(); try { if (n.item != null) unlink(n); } finally { lock.unlock(); } } } /** Forward iterator */ // 从头节点向后遍历迭代器 private class Itr extends AbstractItr { Node<E> firstNode() { return first; } Node<E> nextNode(Node<E> n) { return n.next; } } /** Descending iterator */ // 从尾节点向后遍历迭代器 private class DescendingItr extends AbstractItr { Node<E> firstNode() { return last; } Node<E> nextNode(Node<E> n) { return n.prev; } }
源码分析完毕,整理LinkedBlockingDeque有如下特点:
以上内容如有问题欢迎指出,笔者验证后将及时修正,谢谢