在上一篇文章中给大家介绍了牛批的AQS,大致讲解了JUC中同步的思路。本来还没想好这一篇应该写点什么,刚好上周某个同事的代码出现问题,排查后发现是使用阻塞队列不当导致的,所以本篇决定介绍下阻塞队列。
说来也是挺巧的,那天一位同事iMac换了Macbook Pro。然后像往常一样启动了各个服务,过了会电脑风扇疯狂工作发出响声,由于平常iMac上IDEA项目开的比较多占用较多内存时间长了也会卡顿,所以他并没有在意。但是之后一直是这样我们便觉得很奇怪,然后打开了他的活动监视器,发现某个Java进程竟然占用了百分之九十的CPU,然后确认是哪一个项目,最后通过jstack查看该项目中的线程情况,定位到了某个自定义线程,然后查看代码发现如下:
MyThreadPool.exportEnclosurePool.execute(() -> { while (true) { BlockingQueue<EnclosureRequest> blockingQueue = requestQueue.getBlockingQueue(); while (!blockingQueue.isEmpty()) { System.out.println("开始消费"); EnclosureRequest one = null; try { one = blockingQueue.take(); ossService.exportEnclosureToLocalServer(one.getEnclosureList(), one.getSobId(), one.getUserUuid(), one.getUserName(), one.getTmpFileName(), one.getZipUuidList()); } catch (Exception e) { e.printStackTrace(); } } } } 复制代码
该同事的需求是做一个队列化附件导出的功能,因此他选择了生产者消费者模式,采用阻塞队列来实现;但是由于对此不太熟悉,所以写出了这段有问题的代码,导致死循环;万幸的是这段代码在测试分支上被我们发现了并没有上正式。正确的消费者代码实现如下:
MyThreadPool.exportEnclosurePool.execute(() -> { BlockingQueue<EnclosureRequest> blockingQueue = requestQueue.getBlockingQueue(); while (true) { try { EnclosureRequest one = blockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("开始消费"); ossService.exportEnclosureToLocalServer(one.getEnclosureList(), one.getSobId(), one.getUserUuid(), one.getUserName(), one.getTmpFileName(), one.getZipUuidList()); } } 复制代码
阻塞队列是一个插入和移除方法支持附加操作的队列;
四种处理方式:
处理方式/方法 | 插入方法 | 移除方法 |
---|---|---|
抛出异常 | add(e) | remove() |
返回boolean值 | offer(e) | poll() |
阻塞 | put(e) | take() |
超时退出 | offer(e,time,unit) | poll(time,unit) |
:bulb:小提示: 如果是无界阻塞队列,队列不可能出现满的情况,所以使用put()方法永远不会被阻塞,使用offer()方法永远返回true
LinkedBlockingQueue是一个由成员变量Node组成的单链表结构,默认容量为Integer的最大值,其内部还有两把ReentrantLock锁putLock、takeLock用于保证插入和删除的线程安全(其他阻塞队列中使用一个ReentrantLock锁),两个Condition等待队列notEmpty、notFull用于存放take()和put()阻塞的线程。这里我简单分析下它两个比较重要的方法put()和take()。
/** * 由Node节点组成单链表结构 */ static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } /** 用于移除操作的锁 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 阻塞于take的等待队列 */ private final Condition notEmpty = takeLock.newCondition(); /** 用于插入操作的锁 */ private final ReentrantLock putLock = new ReentrantLock(); /** 阻塞于put的等待队列 */ private final Condition notFull = putLock.newCondition(); /** * 不指定容量默认是Integer的最大值 */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } /** * 阻塞式插入元素(队列为满则阻塞) */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 获取插入锁(响应中断) putLock.lockInterruptibly(); try { // 如果当前队列长度到达容量上限则当前线程释放锁加入不为满等待队列中 while (count.get() == capacity) { notFull.await(); } // 将元素加入队尾 enqueue(node); // 当前队列长度加一(返回值是加一之前) c = count.getAndIncrement(); // 如果加入后队列长度小于容量上限则通知不为满等待队列中的线程 if (c + 1 < capacity) notFull.signal(); } finally { // 释放锁 putLock.unlock(); } // 如果在插入元素之前队列为空则通知不为空等待队列中的线程 if (c == 0) signalNotEmpty(); } /** * 阻塞式移除元素(队列为空则阻塞) */ public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 获取移除锁(响应中断) takeLock.lockInterruptibly(); try { // 如果当前队列为空则当前线程释放锁加入不为空等待队列 while (count.get() == 0) { notEmpty.await(); } // 移除队头元素 x = dequeue(); c = count.getAndDecrement(); // 如果移除之后还有元素则通知不为空等待队列中的线程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } // 如果移除元素之前到达容量上线则通知不为满等待队列中的线程 if (c == capacity) signalNotFull(); return x; } 复制代码
需要注意的是put()操作将元素加入队列后释放锁是在判断容量是否小于上限通知notFull等待队列之后,通知notEmpty队列之前需要先获取takeLock,take()操作同理。
:bulb:小提示: LinkedBlockingQueue的put()和take()方法中和其他阻塞队列有个很大的区别。其他阻塞队列每次put()和take()都会去通知相应的等待队列,但是LinkedBlockingQueue只有在put前是空的去通知notEmpty,take前是满的去通知notFull等待队列,并且put后未满去通知notFull等待队列,take后未空去通知notEmpty等待队列。关于这点我个人的理解是由于LinkedBlockingQueue里分读写锁,如果每次take都通知notFull的话,需要另外去获取putLock产生竞争;用已经获取putLock的线程去唤醒notFull等待队列中线程减少了锁的竞争。其他阻塞队列中只有一把锁,所以通知不需要另外竞争锁。当然这只是我个人的看法而已,希望有了解的小伙伴指教。