public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
/** * Linked list node class * * 链表Node * next引用指向后一个Node */ static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; } } /** The capacity bound, or Integer.MAX_VALUE if none */ // 链表容量大小,不传参数,默认Integer.MAX_VALUE // 这里final说明一旦确定链表容量就不能再改变了 private final int capacity; /** Current number of elements */ // 队列实际包含元素的长度,这里使用了原子类保证数据的准确性 private final AtomicInteger count = new AtomicInteger(); /** * Head of linked list. * Invariant: head.item == null */ // 链表头节点,head.item == null transient Node<E> head; /** * Tail of linked list. * Invariant: last.next == null */ // 链表尾节点,last.next == null private transient Node<E> last; /** Lock held by take, poll, etc */ // 出队操作互斥锁 private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ // 非空信号量,当无元素时阻塞等待入队操作 private final Condition notEmpty = takeLock.newCondition(); // 入队操作互斥锁 /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ // 非满信号量,当队列已满时阻塞等待出队操作 private final Condition notFull = putLock.newCondition();
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; // 这里使用锁的原因和ArrayBlockingQueue相同,确保可见性,因为链表本身并不保证可见性,防止并发操作下链表不一致的情况出现 putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } // 设置队列长度 count.set(n); } finally { putLock.unlock(); } }
/** * Signals a waiting take. Called only from put/offer (which do not * otherwise ordinarily lock takeLock.) */ private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } /** * Signals a waiting put. Called only from take/poll. */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
入队出队最终调用的方法,enqueue方法首先将原尾节点的next引用指向新节点,然后将尾节点更新为新节点。dequeue方法移除头节点,更新头节点,注意这里实际上返回的节点是第二个节点,因为头节点head.item == null
/** * Links node at end of queue. * * @param node the node */ private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; } /** * Removes a node from head of queue. * * @return the node */ private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; // 使用h保存原头节点,头节点这里head.item == null Node<E> h = head; // 使用first保存原头节点之后的节点,实际上的第一个节点,我们需要的出队节点也是这个节点 Node<E> first = h.next; // 原头节点next指向自己,这里指向自己在后边迭代器中nextNode方法中有用到,通过这种方式判断节点类型 h.next = h; // help GC // 头节点更新为第二个节点 head = first; // 保存第二个节点的值 E x = first.item; // 更新头节点head.item == null,之后这个节点将作为头节点 first.item = null; return x; }
/** * Locks to prevent both puts and takes. */ void fullyLock() { putLock.lock(); takeLock.lock(); } /** * Unlocks to allow both puts and takes. */ void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 可中断putLock锁 putLock.lockInterruptibly(); try { // 队列已满则入队操作线程阻塞等待 while (count.get() == capacity) { notFull.await(); } // 队列未满则入队操作 enqueue(node); // 原子类更新队列长度值,返回值为原count的值 c = count.getAndIncrement(); // 再次判断队列是否有可用空间,如果有唤醒下一个线程进行添加操作 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } // 队列原本有0条数据,现在有了1条数据,则唤醒消费线程进行消费 // 因为原本队列无元素,消费线程都被阻塞,只需要判断有一条数据的时候就可以 if (c == 0) signalNotEmpty(); }
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 队列为空则等待 while (count.get() == 0) { notEmpty.await(); } // 出队操作 x = dequeue(); // 减1 c = count.getAndDecrement(); // 队列是否有可用空间 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } // 队列原本是满的状态,现在一条数据出队,则可以唤醒非满信号量,进行入队操作 if (c == capacity) signalNotFull(); return x; }
/** * Unlinks interior Node p with predecessor trail. */ void unlink(Node<E> p, Node<E> trail) { // assert isFullyLocked(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. // 删除节点置空 p.item = null; // 删除节点前一个节点指向删除节点的后一个节点 trail.next = p.next; // p是最后一个节点,则删除p后最后一个节点为trail if (last == p) last = trail; // 删除p之前队列是已满状态则删除p后调用notFull.signal()唤醒入队线程操作 if (count.getAndDecrement() == capacity) notFull.signal(); }
/** * Returns {@code true} if this queue contains the specified element. * More formally, returns {@code true} if and only if this queue contains * at least one element {@code e} such that {@code o.equals(e)}. * * @param o object to be checked for containment in this queue * @return {@code true} if this queue contains the specified element */ public boolean contains(Object o) { if (o == null) return false; // 获取两个锁 fullyLock(); try { // 正常循环判断 for (Node<E> p = head.next; p != null; p = p.next) if (o.equals(p.item)) return true; return false; } finally { fullyUnlock(); } }
public int drainTo(Collection<? super E> c, int maxElements) { // 检查 if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; // 唤醒非满信号量为false boolean signalNotFull = false; final ReentrantLock takeLock = this.takeLock; // 获取takeLock互斥锁 takeLock.lock(); try { // 获取能获取队列的正常长度 int n = Math.min(maxElements, count.get()); // count.get provides visibility to first n Nodes Node<E> h = head; int i = 0; try { // 将值放入c while (i < n) { Node<E> p = h.next; c.add(p.item); p.item = null; h.next = h; h = p; ++i; } // 返回获取的队列长度n return n; } finally { // Restore invariants even if c.add() threw // 重新保存常量 if (i > 0) { // assert h.item == null; head = h; // getAndAdd返回未执行前的值,与队列容量相等,则说明之前队列是已满状态,入队线程全部阻塞, // 而这里i > 0 则表明此时操作完队列未满可以唤醒入队线程 signalNotFull = (count.getAndAdd(-i) == capacity); } } } finally { takeLock.unlock(); // 唤醒入队线程 if (signalNotFull) signalNotFull(); } }
public Iterator<E> iterator() { return new Itr(); } private class Itr implements Iterator<E> { /* * Basic weakly-consistent iterator. At all times hold the next * item to hand out so that if hasNext() reports true, we will * still have it to return even if lost race with a take etc. */ private Node<E> current; private Node<E> lastRet; private E currentElement; Itr() { fullyLock(); try { // 保存头节点的下一个节点,头节点是无值的 current = head.next; if (current != null) // 获取第一个有值的数据 currentElement = current.item; } finally { fullyUnlock(); } } public boolean hasNext() { return current != null; } /** * Returns the next live successor of p, or null if no such. * * Unlike other traversal methods, iterators need to handle both: * - dequeued nodes (p.next == p) * - (possibly multiple) interior removed nodes (p.item == null) */ // 找到迭代的下一个节点 private Node<E> nextNode(Node<E> p) { for (;;) { Node<E> s = p.next; // p.next == p 说明已经出队,上边方法dequeue中有提到 // 这里直接使用head.next即可 if (s == p) return head.next; // 节点为空或者节点值不为空则返回这个节点 // 节点为空说明是队列尾,直接返回即可 // 节点值不为空说明已找到下一个节点,同样返回 if (s == null || s.item != null) return s; // s.item == null 可能节点被删除了,则继续判断下一个节点 p = s; } } public E next() { fullyLock(); try { if (current == null) throw new NoSuchElementException(); E x = currentElement; // 保存上次迭代的值 lastRet = current; // 计算保存下次迭代值 current = nextNode(current); currentElement = (current == null) ? null : current.item; return x; } finally { fullyUnlock(); } } // 调用迭代的remove删除的是lastRet对应的值 public void remove() { if (lastRet == null) throw new IllegalStateException(); fullyLock(); try { Node<E> node = lastRet; lastRet = null; for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (p == node) { // 删除p,调整链表 unlink(p, trail); break; } } } finally { fullyUnlock(); } } }
ExecutorService pool = Executors.newFixedThreadPool(5); for (int i = 0; i < 10; i++) { pool.submit(() -> System.out.println(Thread.currentThread().getName())); }
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());