阻塞队列是一个支持两个附加操作的队列,这两个附加操作支持阻塞的插入和移除方法
①.支持阻塞的插入方法: 当队列满时,队列会阻塞插入元素的线程,直至队列不满
②.支持阻塞的移除方法: 当队列空时,获取元素的线程会等待队列变为非空
在阻塞队列不可用时,这两个附加操作提供了4种处理方式,如下
方法/处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
ArrayBlockingQueue:由数组结构组成的有界阻塞队列
LinkedBlockingQueue:由链表结构组成的有界阻塞队列
PriorityBlockingQueue:支持优先级排序的无界阻塞队列
DelayQueue:使用优先级队列实现的无界阻塞队列
SynchronousQueue:不存储元素的阻塞队列
LinkedTransferQueue:由链表结构组成的无界阻塞队列
LinkedBlockingDeque:由链表结构组成的双向阻塞队列
ArrayBlockingQueue是一个用 数组 实现的有界阻塞队列,队列按照 先进先出(FIFO) 原则对元素进行排序。默认采用 不公平 访问,因为公平性通常会降低吞吐量。
private static final long serialVersionUID = -817911632652898426L; /** 数组用来维护ArrayBlockingQueue中的元素 */ final Object[] items; /** 出队首位置索引 */ int takeIndex; /** 入队末位置索引 */ int putIndex; /** 元素个数 */ int count; final ReentrantLock lock; /** 出队等待队列 */ private final Condition notEmpty; /** 入队等待队列 */ private final Condition notFull; 复制代码
ArrayBlockingQueue提供了很多方法入队: add()、offer()、put() 等。我们以阻塞式方法为主,put()方法其源码如下
public void put(E e) throws InterruptedException { // 校验元素是否为空 checkNotNull(e); final ReentrantLock lock = this.lock; // 响应中断式获取同步,若线程被中断会抛出异常 lock.lockInterruptibly(); try { // 当队列已满,将线程添加到notFull等待队列中 while (count == items.length) notFull.await(); // 若没有满,进行入队 enqueue(e); } finally { lock.unlock(); } } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } 复制代码
当队列满时,会调用Condition的await()方法将线程添加到等待队列中。若队列未满调用enqueue()进行入队操作( 所有入队方法最终都将调用该方法在队列尾部插入元素 )
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; // 入队 items[putIndex] = x; // 当数组添加满后,重新从0开始 if (++putIndex == items.length) putIndex = 0; // 元素个数+1 count++; // 唤醒出队等待队列中的线程 notEmpty.signal(); } 复制代码
出队方法有: poll()、remove(),take() 等,take()方法其源码如下
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 响应中断式获取同步,若线程被中断会抛出异常 lock.lockInterruptibly(); try { // 若队列空,将线程添加到notEmpty等待队列中 while (count == 0) notEmpty.await(); // 获取数据 return dequeue(); } finally { lock.unlock(); } } 复制代码
当队列为空,会调用condition的await()方法将线程添加到notEmpty等待队列中,若队列不为空则调用dequeue()获取数据
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") // 获取数据 E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; // 元素个数-1 count--; if (itrs != null) itrs.elementDequeued(); // 通知入队等待队列中的线程 notFull.signal(); return x; } 复制代码
从源码中可以发现ArrayBlockingQueue通过condition的等待唤醒机制完成可阻塞式的入队和出队
LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序
/** 容量 */ private final int capacity; /** 元素个数 */ private final AtomicInteger count = new AtomicInteger(); /** 头节点 */ transient Node head; /** 尾节点 */ private transient Node last; /** 出队锁 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 出队等待队列 */ private final Condition notEmpty = takeLock.newCondition(); /** 入队锁 */ private final ReentrantLock putLock = new ReentrantLock(); /** 入队等待队列 */ private final Condition notFull = putLock.newCondition(); 复制代码
从属性上来看LinkedBlockingQueue维护 两个锁 在入队和出队时保证线程安全, 两个锁降低线程由于线程无法获取lock而进入WAITING状态的可能性提高了线程并发执行的效率 ,并且 count属性使用AtomicInteger原子操作类 (可能两个线程一个出队一个入队操作count,各自的锁显然起不到用处)
public void put(E e) throws InterruptedException { // 若新增元素为null抛异常 if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node node = new Node(e); final ReentrantLock putLock = this.putLock; // 获取当前元素个数 final AtomicInteger count = this.count; // 响应中断式获取锁,若线程被中断会抛出异常 putLock.lockInterruptibly(); try { // 若当前队列已满,将线程添加到notFull等待队列中 while (count.get() == capacity) { notFull.await(); } // 若没有满,进行入队 enqueue(node); // 元素个数+1 c = count.getAndIncrement(); // 若当前元素个数+1还未到定义的最大容量,则唤醒入队等待队列中的线程 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } 复制代码
public E take() throws InterruptedException { E x; int c = -1; // 获取当前元素个数 final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 响应中断式获取锁,若线程被中断会抛出异常 takeLock.lockInterruptibly(); try { // 若当前队列为空,则将线程添加到notEmpty等待队列中 while (count.get() == 0) { notEmpty.await(); } // 获取数据 x = dequeue(); // 当前元素个数-1 c = count.getAndDecrement(); // 若队列中还有元素,唤醒阻塞的出队线程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } 复制代码
PriorityBlockingQueue是一个支持优先级的 无界阻塞队列 ,虽然无界但由于资源耗尽,尝试的添加可能会失败(导致OutOfMemoryError ),默认情况下元素采取自然顺序升序排序,也可以通过构造函数来指定Comparator来对元素进行排序,需要注意的是PriorityBlockingQueue 不能保证同优先级元素的顺序
/** 默认容量 */ private static final int DEFAULT_INITIAL_CAPACITY = 11; /** 最大容量 */ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; /** 内置数组 */ private transient Object[] queue; /** 元素个数 */ private transient int size; /** 比较器,为空则自然排序 */ private transient Comparator comparator; private final ReentrantLock lock; /** 出队等待队列 */ private final Condition notEmpty; /** 用于CAS扩容时用 */ private transient volatile int allocationSpinLock; private PriorityQueue q; 复制代码
可以发现PriorityBlockingQueue只有 一个condition ,因为PriorityBlockingQueue是一个无界队列,插入始终成功, 也正因为此所以其入队用lock.lock()方法不响应中断,而出队用lock.lockInterruptibly()响应中断式获取锁
public void put(E e) { // 不需要阻塞 offer(e); // never need to block } public boolean offer(E e) { // 判空 if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; // 获取锁 lock.lock(); int n, cap; Object[] array; // 若大于等于当前数组长度则扩容 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { // 获取比较器 Comparator cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); // 元素个数+1 size = n + 1; // 唤醒 notEmpty.signal(); } finally { lock.unlock(); } return true; } 复制代码
tryGrow扩容
private void tryGrow(Object[] array, int oldCap) { // 必须先释放锁 lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; // CAS设置占用 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { // 新容量 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); // 新容量若超过最大值 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } // 若新容量大于旧容量且当前数组相等,创建新容量数组 if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } // CAS设置allocationSpinLock失败,表明有其他线程也正在扩容,让给其他线程处理 if (newArray == null) // back off if another thread is allocating Thread.yield(); // 获取锁 lock.lock(); if (newArray != null && queue == array) { queue = newArray; // 数组复制 System.arraycopy(array, 0, newArray, 0, oldCap); } } 复制代码
从源码中可以发现为了尽可能提高并发效率,先释放锁在计算新容量时利用CAS设置allocationSpinLock来保证线程安全,再最后获取锁进行数组复制扩容。扩容完后,根据比较器的排序规则进行新增
siftUpComparable(),比较器comparator为null时采取自然排序调用此方法
private static void siftUpComparable(int k, T x, Object[] array) { Comparable key = (Comparable) x; // 若当前元素个数大于0,即队列不为空 while (k > 0) { // (n - 1) / 2 int parent = (k - 1) >>> 1; // 获取parent位置上的元素 Object e = array[parent]; // 从队列的最后往上调整堆,直到不小于其父节点为止 if (key.compareTo((T) e) >= 0) break; // 如果当前节点小于其父节点,则将其与父节点进行交换,并继续往上访问父节点 array[k] = e; k = parent; } array[k] = key; } 复制代码
此方法为建堆过程,假定PriorityBlockingQueue内部数组如下:
转换为堆(堆是一种二叉树结构):
往其添加元素2,k为当前元素个数12,计算parent为5,e为6,e大于2,交换位置
第二次循环,k=5,parent=2,e=5,5>2交换位置
第三次循环,k=2,parent=0,e=1,1<2退出循环,第2个位置给新元素2
其主要思路末位置寻找其父节点,若新增元素小于父节点则将其与父节点进行交换,并继续往上访问父节点,直到大于等于其父节点为止
siftUpUsingComparator(),当比较器不为null,采用指定比较器,调用此方法
private static void siftUpUsingComparator(int k, T x, Object[] array, Comparator cmp) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0) break; array[k] = e; k = parent; } array[k] = x; } 复制代码
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; } 复制代码
获取锁后,调用dequeue()
private E dequeue() { // 若队列为空,返回null int n = size - 1; if (n < 0) return null; else { Object[] array = queue; // 出队元素,首元素 E result = (E) array[0]; // 最后一个元素 E x = (E) array[n]; array[n] = null; Comparator cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } } 复制代码
自然排序处理siftDownComparable()
private static void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable key = (Comparable)x; int half = n >>> 1; // loop while a non-leaf while (k < half) { // 左节点 int child = (k << 1) + 1; // assume left child is least Object c = array[child]; // 右节点 int right = child + 1; if (right < n && ((Comparable) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; } } 复制代码
指定排序siftDownUsingComparator()
private static void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator cmp) { if (n > 0) { int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < n && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; if (cmp.compare(x, (T) c) <= 0) break; array[k] = c; k = child; } array[k] = x; } } 复制代码
以上面最后一个图为基础出队第一个元素
第一次循环:k=0,n=12,half=6,child=1,c为图中节点3,right=2,经过子节点比较找出较小值2,2与末尾值节点6相比,末位置更大,首位置与右子节点交换位置
第二次循环:k=2,child=5,c为图中节点5,right=6,经过子节点比较找出较小值5,5与末位置节点6相比,末位置更大,与左子节点交换位置
第三次循环:k=5,child=11,c为图中节点8,right=12,经过子节点比较找出较小值末位置节点6相比
其主要思路:首位置寻找其子节点,找出两个子节点的较小的与末尾位置节点比较若末尾节点小,则将其置入首位置,否则首位置与较小子节点替换位置,以此略推继续往下找
DelayQueue是一个 支持延时获取元素的无界阻塞队列 ,队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素, 只有在延迟期满时才能从队列中提取元素 ,可以将其应用在缓存、定时任务调度等场景
DelayQueue队列中的元素必须实现Delayed接口,我们先看Delayed接口继承关系
从图中我们可以知道,实现Delayed接口,我们必须实现其自定义的getDelay()方法以及继承过来的compareTo()方法
private final transient ReentrantLock lock = new ReentrantLock(); /** 优先级队列 */ private final PriorityQueue q = new PriorityQueue(); private Thread leader = null; private final Condition available = lock.newCondition(); 复制代码
public void put(E e) { offer(e); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { // 向PriorityQueue添加元素 q.offer(e); // 若当前元素 if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } } 复制代码
其添加操作基于PriorityQueue的offer方法
public boolean offer(E e) { // 判空 if (e == null) throw new NullPointerException(); // 修改次数 modCount++; int i = size; // 判断是否需要扩容 if (i >= queue.length) grow(i + 1); // 元素个数+1 size = i + 1; // 若队列为空,首元素置为e if (i == 0) queue[0] = e; else siftUp(i, e); return true; } private void siftUp(int k, E x) { if (comparator != null) siftUpUsingComparator(k, x); // 自然排序 else siftUpComparable(k, x); } /** * 自然排序 */ private void siftUpComparable(int k, E x) { Comparable key = (Comparable) x; while (k > 0) { int parent = (k - 1) >>> 1; Object e = queue[parent]; if (key.compareTo((E) e) >= 0) break; queue[k] = e; k = parent; } queue[k] = key; } /** * 指定比较器 */ private void siftUpUsingComparator(int k, E x) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = queue[parent]; if (comparator.compare(x, (E) e) >= 0) break; queue[k] = e; k = parent; } queue[k] = x; } 复制代码
PriorityQueue的自然排序或指定比较器处理新增操作与PriorityBlockingQueue的逻辑差不多,这里就不再过多分析,但是从源码我们发现了modCount,表明PriorityQueue是线程不安全的,但是由于DelayQueue可以依靠ReentrantLock来确保同步安全。新增完后会判断新增元素是否为队列首元素,若是将leader设置为空,并唤醒所有等待线程
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 死循环 for (;;) { // 获取队列首元素,若队列为空返回null E first = q.peek(); // 若队列为空 if (first == null) available.await(); else { // 获取剩余延迟时间 long delay = first.getDelay(NANOSECONDS); // 若小于0表明已过期,出队 if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting // 若leader!= null 表明有其他线程正在操作 if (leader != null) available.await(); else { // 否则将leader置为当前线程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 指定时间等待 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } 复制代码
整体出队逻辑不再多述,来说下leader和first
从源码我们可以看到leader属性在put()与take()方法中都有出现,其作用在于减少不必要的竞争,若leader不为空说明已经有线程正在操作,直接一直等待即可没必要再争。举个例子假定有线程A、B、C依次要出队,线程A先获取锁由于首元素未过期,指定剩余时间等待,若不采用leader直接一直等待,线程B和C也指定时间等待,那么会造成三个线程同时竞争首元素,本来A→B→C的顺序可能导致乱序不是线程所想要的元素
《java并发编程的艺术》
cmsblogs.com/?p=2407