从类名可知,LinkedBlockingQueue是基于链表实现的阻塞队列。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
类特点:
Collection
和 Iterator
接口的可选方法;
这是 two lock queue
算法的变体。putLock守卫元素的put、offer操作,且与等待存入的条件关联。takeLock原理类似。putLock和takeLock都依赖的 count
变量,被维护为一个原子变量,以避免多数情况下需同时请求两个锁。
为了最小化put时需获取takeLock,使用了层叠式通知。当put操作注意到至少一个take可以启动,就会通知获取者。如果有多个item在信号后进队,获取者将依次通知其他获取者。因此,有对称的取操作通知存操作。有些操作如remove和iterators会同时请求两个锁。
读取者和写入者间可见性如下提供:
当元素已进入队列,putLock已被获取,且count变量也更新。随后,读取者通过获取putLock或获取takeLock,得到入队元素的可见性,然后读取 n = count.get();
,令前n个元素变得可见。
为实现弱一致性迭代器,显然要从前导出队节点上保持所有节点的GC可达性。这会引起两个问题:
然而,只有未删除节点需要从已出队节点可达,GC不需要理解可达性的类型。我们使用一些小手段连接一个刚刚被出队的节点。
源码来自JDK10。
单向链表节点类,查找元素需要从链表头开始依次遍历节点
static class Node<E> { E item; // 节点数据 Node<E> next; // 下一节点 Node(E x) { item = x; } // 构造方法,存入节点的内容 }
队列容量,默认为Integer.MAX_VALUE
private final int capacity;
已存元素总数
private final AtomicInteger count = new AtomicInteger();
链表头节点
transient Node<E> head;
链表尾节点
private transient Node<E> last;
此锁被take,poll等操作持有
private final ReentrantLock takeLock = new ReentrantLock();
takes的等待队列
private final Condition notEmpty = takeLock.newCondition();
此锁被put,offer等持有
private final ReentrantLock putLock = new ReentrantLock();
puts的等待队列
private final Condition notFull = putLock.newCondition();
唤醒notEmpty队列上正在等待获取元素的线程,此方法由put/offer调用。
private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
唤醒notFull队列上正在等待存入元素的线程,此方法由take/poll调用。
private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
元素队尾进队
private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; }
队头元素出队
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // 帮助GC head = first; E x = first.item; first.item = null; return x; }
上锁禁止存取操作
void fullyLock() { putLock.lock(); takeLock.lock(); }
解锁允许存取操作
void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
默认构造方法,最大容量为Integer.MAX_VALUE
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
构造方法,初始化头结点引用和为节点引用,使用指定capacity
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); // 自定义队列capacity this.capacity = capacity; // 头指针和尾指针指向同一个空节点 last = head = new Node<E>(null); }
通过集合实例初始化类,最大容量为Integer.MAX_VALUE
public LinkedBlockingQueue(Collection<? extends E> c) { // capacity设置为Integer.MAX_VALUE this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; // 没有竞争,但对可见性来说有必要 putLock.lock(); try { int n = 0; // 依次遍历集合c for (E e : c) { if (e == null) // 集合元素为空抛出NullPointerException throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); // 在集合c中取元素,用元素创建节点存入队列 enqueue(new Node<E>(e)); // 添加元素递增 ++n; } // 最后把总添加元素更新至原子值count count.set(n); } finally { putLock.unlock(); } }
获取队已存元素数量
public int size() { return count.get(); }
剩余可用队列容量
public int remainingCapacity() { return capacity - count.get(); }
插入到队列尾部,直到成功插入才返回成功
public void put(E e) throws InterruptedException { // 存入元素不能为空 if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 此锁可以被中断 putLock.lockInterruptibly(); try { // 队列已满则原地等待直到队列出现空余 while (count.get() == capacity) { notFull.await(); } // 元素插入到队列尾部 enqueue(node); // 已保存元素数量递增 c = count.getAndIncrement(); // 检查队列是否已满 if (c + 1 < capacity) // 通知其他线程继续插入元素 notFull.signal(); } finally { // 解除锁 putLock.unlock(); } if (c == 0) signalNotEmpty(); }
在队列尾部插入指定元素,如果在指定超时时间内插入成功返回true,否则返回false
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { // e为空抛出NullPointerException if (e == null) throw new NullPointerException(); // 根据超时值和时间单位转换为纳秒时长 long nanos = unit.toNanos(timeout); int c = -1; // 获取putLock final ReentrantLock putLock = this.putLock; // 获取已有元素总数 final AtomicInteger count = this.count; // 锁putLock设置为可中断 putLock.lockInterruptibly(); try { // 队列已满则原地等待直到队列出现空余 while (count.get() == capacity) { if (nanos <= 0L) // 到达超时时间,退出方法并返回false return false; // 时间倒计时 nanos = notFull.awaitNanos(nanos); } // 元素进入队列 enqueue(new Node<E>(e)); // 队列元素总数递增 c = count.getAndIncrement(); if (c + 1 < capacity) // 唤醒其他写入线程 notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
把指定元素插入到队尾,如果插入成功返回true,队列已满插入失败返回false。当使用有容量限制的队列时,这个方法是比add方法更好,因为add元素添加失败会抛出异常。存入元素e为空时抛出NullPointerException。
public boolean offer(E e) { // 存入元素不能为null if (e == null) throw new NullPointerException(); // 获取队列元素总数 final AtomicInteger count = this.count; // 队列已满,退出 if (count.get() == capacity) return false; int c = -1; // 创建新节点 Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; // 获取不可中断锁 putLock.lock(); try { // 还有剩余空间 if (count.get() < capacity) { // 向队列存入元素 enqueue(node); // 队列保存元素总数递增 c = count.getAndIncrement(); // 队列没满,通知其他线程写入 if (c + 1 < capacity) notFull.signal(); } } finally { // 解锁 putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
获取元素
public E take() throws InterruptedException { E x; int c = -1; // 获取队列元素总数 final AtomicInteger count = this.count; // 获取takeLock final ReentrantLock takeLock = this.takeLock; // 上锁,设置为可中断 takeLock.lockInterruptibly(); try { // 如果队列为空,则等待其他线程通知 while (count.get() == 0) { notEmpty.await(); } // 队列有可用元素,队头元素出列 x = dequeue(); // 队列元素总数递减 c = count.getAndDecrement(); // 如果队列还有元素,通知其他线程取数据 if (c > 1) notEmpty.signal(); } finally { // 解锁 takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
在指定等待超时时间内获取元素,到达超时时间没有则返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; // 转换超时时间为纳秒 long nanos = unit.toNanos(timeout); // 获取队列元素总数 final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 上锁takeLock,设置为可中断 takeLock.lockInterruptibly(); try { // 当队列为空,在超时时间内等待 while (count.get() == 0) { if (nanos <= 0L) // 到达超时时间,返回null return null; // 超时时间倒数 nanos = notEmpty.awaitNanos(nanos); } // 队头元素出列 x = dequeue(); // 队列元素总数递减 c = count.getAndDecrement(); // 队列还有可用元素,通知其他线程获取数据 if (c > 1) notEmpty.signal(); } finally { // 解锁 takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
获取元素
public E poll() { // 获取元素数量 final AtomicInteger count = this.count; // 队列中没有元素则返回null if (count.get() == 0) return null; // 队列中有元素,开始以下逻辑 E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { // 元素从对头出队 x = dequeue(); // 队列元素数量递减 c = count.getAndDecrement(); // 队列还有可用元素,通知其他线程获取数据 if (c > 1) notEmpty.signal(); } } finally { // 解锁 takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
返回队列头节点包含的数据,此操作不会改变队列节点的数量或顺序
public E peek() { // 队列没有节点直接返回null if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { // 返回对头元素的内容,否则返回null return (count.get() > 0) ? head.next.item : null; } finally { // 解锁 takeLock.unlock(); } }
把节点p从列表中解除链接,pred是p的上一个节点
void unlink(Node<E> p, Node<E> pred) { // assert putLock.isHeldByCurrentThread(); // assert takeLock.isHeldByCurrentThread(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. p.item = null; // 节点p的内容置空 pred.next = p.next; // 操作p前后节点,令p解除链接 if (last == p) // 如果p是尾节点,则调整尾指针的指向 last = pred; if (count.getAndDecrement() == capacity) notFull.signal(); }
如果队列中存在指定元素,则把该元素从队列中移除。即使队列中存在多个相同元素,此方法只会移除其中一个。移除成功返回true,否则返回false。
public boolean remove(Object o) { // 元素为null直接返回false if (o == null) return false; fullyLock(); try { for (Node<E> pred = head, p = pred.next; p != null; pred = p, p = p.next) { // 在链表上逐个查找元素是否匹配 if (o.equals(p.item)) { // 找到匹配元素则把该元素解除链接 unlink(p, pred); // 元素返回true return true; } } // 没有移除元素 return false; } finally { fullyUnlock(); } }
检查是否包含指定元素
public boolean contains(Object o) { // 元素为null直接返回false if (o == null) return false; fullyLock(); try { // 遍历队列逐个查找元素 for (Node<E> p = head.next; p != null; p = p.next) // 找到匹配元素 if (o.equals(p.item)) return true; // 找不到匹配元素 return false; } finally { fullyUnlock(); } }
返回一个数组,此数组包含队列的所有元素,且数组元素的顺序和队列的元素的顺序一致。每次返回的数组为不同对象,修改数组是安全的。
public Object[] toArray() { fullyLock(); try { // 获取队列中元素个数 int size = count.get(); // 通过元素数量构造数组 Object[] a = new Object[size]; int k = 0; // 遍历队列,一次拷贝元素引用到数组对应索引 for (Node<E> p = head.next; p != null; p = p.next) a[k++] = p.item; // 返回数组 return a; } finally { fullyUnlock(); } }
返回包含队列所有元素的数组,数组元素顺序和队列元素顺序一致。如果传入数组空间足够,返回的数组就是传入的数组(运行时类型也一致)。否则方法内部创建类型相同新数组,数组长度和队列元素数量相等。如果传入的数组保存队列所有元素后还有空余,这个空余会被置null。
@SuppressWarnings("unchecked") public <T> T[] toArray(T[] a) { fullyLock(); try { // 获取队列元素总数 int size = count.get(); // 队列元素总数超过传入数组的长度 if (a.length < size) // 创建新的数组,长度为队列元素总数 a = (T[])java.lang.reflect.Array.newInstance (a.getClass().getComponentType(), size); int k = 0; // 依次把队列的元素存入数组中 for (Node<E> p = head.next; p != null; p = p.next) a[k++] = (T)p.item; // 如果数组还有空余空间,则该位置设为null if (a.length > k) a[k] = null; // 返回数组 return a; } finally { fullyUnlock(); } }
移除队列中所有元素,移除完成后队列元素为空
public void clear() { // 上锁 fullyLock(); try { for (Node<E> p, h = head; (p = h.next) != null; h = p) { h.next = h; p.item = null; // 置空节点的数据 } head = last; // 由于队列元素全清空了,所以头指针和为指针引用相同 // assert head.item == null && head.next == null; if (count.getAndSet(0) == capacity) notFull.signal(); } finally { fullyUnlock(); } }
移除队列所有元素,并把这些元素添加到指定集合c中
public int drainTo(Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); }
移除队列最多maxElements个元素,并把这些元素添加到指定集合c中
public int drainTo(Collection<? super E> c, int maxElements) { Objects.requireNonNull(c); // 不能把本队列的元素添加到自己队列上 if (c == this) throw new IllegalArgumentException(); // maxElements须为正数 if (maxElements <= 0) return 0; boolean signalNotFull = false; final ReentrantLock takeLock = this.takeLock; // takeLock上锁 takeLock.lock(); try { // 计算需要转移多少个队列元素 int n = Math.min(maxElements, count.get()); // count.get provides visibility to first n Nodes Node<E> h = head; int i = 0; try { while (i < n) { // 从列表取节点 Node<E> p = h.next; // 节点数据添加到数组中 c.add(p.item); // 置空节点的数据 p.item = null; // 引用移动到下一个节点 h.next = h; h = p; // 转移节点数递增 ++i; } return n; } finally { // Restore invariants even if c.add() threw if (i > 0) { // assert h.item == null; // 已经转移了i个元素,队列的头引用需要向后移动i个位置 head = h; signalNotFull = (count.getAndAdd(-i) == capacity); } } } finally { takeLock.unlock(); if (signalNotFull) signalNotFull(); } }
在任何元素没有完全上锁的情况下遍历。此种遍历必须处理以下两种情况:
Node<E> succ(Node<E> p) { if (p == (p = p.next)) p = head.next; return p; }
返回队列当前元素顺序的迭代器,元素顺序按照队列头到队列尾
public Iterator<E> iterator() { return new Itr(); }
弱一致性迭代器。懒更新祖先域提供预计O(1)的remove(),最差情况下为O(n),不管何时保存的祖先节点被并发删除。
private class Itr implements Iterator<E> { private Node<E> next; // Node holding nextItem private E nextItem; // next item to hand out private Node<E> lastRet; private Node<E> ancestor; // Helps unlink lastRet on remove() Itr() { fullyLock(); try { if ((next = head.next) != null) nextItem = next.item; } finally { fullyUnlock(); } } public boolean hasNext() { return next != null; } public E next() { Node<E> p; if ((p = next) == null) throw new NoSuchElementException(); lastRet = p; E x = nextItem; fullyLock(); try { E e = null; for (p = p.next; p != null && (e = p.item) == null; ) p = succ(p); next = p; nextItem = e; } finally { fullyUnlock(); } return x; } public void forEachRemaining(Consumer<? super E> action) { // A variant of forEachFrom Objects.requireNonNull(action); Node<E> p; if ((p = next) == null) return; lastRet = p; next = null; final int batchSize = 64; Object[] es = null; int n, len = 1; do { fullyLock(); try { if (es == null) { p = p.next; for (Node<E> q = p; q != null; q = succ(q)) if (q.item != null && ++len == batchSize) break; es = new Object[len]; es[0] = nextItem; nextItem = null; n = 1; } else n = 0; for (; p != null && n < len; p = succ(p)) if ((es[n] = p.item) != null) { lastRet = p; n++; } } finally { fullyUnlock(); } for (int i = 0; i < n; i++) { @SuppressWarnings("unchecked") E e = (E) es[i]; action.accept(e); } } while (n > 0 && p != null); } public void remove() { Node<E> p = lastRet; if (p == null) throw new IllegalStateException(); lastRet = null; fullyLock(); try { if (p.item != null) { if (ancestor == null) ancestor = head; ancestor = findPred(p, ancestor); unlink(p, ancestor); } } finally { fullyUnlock(); } } }
Spliterators.IteratorSpliterator的自定义变体
private final class LBQSpliterator implements Spliterator<E> { static final int MAX_BATCH = 1 << 25; // max batch array size; Node<E> current; // current node; null until initialized int batch; // batch size for splits boolean exhausted; // true when no more nodes long est = size(); // size estimate LBQSpliterator() {} public long estimateSize() { return est; } public Spliterator<E> trySplit() { Node<E> h; if (!exhausted && ((h = current) != null || (h = head.next) != null) && h.next != null) { int n = batch = Math.min(batch + 1, MAX_BATCH); Object[] a = new Object[n]; int i = 0; Node<E> p = current; fullyLock(); try { if (p != null || (p = head.next) != null) for (; p != null && i < n; p = succ(p)) if ((a[i] = p.item) != null) i++; } finally { fullyUnlock(); } if ((current = p) == null) { est = 0L; exhausted = true; } else if ((est -= i) < 0L) est = 0L; if (i > 0) return Spliterators.spliterator (a, 0, i, (Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT)); } return null; } public boolean tryAdvance(Consumer<? super E> action) { Objects.requireNonNull(action); if (!exhausted) { E e = null; fullyLock(); try { Node<E> p; if ((p = current) != null || (p = head.next) != null) do { e = p.item; p = succ(p); } while (e == null && p != null); if ((current = p) == null) exhausted = true; } finally { fullyUnlock(); } if (e != null) { action.accept(e); return true; } } return false; } public void forEachRemaining(Consumer<? super E> action) { Objects.requireNonNull(action); if (!exhausted) { exhausted = true; Node<E> p = current; current = null; forEachFrom(action, p); } } public int characteristics() { return (Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT); } }
返回基于此队列元素的可分割迭代器。返回的可分割迭代器是弱一致性的。此迭代器可报告Spliterator#CONCURRENT、Spliterator#ORDERED、Spliterator#NONNULL。
public Spliterator<E> spliterator() { return new LBQSpliterator(); }
返回活节点p的前导节点,以便解链接p
Node<E> findPred(Node<E> p, Node<E> ancestor) { // assert p.item != null; if (ancestor.item == null) ancestor = head; // Fails with NPE if precondition not satisfied for (Node<E> q; (q = ancestor.next) != p; ) ancestor = q; return ancestor; }
移除所有集合c的元素,若集合c对象为空则抛出NullPointerException
public boolean removeAll(Collection<?> c) { Objects.requireNonNull(c); return bulkRemove(e -> c.contains(e)); }
仅保留所有集合c的元素,若集合c对象为空则抛出NullPointerException
public boolean retainAll(Collection<?> c) { Objects.requireNonNull(c); return bulkRemove(e -> !c.contains(e)); }
实现批量移除方法
@SuppressWarnings("unchecked") private boolean bulkRemove(Predicate<? super E> filter) { boolean removed = false; Node<E> p = null, ancestor = head; Node<E>[] nodes = null; int n, len = 0; do { // 1. Extract batch of up to 64 elements while holding the lock. long deathRow = 0; // "bitset" of size 64 fullyLock(); try { if (nodes == null) { if (p == null) p = head.next; for (Node<E> q = p; q != null; q = succ(q)) if (q.item != null && ++len == 64) break; nodes = (Node<E>[]) new Node<?>[len]; } for (n = 0; p != null && n < len; p = succ(p)) nodes[n++] = p; } finally { fullyUnlock(); } // 2. Run the filter on the elements while lock is free. for (int i = 0; i < n; i++) { final E e; if ((e = nodes[i].item) != null && filter.test(e)) deathRow |= 1L << i; } // 3. Remove any filtered elements while holding the lock. if (deathRow != 0) { fullyLock(); try { for (int i = 0; i < n; i++) { final Node<E> q; if ((deathRow & (1L << i)) != 0L && (q = nodes[i]).item != null) { ancestor = findPred(q, ancestor); unlink(q, ancestor); removed = true; } } } finally { fullyUnlock(); } } } while (n > 0 && p != null); return removed; }
上一篇
压缩Jekyll的HTML代码