刚一听到阻塞队列,就觉得它非常地高大上,非常地难!其实不然!为什么?因为当你有一点基本的数据结构基础再看阻塞队列的定义之后你就会发现就那么回事。好了,言归正传,队列?无非就是一种具有先进先出 (FIFO)
特性的数据结构嘛!其最基本的操作是 入队
和 出队
。
那上面是阻塞队列呢?我们来看下关于它的一番定义:
阻塞队列(BlockingQueue)是一个支持两个附加操作的一种特殊队列。这两个附加的操作是:
阻塞队列经常用于 生产者和消费者
的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素,所谓容器,就是我们之前文章讲到的临界区,是为了将生产者和消费者进行解耦而加入的。
那么我们就要开始问了,他的基本操作是怎样的呢?怎么实现队列的阻塞呢?下面是Java中阻塞队列支持的相关操作:
方法 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
入队方法
|
add(e)
|
offer(e)
|
put(e)
|
offer(e,time,unit)
|
出队方法
|
remove()
|
poll()
|
take()
|
poll(time,unit)
|
检查方法
|
element()
|
peek()
|
不可用
|
不可用
|
从上表我们可以看出 put()
和 take()
方法当队列满或为空的情况下会一直阻塞,阻塞队列会提供对这两个操作的支持。
接下来我将列出 JDK
中对阻塞队列的相关实现,并见到那挑选其中的某个实现进行源码分析。
JDK
中阻塞队列有以下实现:
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。 LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。 PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。 DelayQueue:一个使用优先级队列实现的无界阻塞队列。 SynchronousQueue:一个不存储元素的阻塞队列。 LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。 LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
对了,小编说说我对有界和无界的理解,我也不清楚对还是不对,就那么回事,不对的话麻烦你评论告诉小编,灰常感谢!
从实现方面讲:
有界
: 指的是实现里头持有的资源(数组)是有大小的,即容量是有限的
无界
: 指的是持有一个无界的链表
从访问方式看:
无界
: 指的是不拒绝某些线程的访问
有界
: 指的是拒绝某些线程的访问
ArrayBlockingQueue.java
对于阻塞队列的学习,我们要时常在脑子里模拟并发对其操作的场景。让我们先来看看 ArrayBlockingQueue
中声明的相关成员变量:
/** 存放队列元素的数组 */ final Object[] items; /** 下一次调用 take, poll, peek 或者 remove 时元素的下标 */ int takeIndex; /** 下一次调用 put, offer, 或者 add 方法时元素的下标*/ int putIndex; /** 队列元素的大小,相当于ArrayList中的size int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook.经典的双条件算法 */ /** Main lock guarding all access */ final ReentrantLock lock; /** 判断是否为空的条件变量,用来表示队列空与非空的状态 */ private final Condition notEmpty; /** 判断是否满了条件变量 用来表示队列满或没满的状态 */ private final Condition notFull; /** * 当前活动迭代器的共享状态,或者如果已知不存在,则返回null。 允许队列操作更新迭代器状态。 * Shared state for currently active iterators, or null if there * are known not to be any. Allows queue operations to update * iterator state. */ //在迭代器和它们的队列之间共享数据,允许在删除元素时修改队列以更新迭代器 transient Itrs itrs = null;
从上面可以看出, ArrayBlockingQueue
拥有一个存储元素的数组 items
及其相关的 出队
、 入队
下标及容量 count
,这些都是对基本的属性。再往下看可以看出队列中使用了经典的双条件算法,即拥有两个条件变量 Condition
类型的变量, Condition
是JDK提供的在基本同步方法 notify()、wait()、notifyAll()
的基础上进行优化的工具类,它提供了代替 wait(),notify()
等方法的相应版本 await()、signal()
方法。一般来说, Condition
的使用一般结合一个锁来实现, ArrayBlockingQueue
中使用了可重入锁 ReentrantLock
,即经典的一锁双条件。
如果还不理解,想想我们在生产者和消费者文中代码里写的,在调用 notify(),wait()
等方法时必须先在 synchnorized{}
同步块下获得锁,道理是一样的,你调用 await()、signal()
的时候也需要进行 lock()
获得锁)
而且 Condition
和 ReentrantLock
都是不可变的, final
修饰,多线程安全啦!
知道了其成员变量,我们再来看看其相应的构造方法:
/** * 使用给定的容量大小和默认的存取策略(FIFO)初始化一个ArrayBlockingQueue */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } /** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and the specified access policy. * 使用给定的容量大小和给定的存取策略初始化一个ArrayBlockingQueue * @param capacity the capacity of this queue * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if {@code false} the access order is unspecified.(也就是下一步获得锁的还不指定是谁) * @throws IllegalArgumentException if {@code capacity < 1} */ public ArrayBlockingQueue(int capacity, boolean fair) { /** * 你容量不能为负数吧! */ if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; //你初始化ArrayBlockingQueue的时候也要初始化你的成员变量吧!一所双条件很重要啊! lock = new ReentrantLock(fair); //Condition对象由锁来进行创建 notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /** * 通过给定的容量大小、存取策略,使用给定的Collection来初始化数据 * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity, the specified access policy and initially containing the * elements of the given collection, * added in traversal order of the collection's iterator. */ public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { /** * 这一步同上 */ this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // 这里只是为了可见性(为什么?),而不是指相互排斥,想想你刚创建这个对象,哪有什么互相排斥嘛! try { int i = 0; try { for (E e : c) { /** * 先检查后操作机制,是一种很好的编程规范 */ checkNotNull(e); items[i++] = e; } /** * 一般来说,c的大小是小于等于capacity的,否则报错了 */ } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
从其构造器可以看出,我们可以在初始化时指定一个容量大小,也可以通过传入一个 Collection
来初始化数据。同时我们也可以看出,在最后一个构造方法中使用了 checkNotNull()
方法,其实这是一种很有用的机制,优秀的框架一般都会这样子去写,比如Spring的 Asserts.java
,这也是一种断言机制,就是说我们很确定程序到达这一步一定是正确的,当然,如果不正确,那么肯定抛出异常啦!下面我们看看各种入队和出队的操作吧!
入队
put()
/** * 典型的生产者嘛!插入一个元素到尾部,一直等到(阻塞)直到已经满的队列变为非满状态 */ public void put(E e) throws InterruptedException { /** * 先检查后操作,如果是空,就抛出异常 */ checkNotNull(e); final ReentrantLock lock = this.lock; /** * 获得lock的锁,除非当前线程被中断了,也就是说当前的线程如果被中断,我们连锁都得不到,还抛出可怕的异常 */ lock.lockInterruptibly(); try { /** * 如果队列满了,肯定得阻塞嘛!难道满了还加?还阻不阻塞了 */ while (count == items.length) /** * 当调用await方法后,当前线程会释放lock锁并进入Condition变量的等待队列,而其他线程调用signal方法后,通知正在Condition变量等待队列的线程从await方法返回,并且在返回前已经获得了锁。 */ notFull.await(); //如果不满,那么就入队 enqueue(e); } finally { lock.unlock();//解锁 } }
put()
方法很显然就是典型生产者消费者模型中的生产者角色。只不过当满了的时候是通过调用 await()
的方法阻塞当前线程且释放锁,被阻塞的当前线程将进入 Condition
对象提供的等待队列中去排队,直到有元素从阻塞队列出队时,会调用 notFull.signal()
唤醒线程。
take()
public E take() throws InterruptedException { /** * 先加锁 */ final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { /** * 如果队列是空,那取个啥子,直接阻塞 */ while (count == 0) notEmpty.await(); //出队 return dequeue(); } finally { lock.unlock(); } }
可以看出出队的操作非常地简单粗暴,下面我们再看看两个常用的内部方法:
enqueue()
/** * 真正的入队操作,能执行到此方法,说明你已经获得锁了,且当前线程符合生产者消费者模型的要求(即put时未满,take时非空) */ private void enqueue(E x) { final Object[] items = this.items; //还记得putIndex指的是什么吗?指的就是下一个可以入队的元素下标 items[putIndex] = x; //改变相应的下标,这可能一眼看不懂,需要画图,其实是一个循环队列来着 if (++putIndex == items.length) putIndex = 0; count++; //通知阻塞的线程前来消费 notEmpty.signal(); }
dequeue()
/** * 真正的出队操作,能执行到此方法,说明你已经获得锁了,且当前线程符合生产者消费者模型的要求(即put时未满,take时非空) */ private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; //置为null items[takeIndex] = null; //改变相应的下标,这可能一眼看不懂,需要画图,其实是一个循环队列来着 if (++takeIndex == items.length) takeIndex = 0; count--; //防止出队导致其他线程迭代失败的操作 if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
好了,源码分析就到这里啦!