转载

并发编程技术十之j.u.c中的BlockingQueue

针对JDK中的j.u.c我们已经讲了三篇文章了,计划我是分了五篇文章来细讲j.u.c中的api。今天 我们讲的BlockingQueue是第四篇,也是在使用中最频发,面试必提的一个话题。

我在网上搜了个用列表来讲解队列, 通过这个列表一眼就能了解jdk 在设计 API 时每个队列的特性,也能看出他们的相同点和不同点

队列

有界性

数据结构

ArrayBlockingQueue

bounded( 有界 )

加锁

arrayList

LinkedBlockingQueue

optionally-bounded

加锁 linkedList

PriorityBlockingQueue

unbounded 加锁 heap

DelayQueue

unbounded 加锁 heap

SynchronousQueue

bounded 加锁

LinkedTransferQueue

unbounded 加锁 heap

LinkedBlockingDeque

unbounded

无锁

heap

阻塞队列的概念

ArrayBlockingQueue :是一个用数组实现的有界阻塞队列,此队列按照先进先出( FIFO )的原则对元素进行排序。支持公平锁和非公平锁

LinkedBlockingQueue :一个由链表结构组成的有界队列,此队列的长度为 Integer.MAX_VALUE 。此队列按照先进先出的顺序进行排序。

PriorityBlockingQueue :一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现 compareTo() 方法来指定元素排序规则,不能保证同优先级元素的顺序

DelayQueue :一个实现 PriorityBlockingQueue 实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。

SynchronousQueue :一个不存储元素的阻塞队列,每一个 put 操作必须等待 take 操作,否则不能添加元素。支持公平锁和非公平锁。 SynchronousQueue 的一个使用场景是在线程池里 LinkedTransferQueue :一个由链表结构组成的无界阻塞队列,相当于其它队列, LinkedTransferQueue 队列多了 transfer tryTransfer 方法

LinkedBlockingDeque :一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半

下面我们对每个类中的API 进行了解和分析

ArrayBlockingQueue

ArrayBlockingQueue继承  AbstractQueue 实现 BlockingQueue


 

public class ArrayBlockingQueue<E> extends AbstractQueue<E>

implements BlockingQueue<E>, java.io.Serializable {

/** The queued items */

final Object[] items;

/** items index for next take, poll, peek or remove */

int takeIndex;

/** items index for next put, offer, or add */

int putIndex;

/** Number of elements in the queue */

int count;

/** Main lock guarding all access */

final ReentrantLock lock;

/** Condition for waiting takes */

private final Condition notEmpty;

/** Condition for waiting puts */

private final Condition notFull;

transient Itrs itrs = null;

...

}

主要实现通过 items 数组来实现数据结构。通过 takeIndexputIndex 组合来标识下标, 通过 ReentrantLock 来进行加锁,通过 Condition 来对队列为元素的为空或不满 队列的阻塞

主要的方法

  • add //添加元素,满时会 抛异常

  • offer //添加元素,满时直接返回false

  • put //添加队列满时阻塞

  • poll //移除  队列第 一个元素,为空时直接返回null

  • take // 移除队列第一个元素,为空时等待

  • remove  // 移除队列第一个元素,为空时异常

  • remove(Object) //移除指定元素,若是第一个直接移除, 否则下面的元素上移,致空最后一个。

  • peek // 获取队列中第一个元素

  • size //当前队列数

通过以上方法可以分为以下几类

  • 添加元素

  • 移除元素

  • 元素个数

LinkedBlockingQueue

是一个链表队列那么实现必须使用Node来实现,看下类的定义


 

public class LinkedBlockingQueue<E> extends AbstractQueue<E>

implements BlockingQueue<E>, java.io.Serializable {


/**

* Linked list node class

*/

static class Node<E> {

E item;

Node<E> next;

Node(E x) { item = x; }

}

/** The capacity bound, or Integer.MAX_VALUE if none */

private final int capacity;

/** Current number of elements */

private final AtomicInteger count = new AtomicInteger();

/**

* Head of linked list.

* Invariant: head.item == null

*/

transient Node<E> head;

/**

* Tail of linked list.

* Invariant: last.next == null

*/

private transient Node<E> last;

/** Lock held by take, poll, etc */

private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */

private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */

private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */

private final Condition notFull = putLock.newCondition();

...

}

这里的阻塞也是通过Condition来实现

主要方法

  • add //添加 元素,满时抛异常

  • put //添加元素,满时阻塞

  • offer// 添加元素,满时返回false

  • poll //移除元素,队列为空时返回null

  • remove //移除元素,队列为空 抛异常

  • remove(Object) //移除指定元素,移除时需要在链表中查找元素,然后在移除

  • take //移除 元素为空阻塞

  • size //获取  元素个数

  • drainTo //一次性从队列中获取所有( 可指定个数)可用元素,获取后在队列移除

  • iterator //转化成迭代器

  • remainingCapacity 获取 剩余队列大小

  • sliterator //转化迭代器( JDK1.8)

PriorityBlockingQueue

PriorityBlockingQueue是通过Comparator接口实现对象比较


 

public class PriorityBlockingQueue<E> extends AbstractQueue<E>

implements BlockingQueue<E>, java.io.Serializable {

private static final long serialVersionUID = 5595510919245408276L;

}

主要方法

  • add //添加元素,  满时跑异常

  • offer //添加元素,满时返回false

  • put //添加元素,满时阻塞

  • poll //移除 元素,队列为空时返回null

  • take // 移除元素, 队列为空阻塞

  • remove //移除元素,队列为空时抛异常

  • remove(Object)

  • comparator //返回此队列元素进行排序的比较器

  • drainTo // 一次性从队列中 获取所有(可指定范围)元素

  • iterator //转化换代器

  • peek //获取队列中第一个元素

  • remainingCapacity //获取剩余 队列大小

  • spliterator //转化换代器(jdk1.8)

  • size 返回队列元素个数

DelayQueue

内部使用是非线程安全的优先队列 PriorityQueue


 

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>

implements BlockingQueue<E> {


private final transient ReentrantLock lock = new ReentrantLock();

private final PriorityQueue<E> q = new PriorityQueue<E>();

private Thread leader = null;

private final Condition available = lock.newCondition();

}

主要方法

  • add  //添加元素

  • offer //添加元素

  • put //添加元素,满时阻塞

  • take  //移除元素,为空阻塞;延时时阻塞;有leader线程时阻塞;自己为leader时阻塞

  • poll   //移除元素,为空返回null

  • remove //移除元素

  • size   //返回队列元素个数

  • remainingCapacity

  • iterator  //转化成迭代器

  • drainTo  //获取所有(可指定个数)可用元素

  • clear  //清除队列

  • toArray //转化为数组

SynchronousQueue

内部定义抽象静态类Transferer,通过两个内部类来实现公平(TransferQueue)和非公平(TransferStack)模式下的队列操作。


 

public class SynchronousQueue<E> extends AbstractQueue<E>

implements BlockingQueue<E>, java.io.Serializable {

/**

* Shared internal API for dual stacks and queues.

*/

abstract static class Transferer<E> {

abstract E transfer(E e, boolean timed, long nanos);

}


/** The number of CPUs, for spin control */

static final int NCPUS = Runtime.getRuntime().availableProcessors();

static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

static final int maxUntimedSpins = maxTimedSpins * 16;

static final long spinForTimeoutThreshold = 1000L;

...

}

TransferStack 内部通过单向链表SNode实现双重栈,数据是 LIFO的 顺序。TransferQueue是通过内部定义QNode 实现,数据是FIFO的顺序。

主要方法

  • put添加元素后阻塞

  • offer  添加元素,直接返回false,若其它线程正好取出返回true

  • take 移除元素,若未有元素时阻塞

  • poll 移除元素,直接返回null,若其它线程正好添加返回元素

  • 其它方法都是默认值。没有实际意义!

LinkedTransferQueue

LInkedTransferQueue是TransferQueue 的实现类,是一个无界队列, 具有先进先出FIFO特性


 

public class LinkedTransferQueue<E> extends AbstractQueue<E>

implements TransferQueue<E>, java.io.Serializable {

}

主要方法

  • transfer  当存在一个正在等待获取的消费线程,则立即传输数据。否则会插入到队列尾部并阻塞等待消费线程

  • tryTransfer 肖存在一个正在等待获取的消费线程,则立即传输,否则返回false

  • hasWaitingConsumer 判断是否存在消费者线程

  • getWaitingConsumerCount 获取所有等待获取元素的消费线程数量

LinkedBlockingDeque

在分析LinkedBlockingDeque一般与LinkedBlockingQueue一起分析。这两个比较相似。在上面刚刚分析了LinkedBlockingQueue它是链表通过Node来实现,但入队和出队只能一端。而LInkedBlockingDeque也是链表,它可以通过两端同时操作(入队和出队)。

原因是 LinkedBlockingDeque 中的 Node是 双向链表, LinkedBlockingQueue 是单向链表


 

public class LinkedBlockingDeque<E>

extends AbstractQueue<E>

implements BlockingDeque<E>, java.io.Serializable {


/** Doubly-linked list node class */

static final class Node<E> {

E item;

Node<E> prev;

Node<E> next;


Node(E x) {

item = x;

}

}

}

主要方法

  • add /addFirst/addLast

  • offer/offerFirst/offerLast

  • poll/pollFirst/pollLast

  • put/putFirst/putLast

  • remove/removeFirst/removeLast/pop

  • getFirst/getLast

以上是根据个人理解做了分析,如有不正确请留言讨论。

----------------------------------------------------------

再次感谢,欢迎关注微信公众号“零售云技术”,文章持续更新,或留言讨论

并发编程技术十之j.u.c中的BlockingQueue

原文  http://mp.weixin.qq.com/s?__biz=MzU5MjgxNjAwMQ==&mid=2247483862&idx=1&sn=56e551c1a741b8582f1b74a5d3d52060
正文到此结束
Loading...