针对JDK中的j.u.c我们已经讲了三篇文章了,计划我是分了五篇文章来细讲j.u.c中的api。今天 我们讲的BlockingQueue是第四篇,也是在使用中最频发,面试必提的一个话题。
我在网上搜了个用列表来讲解队列, 通过这个列表一眼就能了解jdk 在设计 API 时每个队列的特性,也能看出他们的相同点和不同点
队列 |
有界性 |
锁 |
数据结构 |
ArrayBlockingQueue |
bounded( 有界 ) |
加锁 |
arrayList |
LinkedBlockingQueue |
optionally-bounded |
加锁 | linkedList |
PriorityBlockingQueue |
unbounded | 加锁 | heap |
DelayQueue |
unbounded | 加锁 | heap |
SynchronousQueue |
bounded | 加锁 | 无 |
LinkedTransferQueue |
unbounded | 加锁 | heap |
LinkedBlockingDeque |
unbounded | 无锁 |
heap |
阻塞队列的概念
ArrayBlockingQueue :是一个用数组实现的有界阻塞队列,此队列按照先进先出( FIFO )的原则对元素进行排序。支持公平锁和非公平锁
LinkedBlockingQueue :一个由链表结构组成的有界队列,此队列的长度为 Integer.MAX_VALUE 。此队列按照先进先出的顺序进行排序。
PriorityBlockingQueue :一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现 compareTo() 方法来指定元素排序规则,不能保证同优先级元素的顺序
DelayQueue :一个实现 PriorityBlockingQueue 实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。
SynchronousQueue :一个不存储元素的阻塞队列,每一个 put 操作必须等待 take 操作,否则不能添加元素。支持公平锁和非公平锁。 SynchronousQueue 的一个使用场景是在线程池里 LinkedTransferQueue :一个由链表结构组成的无界阻塞队列,相当于其它队列, LinkedTransferQueue 队列多了 transfer 和 tryTransfer 方法
LinkedBlockingDeque :一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半
下面我们对每个类中的API 进行了解和分析
ArrayBlockingQueue
ArrayBlockingQueue继承 AbstractQueue 实现 BlockingQueue
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
transient Itrs itrs = null;
...
}
主要实现通过 items 数组来实现数据结构。通过 takeIndex 和 putIndex 组合来标识下标, 通过 ReentrantLock 来进行加锁,通过 Condition 来对队列为元素的为空或不满 队列的阻塞
主要的方法
add //添加元素,满时会 抛异常
offer //添加元素,满时直接返回false
put //添加队列满时阻塞
poll //移除 队列第 一个元素,为空时直接返回null
take // 移除队列第一个元素,为空时等待
remove // 移除队列第一个元素,为空时异常
remove(Object) //移除指定元素,若是第一个直接移除, 否则下面的元素上移,致空最后一个。
peek // 获取队列中第一个元素
size //当前队列数
通过以上方法可以分为以下几类
添加元素
移除元素
元素个数
LinkedBlockingQueue
是一个链表队列那么实现必须使用Node来实现,看下类的定义
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* Linked list node class
*/
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
...
}
这里的阻塞也是通过Condition来实现
主要方法
add //添加 元素,满时抛异常
put //添加元素,满时阻塞
offer// 添加元素,满时返回false
poll //移除元素,队列为空时返回null
remove //移除元素,队列为空 抛异常
remove(Object) //移除指定元素,移除时需要在链表中查找元素,然后在移除
take //移除 元素为空阻塞
size //获取 元素个数
drainTo //一次性从队列中获取所有( 可指定个数)可用元素,获取后在队列移除
iterator //转化成迭代器
remainingCapacity 获取 剩余队列大小
sliterator //转化迭代器( JDK1.8)
PriorityBlockingQueue
PriorityBlockingQueue是通过Comparator接口实现对象比较
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = 5595510919245408276L;
}
主要方法
add //添加元素, 满时跑异常
offer //添加元素,满时返回false
put //添加元素,满时阻塞
poll //移除 元素,队列为空时返回null
take // 移除元素, 队列为空阻塞
remove //移除元素,队列为空时抛异常
remove(Object)
comparator //返回此队列元素进行排序的比较器
drainTo // 一次性从队列中 获取所有(可指定范围)元素
iterator //转化换代器
peek //获取队列中第一个元素
remainingCapacity //获取剩余 队列大小
spliterator //转化换代器(jdk1.8)
size 返回队列元素个数
DelayQueue
内部使用是非线程安全的优先队列 PriorityQueue
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();
}
主要方法
add //添加元素
offer //添加元素
put //添加元素,满时阻塞
take //移除元素,为空阻塞;延时时阻塞;有leader线程时阻塞;自己为leader时阻塞
poll //移除元素,为空返回null
remove //移除元素
size //返回队列元素个数
remainingCapacity
iterator //转化成迭代器
drainTo //获取所有(可指定个数)可用元素
clear //清除队列
toArray //转化为数组
SynchronousQueue
内部定义抽象静态类Transferer,通过两个内部类来实现公平(TransferQueue)和非公平(TransferStack)模式下的队列操作。
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* Shared internal API for dual stacks and queues.
*/
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
/** The number of CPUs, for spin control */
static final int NCPUS = Runtime.getRuntime().availableProcessors();
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
static final int maxUntimedSpins = maxTimedSpins * 16;
static final long spinForTimeoutThreshold = 1000L;
...
}
TransferStack 内部通过单向链表SNode实现双重栈,数据是 LIFO的 顺序。TransferQueue是通过内部定义QNode 实现,数据是FIFO的顺序。
主要方法
put添加元素后阻塞
offer 添加元素,直接返回false,若其它线程正好取出返回true
take 移除元素,若未有元素时阻塞
poll 移除元素,直接返回null,若其它线程正好添加返回元素
其它方法都是默认值。没有实际意义!
LinkedTransferQueue
LInkedTransferQueue是TransferQueue 的实现类,是一个无界队列, 具有先进先出FIFO特性
public class LinkedTransferQueue<E> extends AbstractQueue<E>
implements TransferQueue<E>, java.io.Serializable {
}
主要方法
transfer 当存在一个正在等待获取的消费线程,则立即传输数据。否则会插入到队列尾部并阻塞等待消费线程
tryTransfer 肖存在一个正在等待获取的消费线程,则立即传输,否则返回false
hasWaitingConsumer 判断是否存在消费者线程
getWaitingConsumerCount 获取所有等待获取元素的消费线程数量
LinkedBlockingDeque
在分析LinkedBlockingDeque一般与LinkedBlockingQueue一起分析。这两个比较相似。在上面刚刚分析了LinkedBlockingQueue它是链表通过Node来实现,但入队和出队只能一端。而LInkedBlockingDeque也是链表,它可以通过两端同时操作(入队和出队)。
原因是 LinkedBlockingDeque 中的 Node是 双向链表, LinkedBlockingQueue 是单向链表
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
/** Doubly-linked list node class */
static final class Node<E> {
E item;
Node<E> prev;
Node<E> next;
Node(E x) {
item = x;
}
}
}
主要方法
add /addFirst/addLast
offer/offerFirst/offerLast
poll/pollFirst/pollLast
put/putFirst/putLast
remove/removeFirst/removeLast/pop
getFirst/getLast
以上是根据个人理解做了分析,如有不正确请留言讨论。
----------------------------------------------------------
再次感谢,欢迎关注微信公众号“零售云技术”,文章持续更新,或留言讨论