延时队列 DelayQueue
搞定单机版的超时订单
那么什么时候需要用延时队列呢?常见的延时任务场景 举栗子:
:seedling: :seedling: :seedling: :seedling: :seedling: :seedling: :seedling: :seedling: :seedling: :seedling:
DelayQueue
,万祖归宗,万法同源,学会了最基础的 Queue
,就不愁其他的了
延时队列,首先,它是一种 队列 ,队列意味着内部的元素是 有序 的,元素 出队 和 入队 是有方向性的,元素从一端进入,从另一端取出。
其次, 延时队列 ,最重要的特性就体现在它的 延时属性 上,跟普通的队列不一样的是, 普通队列 中的元素总是等着希望被早点取出处理,而 延时队列 中的元素则是希望被在 指定时间得到取出和处理 ,所以 延时队列 中的元素是都是 带时间属性 的,通常来说是需要被处理的消息或者任务。
一言以蔽之曰 : 延时队列就是用来存放需要在指定时间被处理的元素的队列。
1) DelayQueue 是谁,上族谱
看的出来到DelayQueue
这一代已经第五代传人了,
要知道 DelayQueue
自幼生在八戒家,长大就往外面拉,熊熊烈火它不怕,水是水来渣是渣。
不过它真的是文韬武略,有一把 ReentrantLock
就是它的九齿钉耙,抗的死死の捍卫着自己的 PriorityQueue
.
有典故曰:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { // 用于控制并发的 可重入 全局 锁 private final transient ReentrantLock lock = new ReentrantLock(); // 根据Delay时间排序的 无界的 优先级队列 private final PriorityQueue<E> q = new PriorityQueue<E>(); // 用于优化阻塞通知的线程元素leader,标记当前是否有线程在排队(仅用于取元素时) private Thread leader = null; // 条件,用于阻塞和通知的Condition对象,表示现在是否有可取的元素 private final Condition available = lock.newCondition(); /** * 省洛方法代码..... 你们懂我的省洛吗? */ 复制代码
锁,队列,状态(条件)
Condition
进行条件的判断-->进行线程之间的通信和唤起 PriorityQueue
作为一个容器,容器里面的元素都应该实现 Delayed
接口,在每次往优先级队列中添加元素时以元素的过期时间作为排序条件,最先过期的元素放在优先级最高。 DelayQueue
是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。 2) 优先级队列 PriorityQueue
因为我们的 DelayQueue
里面维护了一个优先级的队列 PriorityQueue
简单的看下:
//默认容量11 private static final int DEFAULT_INITIAL_CAPACITY = 11; //存储元素的地方 数组 transient Object[] queue; // non-private to simplify nested class access //元素个数 private int size = 0; //比较器 private final Comparator<? super E> comparator; 复制代码
3) DelayQueue的方法简介
leader
置为空,并唤醒等待在条件 available
上的线程; public boolean add(E e) { return offer(e);} public void put(E e) { offer(e);} public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e);} public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); //加锁 因为优先队列线程不安全 try { q.offer(e); //判断优先级 进行入队 if (q.peek() == e) { //-----[1] //leader记录了被阻塞在等待队列头生效的线程 新增一个元素到队列头, //表示等待原来队列头生效的阻塞的线程已经失去了阻塞的意义 //,此时需要获取新的队列头进行返回了 leader = null; //获取队列头的线程被唤起,主要有两种场景: //1. 之前队列为空,导致被阻塞的线程 //2. 之前队列非空,但是队列头没有生效(到期)导致被阻塞的线程 available.signal(); } return true; //因为是无界队列 所以添加元素肯定成功 直到OOM } finally { lock.unlock(); //释放锁 } } 复制代码
offer()
方法,首先获取独占锁,然后添加元素到优先级队列,由于q是优先级队列,所以添加元素后,peek并不一定是当前添加的元素,如果[1]为true,说明当前元素e的优先级最小也就即将过期的,这时候激活avaliable变量条件队列里面的线程,通知他们队列里面有元素了。
请看我详细的注释,绝不是蜻蜓点水
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //获取锁 lock.lockInterruptibly(); //可中断锁 可以自行了解一下 嘻嘻嘻嘻... try { for (;;) {//会写死循环的都是高手 E first = q.peek();//get队头元素 if (first == null) // 队列头为空,则阻塞,直到新增一个入队为止(1) available.await(); else { long delay = first.getDelay(NANOSECONDS);//获取剩余时间 if (delay <= 0) // 若队列头元素已生效,则直接返回(2) return q.poll(); first = null; // don't retain ref while waiting 等待的时候不能引用,表示释放当前引用的(3) if (leader != null) // leader 非空时,表示有其他的一个线程在出队阻塞中 (4.1) // 此时挂住当前线程,等待另一个线程出队完成 available.await(); else { //标识当前线程处于等待队列头生效的阻塞中 (4.2.1) Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 等待队列头元素生效(4.2.2) available.awaitNanos(delay); } finally { //最终释放当前的线程 设置leader为null (4.2.3) if (leader == thisThread) leader = null; } } } } //(5) } finally { if (leader == null && q.peek() != null) // 当前线程出队完成,通知其他出队阻塞的线程继续执行(6) available.signal(); lock.unlock();//解锁结束 } } 复制代码
那么,下面的结论肉眼可见:
leader = thisThread leader=null
4)Delayed
public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); } 复制代码
据情报显示: Delayed
是一个继承自 Delayed
的接口,并且定义了一个 Delayed
方法,用于表示还有多少时间到期,到期了应返回小于等于0的数值。
很简答就是定义了一个,一个哈,一个表延迟的接口,就是个规范接口,目的就是骗我们去实现它的方法.哼~
说了那么多废话,让我想起了那句名言:
一切没有代码实操的讲解都是耍流氓
至今深深的烙在我心中,所以我一定要实战给你们看,显得我不是流氓...
实战以 订单下单后三十分钟内未支付则自动取消 为业务场景
该场景的代码逻辑分析如下:
那么我们写代码一定要通用,先来写个通用的 Delayed
通用...嗯! 泛型的
import lombok.Getter; import lombok.Setter; import java.util.Date; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * @author LiJing * @ClassName: ItemDelayed * @Description: 数据延迟实现实例 用以包装具体的实例转型 * @date 2019/9/16 15:53 */ @Setter @Getter public class ItemDelayed<T> implements Delayed { /**默认延迟30分钟*/ private final static long DELAY = 30 * 60 * 1000L; /**数据id*/ private Long dataId; /**开始时间*/ private long startTime; /**到期时间*/ private long expire; /**创建时间*/ private Date now; /**泛型data*/ private T data; public ItemDelayed(Long dataId, long startTime, long secondsDelay) { super(); this.dataId = dataId; this.startTime = startTime; this.expire = startTime + (secondsDelay * 1000); this.now = new Date(); } public ItemDelayed(Long dataId, long startTime) { super(); this.dataId = dataId; this.startTime = startTime; this.expire = startTime + DELAY; this.now = new Date(); } @Override public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } } 复制代码
public interface DelayOrder<T> { /** * 添加延迟对象到延时队列 * * @param itemDelayed 延迟对象 * @return boolean */ boolean addToOrderDelayQueue(ItemDelayed<T> itemDelayed); /** * 根据对象添加到指定延时队列 * * @param data 数据对象 * @return boolean */ boolean addToDelayQueue(T data); /** * 移除指定的延迟对象从延时队列中 * * @param data */ void removeToOrderDelayQueue(T data); } 复制代码
@Slf4j @Lazy(false) @Component public class DelayOwnOrderImpl implements DelayOrder<Order> { @Autowired private OrderService orderService; @Autowired private ExecutorService delayOrderExecutor; private final static DelayQueue<ItemDelayed<Order>> DELAY_QUEUE = new DelayQueue<>(); /** * 初始化时加载数据库中需处理超时的订单 * 系统启动:扫描数据库中未支付(要在更新时:加上已支付就不用更新了),未过期的的订单 */ @PostConstruct public void init() { log.info("系统启动:扫描数据库中未支付,未过期的的订单"); List<Order> orderList = orderService.selectFutureOverTimeOrder(); for (Order order : orderList) { ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime()); this.addToOrderDelayQueue(orderDelayed); } log.info("系统启动:扫描数据库中未支付的订单,总共扫描了" + orderList.size() + "个订单,推入检查队列,准备到期检查..."); /*启动一个线程,去取延迟订单*/ delayOrderExecutor.execute(() -> { log.info("启动处理的订单线程:" + Thread.currentThread().getName()); ItemDelayed<Order> orderDelayed; while (true) { try { orderDelayed = DELAY_QUEUE.take(); //处理超时订单 orderService.updateCloseOverTimeOrder(orderDelayed.getDataId()); } catch (Exception e) { log.error("执行自营超时订单的_延迟队列_异常:" + e); } } }); } /** * 加入延迟消息队列 **/ @Override public boolean addToOrderDelayQueue(ItemDelayed<Order> orderDelayed) { return DELAY_QUEUE.add(orderDelayed); } /** * 加入延迟消息队列 **/ @Override public boolean addToDelayQueue(Order order) { ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime()); return DELAY_QUEUE.add(orderDelayed); } /** * 从延迟队列中移除 主动取消就主动从队列中取出 **/ @Override public void removeToOrderDelayQueue(Order order) { if (order == null) { return; } for (Iterator<ItemDelayed<Order>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) { ItemDelayed<Order> queue = iterator.next(); if (queue.getDataId().equals(order.getId())) { DELAY_QUEUE.remove(queue); } } } } 复制代码
解释一番上面的写的东东
delayOrderExecutor @PostConstruct take()
id
那今日份的讲解就到此结束,具体的代码请移步我的 gitHub的mybot项目Master分支 查阅,fork体验一把,或者评论区留言探讨,写的不好,请多多指教~~