概述
BlockingQueue 意为“阻塞队列”,它在 JDK 中是一个接口。
所谓阻塞,简单来说就是当某些条件不满足时,让线程处于等待状态。 例如经典的“生产者-消费者”模型,当存放产品的容器满的时候,生产者处于等待状态; 而当容器为空的时候,消费者处于等待状态。阻塞队列的概念与该场景类似。
BlockingQueue 的 继承关系如下:
可以看到 BlockingQueue 继承自 Queue 接口, 它的常用实现类有 ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue 等。 同前面一样,本文先分析 Bl ockingQueue 接口的方法定义,后文再分析其实现类的代码。
PS: 从这个继承体系也可以看出来,直接实现接口的是抽象类,而实现类则通常继承自抽象类。为什么要这样设计呢?因为有些接口的实现类会有多个,而这些类之间有一部分逻辑是相似或者相同的,因此就把这部分逻辑提取到抽象类中,避免代码冗余。
代码分析
BlockingQueue 的方法定义如下:
其方法简 单分析如下:
// 将指定元素插入到队列,若成功返回 true,否则抛出异常
boolean add(E e);
// 将指定元素插入到队列,若成功返回 true,否则返回 false
boolean offer(E e);
// 将指定元素插入到队列,若队列已满则等待
void put(E e) throws InterruptedException;
// 将指定元素插入到队列,若成功返回 true,否则返回 false,有超时等待
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException
// 获取并移除队列的头部,若为空则等待
E take() throws InterruptedException;
// 获取并移除队列的头部,有超时等待(若超时返回 null)
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
// 返回队列可以接收的容量,若无限制则返回 Integer.MAX_VALUE
int remainingCapacity();
// 删除指定的元素(如果存在),返回是否删除成功
boolean remove(Object o);
// 是否包含指定元素
public boolean contains(Object o);
// 从此队列中删除所有可用元素,并将它们添加到给定集合中
int drainTo(Collection<? super E> c);
// 从此队列中删除所有可用元素,并将它们添加到给定集合中(指定大小)
int drainTo(Collection<? super E> c, int maxElements);
主要方法小结如下:
Throws exceptions |
Special value |
Blocks |
Time out |
|
Insert |
add(e) |
offer(e) |
put(e) |
offer(e, time, unit) |
Remove |
remove() |
poll() |
take() |
poll(time, unit) |
Examine |
element() |
peek() |
- |
- |
Queue 接口前文「 JDK源码-Queue, Deque 」 已进行分析 ,这里不再赘述。
典型用法
生产者:
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
// 将产品放入队列
while (true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
消费者:
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
// 从队列中消费产品
while (true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
测试类:
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
// 创建并启动一个生产者和两个消费者
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
PS: 上述代码是 BlockingQueu e 的文档提供的,仅供参考。
小结
BlockingQueue 是一个接口,它主要定义了阻塞队列的一些方法。阻塞队列在并发编程中使用较多,比如线程池。
相关阅读:
JDK源码-Queue, Deque
Stay hungry, stay foolish.