DelayQueue 一个无界阻塞队列,只有在延迟期满时才能从中提取元素。基于 PriorityQueue
实现的延迟队列,用 ReentrantLock
提供线程安全性。
其元素必须实现 Delayed
接口。
该类可用来实现定时调度的功能,当前时间与任务的下次执行时间的距离作为延迟时间。
实现上采用 Leader_Follower 模式 的变体进行优化:leader 进行限时等待,其他线程作为 follower 无限等待。leader 在等待的过程中可能插入一个更快到期的元素,那么旧 leader 就会被作废,如果又有一个线程来获取,那么它会作为新的 leader 根据新的队列头元素进行限时等待。
public interface Delayed extends Comparable<Delayed> { // 返回与此对象相关的剩余延迟时间,以给定的时间单位表示。 long getDelay(TimeUnit unit); } public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); // 该变量被赋值给等待元素的队列头结点。这是 Leader_Follower 模式的变体, // 用于减少限时等待。当一个线程成为 leader 时,它只等待下一个延迟到达, // 但其他的线程都是无限等待的。 // leader 线程必须在从 take() 或 poll(...) 方法返回前通知其他线程, // 除非其他线程在这个过程中成为了 leader 。 // 每当队列的头被一个更早到期的元素取代时,leader 字段设置为 null 表示作废, // 同时,一些等待线程、不一定必须是当前的 leader 被通知。 // 因此等待线程必须准备好在等待过程中获取和失去 leadership。 private Thread leader = null; // 当一个新的元素在队列头部变的可用或新的线程可能需要成为 leader 时发出通知。 private final Condition available = lock.newCondition(); public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { // 新元素成为了队列的头,作废已有的 leader,通知等待线程。 // 问题1:为啥要废弃 leader? leader = null; available.signal(); } return true; } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); // 队列为空,只能无限等待 else { long delay = first.getDelay(NANOSECONDS); // 队列头到期了 // 此时不一定是 leader 获得了队列头元素 if (delay <= 0) return q.poll(); // 在等待的过程中不持有引用。 // 问题2:为啥专门放弃引用? first = null; if (leader != null) // 如果已经有 leader 了,作为 follower 进行无限等待 available.await(); else { // 队列里有未到期的元素、且没有 leader,自己成为 leader。 Thread thisThread = Thread.currentThread(); leader = thisThread; // 作为 leader 进行限时等待 try { available.awaitNanos(delay); } finally { // 作废 leader。 if (leader == thisThread) // 问题3:为啥在这里作废 leader? leader = null; } } } } } finally { // leader 不为空说明 leader 线程还在限时等待,不需要唤醒 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } }
问题1:此时可能是新元素比原来的 leader 的等待时间更短,原来的 leader 失去了 leadership。此处作废是为了让旧 leader 等待的元素之前的元素能够尽快被处理。比如旧 leader 进行限时等待 1000ms,此时连续进来 30ms/50ms 的两个元素;如果此时不作废,后续就有线程来获取元素,会因为有旧 leader 进入无限等待;作废后,后续的线程可能只需要限时等待 30ms,提高了延迟队列的准确性。
问题2:因为在等待过程中,这个元素可能被其他线程处理、需要进行垃圾回收,防止被这个线程的栈引用了而没法垃圾回收。
问题3:首先,leader 可能在等待的过程中变了,因此需要先判断 leader == thisThread
。如果在 return 语句前进行作废,对于获取竞争不激烈的场景是不需要作废,这样会多出一些没必要的判断 leader == thisThread
。另外是限时等待到时后应该都能获得元素,线程到时唤醒后作废 leader 也是合理的。
引入 leader_follower 模式是为了优化在高并发下的等待,尽量用无限等待替代限时等待,也防止了多个线程同时竞争头结点。
实现上做到了 GC 友好。
欢迎关注我的微信公众号: coderbee笔记 ,可以更及时回复你的讨论。