转载

Java并发——阻塞队列原理解析

在前文中 非阻塞队列之ConcurrentLinkedQueue源码解析 中,深度解析了非阻塞队列的源码。本篇内容将对于阻塞队列的原理、4中处理方式以及7中阻塞队列进行详细解析。

什么是阻塞队列

首先,再一次申明,队列必须是线程安全的,否则将毫无意义。阻塞队列最大的特征就是提供两种阻塞操作:

  • 阻塞的插入元素:当队列满时,队列会阻塞插入元素的线程,直到队列非满;
  • 阻塞的获取元素:对队列空时,队列会阻塞获取元素的线程,直到队列非空。

说到这里,其实要研究Java中阻塞队列的核心问题就付出水面了:

  • 阻塞队列如何实现阻塞操作的?
  • 如何在达到一定条件时唤醒相关线程的?
  • 如何保证线程安全的插入元素和获取元素?
    其实这就回到了并发要解决的本质。在Java并发——线程安全一文中对线程安全和如何实现线程安全有非常清晰的阐述。
    要实现以上几种功能的方案有很多:采用Object.wait/notify或者基于AQS。Java中大多数的阻塞队列采用的基于AQS实现的ReentrantLock和Condition的方式实现线程安全的。阻塞队列的源码比起非阻塞队列的源码要简单很多,如果对于这些基本理念很熟悉的话,那么理解Java阻塞队列的源码就很简单了。

4种处理方式

方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e, time, unit)
移除方法 remove() pull() take() take(time, unit)
检查方法 element() peek() - -
  • 抛出异常:当队列满时,再添加元素的话将抛出IllegalStateException("Queue full")异常;当队列空时,在移除元素的话将抛出NoSuchElementException异常。
  • 返回特殊值:添加元素方法会返回boolean值表示添加成功与否,如果返回true表示添加成功,如果队列满了,同理,如果移除元素成功也将返回false。
  • 一直阻塞:当队列满时,队列会阻塞所有添加元素的线程,直到线程非满;当队列空时,队列会阻塞所有移除元素的线程,直到线程非空。
  • 超时退出:当阻塞超过一段时间之后,线程会自动退出。

注意,阻塞队列分为有界阻塞队列和无界阻塞队列,对于无界阻塞队列而言,永远不会出现队列满的情况,因此put/offer/take/pull这些方法不会出现阻塞的情况。当然无界并不意味着可以存放无限的元素,毕竟JVM内存是有界的!

在实际开发中,这四种处理方式改如何选择呢?

抛出异常:这种方式适用于“一次性”场景,比如中奖活动,规定只能有10名用户中奖,那么队列满之后,将直接抛出异常拒绝再添加中奖用户中队列中,然后触发派奖线程,派奖线程从队列中获取元素直到全部获取完毕抛出异常结束派奖。

返回特殊值:这种场景适用于高并发、耗时短的任务。由于任务执行耗时短,当添加或者移除失败时,可以采用自旋思想,自旋添加或者移除直到成功,这样做的好处是避免了线程调度的性能消耗。

一直阻塞:这种场景适用于高并发、耗时长的任务。由于耗时长,此时再采用自旋的方式显然不如阻塞线程。

超时退出:这种场景适用于高并发且允许操作失败的场景,比如用户行为收集等,虽然无法保证100%的收集,但是在大量数据下90%以上的收集率足够得到准确的数据分析结果了。相当于牺牲了一定的准确率以提升性能。

7种阻塞队列

ArrayBlockingQueue

基于数组实现的有界队列,FIFO。内部使用的是ReentrantLock + ConditionObject实现的同步机制。支持线程公平的访问队列(本质上是设置ReentrantLock的公平锁)

LinkedBlockingQueue

基于链表实现的有界队列,FIFO。内部使用的是ReentrantLock + ConditionObject实现的同步机制。但是它不支持设置公平锁。

PriorityBlockingQueue

是一个支持优先级的无界阻塞队列。默认情况下是按照元素添加的顺序升序排序的。也可以自定义类实现compareTo()方法来确定元素的排序规则。内部使用的是ReentrantLock + ConditionObject实现的同步机制。既然都已经支持优先级了,那么自然不需要公平竞争咯。

DelayQueue

延时队列。内部实际上是基于PriorityQueue实现的。队列中的元素必须实现Delayed接口,在创建元素时可以指定延时多久才能从队列中获取到当前元素。

DelayQueue非常有用!我们可以基于DelayQueue实现以下场景:

  • 缓存系统的设计:循环从延时队列中获取元素,如果能够获取到元素,说明这个元素的有效期到了;
  • 定时任务系统的设计:循环从延时队列中获取元素,一定获取到元素就执行相关的定时任务逻辑。在Java中,TimeQueue就是基于DelayQueue实现的。

Delayed接口的具体使用可以参考Java定时任务框架ScheduledThreadPoolExecutor中的ScheduledFutureTask。以后有机会可以进行定时任务系统专题研究。

SynchronousQueue

这是一个不存储元素的队列,需要注意的是每一个put操作都必须有对应的take操作,否则将会被阻塞不能够继续添加元素。这个队列可以看做是容量只有1的队列,非常适合一些传递性场景。它也是基于ReentrantLock和ConditionObject实现的。

LinkedTransferQueue

基于链表的无界阻塞队列,FIFO。相比于其他阻塞队列,它的特性就在于“transfer”。

  • transfer方法 如果有消费端正在等待接收元素(take()/poll()方法),transfer方法可以将元素立即传递给消费端。如果此时没有消费端,则transfer方法会将此元素放在队列的tail节点,并且阻塞直到此元素被消费。
  • tryTransfer方法 与transfer方法不同,此方法的目的是为了试探元素能否直接被消费端接收。如果没有消费端正在等待接收元素,此方法返回false。和transfer()方法不同,此方法会立即返回。

LinkedBlockingDeque

基于双向链表的阻塞队列。相比于其他阻塞队列,他的特性就在于“双向”。即:可以从队列的两端插入和移除元素。这样就相当于减少了一半的锁竞争,进一步提升了并发能力。LinkedBlockingDeque非常适用于高并发场景以及“工作窃取”模式中。

原文  https://juejin.im/post/5ef9eb4c6fb9a07eaf26a95b
正文到此结束
Loading...