整理了阻塞队列LinkedBlockingQueue的学习笔记,希望对大家有帮助。有哪里不正确,欢迎指出,感谢。
我们先来看看LinkedBlockingQueue的继承体系。 使用IntelliJ IDEA查看类的继承关系图形
LinkedBlockingQueue实现了序列化接口 Serializable,因此它有序列化的特性。 LinkedBlockingQueue实现了BlockingQueue接口,BlockingQueue继承了Queue接口,因此它拥有了队列Queue相关方法的操作。
类图来自Java并发编程之美
LinkedBlockingQueue主要特性:
//容量范围,默认值为 Integer.MAX_VALUE private final int capacity; //当前队列元素个数 private final AtomicInteger count = new AtomicInteger(); //头结点 transient Node<E> head; //尾节点 private transient Node<E> last; //take, poll等方法的可重入锁 private final ReentrantLock takeLock = new ReentrantLock(); //当队列为空时,执行出队操作(比如take )的线程会被放入这个条件队列进行等待 private final Condition notEmpty = takeLock.newCondition(); //put, offer等方法的可重入锁 private final ReentrantLock putLock = new ReentrantLock(); //当队列满时, 执行进队操作( 比如put)的线程会被放入这个条件队列进行等待 private final Condition notFull = putLock.newCondition(); 复制代码
LinkedBlockingQueue有三个构造函数:
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } 复制代码
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); //设置队列大小 this.capacity = capacity; //new一个null节点,head、tail节点指向该节点 last = head = new Node<E>(null); } 复制代码
public LinkedBlockingQueue(Collection<? extends E> c) { //调用指定容量的构造器 this(Integer.MAX_VALUE); //获取put, offer的可重入锁 final ReentrantLock putLock = this.putLock; putLock.lock(); try { int n = 0; //循环向队列中添加集合中的元素 for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); //将队列的last节点指向该节点 enqueue(new Node<E>(e)); ++n; } //更新容量值 count.set(n); } finally { //释放锁 putLock.unlock(); } } 复制代码
static class Node<E> { // 当前节点的元素值 E item; // 下一个节点的索引 Node<E> next; //节点构造器 Node(E x) { item = x; } } 复制代码
LinkedBlockingQueue的节点符合单向链表的数据结构要求:
item表示当前节点的元素值,next表示指向下一节点的指针
入队方法,其实就是向队列的尾部插入一个元素。如果元素为空,抛出空指针异常。如果队列已满,则丢弃当前元素,返回false,它是 非阻塞的 。如果队列空闲则插入成功返回true。
public boolean offer(E e) { //为空直接抛空指针 if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; //如果当前队列满了的话,直接返回false if (count.get() == capacity) return false; int c = -1; //构造新节点 Node<E> node = new Node<E>(e); 获取put独占锁 final ReentrantLock putLock = this.putLock; putLock.lock(); try { //判断队列是否已满 if (count.get() < capacity) { //进队列 enqueue(node); //递增元素计数 c = count.getAndIncrement(); //如果元素入队,还有空闲,则唤醒notFull条件队列里被阻塞的线程 if (c + 1 < capacity) notFull.signal(); } } finally { //释放锁 putLock.unlock(); } //如果容量为0,则 if (c == 0) //激活 notEmpty 的条件队列,唤醒被阻塞的线程 signalNotEmpty(); return c >= 0; } 复制代码
private void enqueue(Node<E> node) { //从尾节点加进去 last = last.next = node; } 复制代码
为了形象生动,我们用一张图来看看往队列里依次放入元素A和元素B。图片参考来源 【细谈Java并发】谈谈LinkedBlockingQueue
private void signalNotEmpty() { //获取take独占锁 final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //唤醒notEmpty条件队列里被阻塞的线程 notEmpty.signal(); } finally { //释放锁 takeLock.unlock(); } } 复制代码
put方法也是向队列尾部插入一个元素。如果元素为null,抛出空指针异常。如果队列己满则阻塞当前线程,直到队列有空闲插入成功为止。如果队列空闲则插入成功,直接返回。如果在阻塞时被其他线程设置了中断标志, 则被阻塞线程会抛出 InterruptedException 异常而返回。
public void put(E e) throws InterruptedException { ////为空直接抛空指针异常 if (e == null) throw new NullPointerException(); int c = -1; // 构造新节点 Node<E> node = new Node<E>(e); //获取putLock独占锁 final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //获取独占锁,它跟lock的区别,是可以被中断 putLock.lockInterruptibly(); try { //队列已满线程挂起等待 while (count.get() == capacity) { notFull.await(); } //进队列 enqueue(node); //递增元素计数 c = count.getAndIncrement(); //如果元素入队,还有空闲,则唤醒notFull条件队列里被阻塞的线程 if (c + 1 < capacity) notFull.signal(); } finally { //释放锁 putLock.unlock(); } //如果容量为0,则 if (c == 0) //激活 notEmpty 的条件队列,唤醒被阻塞的线程 signalNotEmpty(); } 复制代码
从队列头部获取并移除一个元素, 如果队列为空则返回 null, 该方法是不阻塞的。
public E poll() { final AtomicInteger count = this.count; //如果队列为空,返回null if (count.get() == 0) return null; E x = null; int c = -1; //获取takeLock独占锁 final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //如果队列不为空,则出队,并递减计数 if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); ////容量大于1,则激活 notEmpty 的条件队列,唤醒被阻塞的线程 if (c > 1) notEmpty.signal(); } } finally { //释放锁 takeLock.unlock(); } if (c == capacity) //唤醒notFull条件队列里被阻塞的线程 signalNotFull(); return x; } 复制代码
//出队列 private E dequeue() { //获取head节点 Node<E> h = head; //获取到head节点指向的下一个节点 Node<E> first = h.next; //head节点原来指向的节点的next指向自己,等待下次gc回收 h.next = h; // help GC // head节点指向新的节点 head = first; // 获取到新的head节点的item值 E x = first.item; // 新head节点的item值设置为null first.item = null; return x; } 复制代码
为了形象生动,我们用一张图来描述出队过程。图片参考来源 【细谈Java并发】谈谈LinkedBlockingQueue
private void signalNotFull() { //获取put独占锁 final ReentrantLock putLock = this.putLock; putLock.lock(); try { ////唤醒notFull条件队列里被阻塞的线程 notFull.signal(); } finally { //释放锁 putLock.unlock(); } } 复制代码
获取队列头部元素但是不从队列里面移除它,如果队列为空则返回 null。 该方法是不 阻塞的。
public E peek() { //队列容量为0,返回null if (count.get() == 0) return null; //获取takeLock独占锁 final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; //判断first是否为null,如果是直接返回 if (first == null) return null; else return first.item; } finally { //释放锁 takeLock.unlock(); } } 复制代码
获取当前队列头部元素并从队列里面移除它。 如果队列为空则阻塞当前线程直到队列 不为空然后返回元素,如果在阻塞时被其他线程设置了中断标志, 则被阻塞线程会抛出 InterruptedException 异常而返回。
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; //获取takeLock独占锁 final ReentrantLock takeLock = this.takeLock; //获取独占锁,它跟lock的区别,是可以被中断 takeLock.lockInterruptibly(); try { //当前队列为空,则阻塞挂起 while (count.get() == 0) { notEmpty.await(); } //)出队并递减计数 x = dequeue(); c = count.getAndDecrement(); if (c > 1) //激活 notEmpty 的条件队列,唤醒被阻塞的线程 notEmpty.signal(); } finally { //释放锁 takeLock.unlock(); } if (c == capacity) //激活 notFull 的条件队列,唤醒被阻塞的线程 signalNotFull(); return x; } 复制代码
删除队列里面指定的元素,有则删除并返回 true,没有则返回 false。
public boolean remove(Object o) { //为空直接返回false if (o == null) return false; //双重加锁 fullyLock(); try { //边历队列,找到元素则删除并返回true for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { //执行unlink操作 unlink(p, trail); return true; } } return false; } finally { //解锁 fullyUnlock(); } } 复制代码
void fullyLock() { //putLock独占锁加锁 putLock.lock(); //takeLock独占锁加锁 takeLock.lock(); } 复制代码
void unlink(Node<E> p, Node<E> trail) { p.item = null; trail.next = p.next; if (last == p) last = trail; //如果当前队列满 ,则删除后,也不忘记唤醒等待的线程 if (count.getAndDecrement() == capacity) notFull.signal(); } 复制代码
void fullyUnlock() { //与双重加锁顺序相反,先解takeLock独占锁 takeLock.unlock(); putLock.unlock(); } 复制代码
基本流程
获取当前队列元素个数。
public int size() { return count.get(); } 复制代码
由于进行出队、入队操作时的 count是加了锁的,所以结果相比ConcurrentLinkedQueue 的 size 方法比较准确。
Java并发编程之美中,有一张图惟妙惟肖描述了它,如下图: