概述
前文「 JDK源码分析-PriorityQueue 」分析了优先队列 PriorityQueue,它既不是阻塞队列,而且线程不安全。本文分析线程安全的阻塞优先队列 PriorityBlockingQueue。它的继承结构如下:
PriorityBlockingQueue 与 PriorityQueue 的内部结构类似,也是物理上由数组、逻辑上由堆结构实现的,并且使用 ReentrantLock 实现线程安全。除此之外, 二者大部分操作都是类似的。
因此,有了前文的铺垫,这里相对更容易理解一些。下面分析其代码实现。
代码分析
主要成员变量
// 内部数组的默认初始化容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 内部数组的最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 保存元素的内部数组
private transient Object[] queue;
// 队列中元素的数量
private transient int size;
// 队列中元素的比较器
private transient Comparator<? super E> comparator;
// 互斥锁(保证线程安全)
private final ReentrantLock lock;
// 表示队列非空的条件
private final Condition notEmpty;
// 扩容时使用的自旋锁,通过 CAS 获取(后面分析)
private transient volatile int allocationSpinLock;
// 一个普通的优先队列,主要用于序列化和反序列化
private PriorityQueue<E> q;
构造器
// 构造器 1:使用默认的初始化容量创建一个对象
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
// 构造器 2:使用给定的容量创建一个对象
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
// 构造器 3:使用给定的容量和比较器创建一个对象
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
上面几个构造器都是比较简单的赋值。除此之外,还有一个用给定集合初始化的构造器,如下:
public PriorityBlockingQueue(Collection<? extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
// 是否需要堆化
boolean heapify = true; // true if not known to be in heap order
// 是否需要筛选空值
boolean screen = true; // true if must screen for nulls
// 给定集合为 SortedSet
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false; // 已经有序,不需要再堆化
}
// 给定集合为 PriorityBlockingQueue
else if (c instanceof PriorityBlockingQueue<?>) {
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false; // 不需要筛选判空
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false; // 不需要堆化
}
// 集合转为数组
Object[] a = c.toArray();
int n = a.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
// 集合内所有元素都不能为空
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
this.queue = a;
this.size = n;
if (heapify)
heapify(); // 堆化
}
堆化操作 heapify 代码如下:
private void heapify() {
Object[] array = queue;
int n = size;
int half = (n >>> 1) - 1;
Comparator<? super E> cmp = comparator;
// 根据比较器(Comparator)是否为空,采用不同的策略
// PS: 二者操作基本一样,只是 Comparator 和 Comparable 的区别
if (cmp == null) {
for (int i = half; i >= 0; i--)
siftDownComparable(i, (E) array[i], array, n);
}
else {
for (int i = half; i >= 0; i--)
siftDownUsingComparator(i, (E) array[i], array, n, cmp);
}
}
siftDownUsingComparator 代码如下:
private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
int n,
Comparator<? super T> cmp) {
if (n > 0) {
// 数组的中间位置
int half = n >>> 1;
while (k < half) {
// 获取索引为 k 的节点左子节点索引
int child = (k << 1) + 1;
// 获取 child 的值
Object c = array[child];
// 获取索引为 k 的节点右子节点索引
int right = child + 1;
// 比较左右子节点的值,取较小的一个
if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
c = array[child = right];
// 给定的元素 x 与其较小的子节点的值比较,若 x 不大于子节点的值,停止交换
if (cmp.compare(x, (T) c) <= 0)
break;
// 将 x 与其较小的子节点互换位置
array[k] = c;
k = child;
}
array[k] = x;
}
}
该方法与 PriorityQueue 中的 siftDownUsingComparator 方法操作几乎完全一致,可参考前文的分析,这里不再赘述( siftDownComparable 方法亦是如此)。
入队方法 :add(E), put(E), offer(E, timeout, TimeUnit), offer(E)
public boolean add(E e) {
return offer(e);
}
public void put(E e) {
offer(e); // never need to block
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e); // never need to block
}
上述三个方法内部都是通过 offer(e) 方法实现的,因此只需分析 offer(e) 方法即可:
public boolean offer(E e) {
// 插入元素不能为空
if (e == null)
throw new NullPointerException();
// 获取锁,保证线程安全
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
// 如果容量不够,则进行扩容(注意这里是一个循环)
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap); // 尝试扩容
try {
Comparator<? super E> cmp = comparator;
// 根据 Comparator 是否为空采用不同的堆化策略
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
// 有新元素插入了,唤醒 notEmpty 条件下等待的线程(消费者)
notEmpty.signal();
} finally {
// 释放锁
lock.unlock();
}
return true;
}
下面分析一下扩容操作 tryGrow:
private void tryGrow(Object[] array, int oldCap) {
// 释放锁
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// 尝试以 CAS 方式修改 allocationSpinLock 的值(将 0 改为 1)
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 若旧容量 n 较小(小于 64),则扩容为 2 * n + 2,否则扩容为 1.5 * n
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
// 创建一个新数组
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
// 将 allocationSpinLock 重置为 0
allocationSpinLock = 0;
}
}
// newArray 为空表示未进行上述扩容操作,则当前线程让出 CPU 时间
if (newArray == null) // back off if another thread is allocating
Thread.yield();
// 尝试获取锁
lock.lock();
// 到这里表示扩容成功
// queue == array 保证老数据复制一次
if (newArray != null && queue == array) {
// 扩容后的新数组
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
这个扩容方法比较有意思:它刚开始会释放锁,而后再重新获取锁。
1. 为什么刚开始要释放锁?
由于该锁是全局的, 其他大部分公有(public)方法也会用到;而扩容操作又相对比较耗时,若这里不释放,则某个线程扩容时其他方法调用可能会阻塞。
2. 释放锁之后如何保证线程安全?
这就用到了成员变量 allocationSpinLock,使用了 Unsafe 类的 CAS 操作。它尝试将 allocationSpinLock 的值设置为 1,而一旦操作成功,其他线程就无法进入,直到该线程将它重置为 0. 这就保证了同一时间内只能有一个线程在扩容。
3. 在释放锁后的扩容操作中,先后可能会有多个线程扩容,也即会产生 多个新容量的空数组 ( 此时 它们都未指向 原先的数组 queue),如何避免老数据多次复制到新数组呢?
代码里用到了 queue == array 这个判断。
比如线程 T1 和 T2 都对原数组进行了扩容,得到了两个 newArray,在后面复制老数据时,若其中一个线程已经对 queue 重新赋值并复制后,由于 queue 已经改变, 后面的线程就不会再复制一次了。
出队方法 :poll(), take(), peek()
// 出队
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
// 出队(队列为空时阻塞)
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
// 有超时等待的出队
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
return result;
}
可以看到这几个出队的操作都加了锁,内部都调用了 dequeue 方法:
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
// 取数据中的第一个元素
E result = (E) array[0];
// 获取最后一个元素
E x = (E) array[n];
// 将最后一个元素置空,并恢复堆结构
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
该方法与 PriorityQueue 的出队操作 poll() 类似,也不再赘述。
小结
1. PriorityBlockingQueue 是优先队列的阻塞 方式 实现,它 与 PriorityQueue 内部结构类似,即物理结构是可变数组、逻辑结构是堆;
2. PriorityBlockingQueue 内部元素不能为空,且可比较,使用 ReentrantLock 保证 线程安全。
参考链接:
https://juejin.im/post/5cc258796fb9a03228616e6e
https://blog.csdn.net/codejas/article/details/89190774
相关阅读:
JDK源码分析-PriorityQueue
JDK源码分析-ReentrantLock
Stay hungry, stay foolish.