①.支持阻塞的插入方法: 当队列满时,队列会阻塞插入元素的线程,直至队列不满
②.支持阻塞的移除方法: 当队列空时,获取元素的线程会等待队列变为非空
方法/处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
ArrayBlockingQueue是一个用 数组 实现的有界阻塞队列,队列按照 先进先出(FIFO) 原则对元素进行排序。默认采用 不公平 访问,因为公平性通常会降低吞吐量。
private static final long serialVersionUID = -817911632652898426L; /** 数组用来维护ArrayBlockingQueue中的元素 */ final Object[] items; /** 出队首位置索引 */ int takeIndex; /** 入队末位置索引 */ int putIndex; /** 元素个数 */ int count; final ReentrantLock lock; /** 出队等待队列 */ private final Condition notEmpty; /** 入队等待队列 */ private final Condition notFull; 复制代码
ArrayBlockingQueue提供了很多方法入队: add()、offer()、put() 等。我们以阻塞式方法为主,put()方法其源码如下
public void put(E e) throws InterruptedException { // 校验元素是否为空 checkNotNull(e); final ReentrantLock lock = this.lock; // 响应中断式获取同步,若线程被中断会抛出异常 lock.lockInterruptibly(); try { // 当队列已满,将线程添加到notFull等待队列中 while (count == items.length) notFull.await(); // 若没有满,进行入队 enqueue(e); } finally { lock.unlock(); } } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } 复制代码
当队列满时,会调用Condition的await()方法将线程添加到等待队列中。若队列未满调用enqueue()进行入队操作( 所有入队方法最终都将调用该方法在队列尾部插入元素 )
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; // 入队 items[putIndex] = x; // 当数组添加满后,重新从0开始 if (++putIndex == items.length) putIndex = 0; // 元素个数+1 count++; // 唤醒出队等待队列中的线程 notEmpty.signal(); } 复制代码
出队方法有: poll()、remove(),take() 等,take()方法其源码如下
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 响应中断式获取同步,若线程被中断会抛出异常 lock.lockInterruptibly(); try { // 若队列空,将线程添加到notEmpty等待队列中 while (count == 0) notEmpty.await(); // 获取数据 return dequeue(); } finally { lock.unlock(); } } 复制代码
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") // 获取数据 E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; // 元素个数-1 count--; if (itrs != null) itrs.elementDequeued(); // 通知入队等待队列中的线程 notFull.signal(); return x; } 复制代码
LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序
/** 容量 */ private final int capacity; /** 元素个数 */ private final AtomicInteger count = new AtomicInteger(); /** 头节点 */ transient Node head; /** 尾节点 */ private transient Node last; /** 出队锁 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 出队等待队列 */ private final Condition notEmpty = takeLock.newCondition(); /** 入队锁 */ private final ReentrantLock putLock = new ReentrantLock(); /** 入队等待队列 */ private final Condition notFull = putLock.newCondition(); 复制代码
从属性上来看LinkedBlockingQueue维护 两个锁 在入队和出队时保证线程安全, 两个锁降低线程由于线程无法获取lock而进入WAITING状态的可能性提高了线程并发执行的效率 ,并且 count属性使用AtomicInteger原子操作类 (可能两个线程一个出队一个入队操作count,各自的锁显然起不到用处)
public void put(E e) throws InterruptedException { // 若新增元素为null抛异常 if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node node = new Node(e); final ReentrantLock putLock = this.putLock; // 获取当前元素个数 final AtomicInteger count = this.count; // 响应中断式获取锁,若线程被中断会抛出异常 putLock.lockInterruptibly(); try { // 若当前队列已满,将线程添加到notFull等待队列中 while (count.get() == capacity) { notFull.await(); } // 若没有满,进行入队 enqueue(node); // 元素个数+1 c = count.getAndIncrement(); // 若当前元素个数+1还未到定义的最大容量,则唤醒入队等待队列中的线程 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } 复制代码
public E take() throws InterruptedException { E x; int c = -1; // 获取当前元素个数 final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 响应中断式获取锁,若线程被中断会抛出异常 takeLock.lockInterruptibly(); try { // 若当前队列为空,则将线程添加到notEmpty等待队列中 while (count.get() == 0) { notEmpty.await(); } // 获取数据 x = dequeue(); // 当前元素个数-1 c = count.getAndDecrement(); // 若队列中还有元素,唤醒阻塞的出队线程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } 复制代码
PriorityBlockingQueue是一个支持优先级的 无界阻塞队列 ,虽然无界但由于资源耗尽,尝试的添加可能会失败(导致OutOfMemoryError ),默认情况下元素采取自然顺序升序排序,也可以通过构造函数来指定Comparator来对元素进行排序,需要注意的是PriorityBlockingQueue 不能保证同优先级元素的顺序
/** 默认容量 */ private static final int DEFAULT_INITIAL_CAPACITY = 11; /** 最大容量 */ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; /** 内置数组 */ private transient Object[] queue; /** 元素个数 */ private transient int size; /** 比较器,为空则自然排序 */ private transient Comparator comparator; private final ReentrantLock lock; /** 出队等待队列 */ private final Condition notEmpty; /** 用于CAS扩容时用 */ private transient volatile int allocationSpinLock; private PriorityQueue q; 复制代码
可以发现PriorityBlockingQueue只有 一个condition ,因为PriorityBlockingQueue是一个无界队列,插入始终成功, 也正因为此所以其入队用lock.lock()方法不响应中断,而出队用lock.lockInterruptibly()响应中断式获取锁
public void put(E e) { // 不需要阻塞 offer(e); // never need to block } public boolean offer(E e) { // 判空 if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; // 获取锁 lock.lock(); int n, cap; Object[] array; // 若大于等于当前数组长度则扩容 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { // 获取比较器 Comparator cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); // 元素个数+1 size = n + 1; // 唤醒 notEmpty.signal(); } finally { lock.unlock(); } return true; } 复制代码
private void tryGrow(Object[] array, int oldCap) { // 必须先释放锁 lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; // CAS设置占用 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { // 新容量 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); // 新容量若超过最大值 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } // 若新容量大于旧容量且当前数组相等,创建新容量数组 if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } // CAS设置allocationSpinLock失败,表明有其他线程也正在扩容,让给其他线程处理 if (newArray == null) // back off if another thread is allocating Thread.yield(); // 获取锁 lock.lock(); if (newArray != null && queue == array) { queue = newArray; // 数组复制 System.arraycopy(array, 0, newArray, 0, oldCap); } } 复制代码
private static void siftUpComparable(int k, T x, Object[] array) { Comparable key = (Comparable) x; // 若当前元素个数大于0,即队列不为空 while (k > 0) { // (n - 1) / 2 int parent = (k - 1) >>> 1; // 获取parent位置上的元素 Object e = array[parent]; // 从队列的最后往上调整堆,直到不小于其父节点为止 if (key.compareTo((T) e) >= 0) break; // 如果当前节点小于其父节点,则将其与父节点进行交换,并继续往上访问父节点 array[k] = e; k = parent; } array[k] = key; } 复制代码
private static void siftUpUsingComparator(int k, T x, Object[] array, Comparator cmp) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0) break; array[k] = e; k = parent; } array[k] = x; } 复制代码
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; } 复制代码
private E dequeue() { // 若队列为空,返回null int n = size - 1; if (n < 0) return null; else { Object[] array = queue; // 出队元素,首元素 E result = (E) array[0]; // 最后一个元素 E x = (E) array[n]; array[n] = null; Comparator cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } } 复制代码
private static void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable key = (Comparable)x; int half = n >>> 1; // loop while a non-leaf while (k < half) { // 左节点 int child = (k << 1) + 1; // assume left child is least Object c = array[child]; // 右节点 int right = child + 1; if (right < n && ((Comparable) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; } } 复制代码
private static void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator cmp) { if (n > 0) { int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < n && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; if (cmp.compare(x, (T) c) <= 0) break; array[k] = c; k = child; } array[k] = x; } } 复制代码
DelayQueue是一个 支持延时获取元素的无界阻塞队列 ,队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素, 只有在延迟期满时才能从队列中提取元素 ,可以将其应用在缓存、定时任务调度等场景
private final transient ReentrantLock lock = new ReentrantLock(); /** 优先级队列 */ private final PriorityQueue q = new PriorityQueue(); private Thread leader = null; private final Condition available = lock.newCondition(); 复制代码
public void put(E e) { offer(e); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { // 向PriorityQueue添加元素 q.offer(e); // 若当前元素 if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } } 复制代码
public boolean offer(E e) { // 判空 if (e == null) throw new NullPointerException(); // 修改次数 modCount++; int i = size; // 判断是否需要扩容 if (i >= queue.length) grow(i + 1); // 元素个数+1 size = i + 1; // 若队列为空,首元素置为e if (i == 0) queue[0] = e; else siftUp(i, e); return true; } private void siftUp(int k, E x) { if (comparator != null) siftUpUsingComparator(k, x); // 自然排序 else siftUpComparable(k, x); } /** * 自然排序 */ private void siftUpComparable(int k, E x) { Comparable key = (Comparable) x; while (k > 0) { int parent = (k - 1) >>> 1; Object e = queue[parent]; if (key.compareTo((E) e) >= 0) break; queue[k] = e; k = parent; } queue[k] = key; } /** * 指定比较器 */ private void siftUpUsingComparator(int k, E x) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = queue[parent]; if (comparator.compare(x, (E) e) >= 0) break; queue[k] = e; k = parent; } queue[k] = x; } 复制代码
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 死循环 for (;;) { // 获取队列首元素,若队列为空返回null E first = q.peek(); // 若队列为空 if (first == null) available.await(); else { // 获取剩余延迟时间 long delay = first.getDelay(NANOSECONDS); // 若小于0表明已过期,出队 if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting // 若leader!= null 表明有其他线程正在操作 if (leader != null) available.await(); else { // 否则将leader置为当前线程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 指定时间等待 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } 复制代码