CountDownLatch是Java并发包下的一个工具类,latch是门闩的意思,顾名思义,CountDownLatch就是有一个门闩挡住了里面的人(线程)出来,当count减到0的时候,门闩就打开了,人(线程)就可以出来了。下面从源码的角度看看CountDownLatch究竟是如何实现的。
CountDownLatch类中有一个静态内部类 Sync ,它继承自 AbstractQueuedSynchronizer ,所以可以看出,CountDownLatch的功能还是通过AQS来实现的。
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } 复制代码
可以看出要使用CountDownLatch就需要指定count值,且必须大于0,而这个count值最终是赋值给了AQS的state,可以看下面 new Sync(count)的源码,它实际上是set的state值,而这个state是AQS中的属性
Sync(int count) { setState(count); } 复制代码
它是调用内部类Sync的releaseShared方法,这个方法会先去通过cas的方式修改state值,如果state修改之前就是0或者修改之后不等于0,那就什么都不需要操作了;如果修改之后state=0,那么就去执行doReleaseShared方法。
public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int releases) { // state=0或者修改之后state<>0,返回false for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; //修改之后state值=0,返回true if (compareAndSetState(c, nextc)) return nextc == 0; } } 复制代码
//这个方法countDown会调用,await方法在被唤醒后也会调用doReleaseShared private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 如果状态是signal(-1),cas的方式把它改为0 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //cas成功修改的话,则去唤醒h的下一个节点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //如果h和head相同了,则跳出循环 if (h == head) // loop if head changed break; } } 复制代码
await方法是调用该方法的线程处于等待状态(state>0),下面从源码分心一下await方法是如何实现的。
await方法最终调用到了AQS的acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //如果state=0,则tryAcquireShared方法返回1,则不用等待(也就验证了countdown减到0,才释放线程) if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } 复制代码
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //把新node加到node的尾部 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); //如果node的prev是head的话 if (p == head) { //看state是否=0,如果等于0,则r=1,否则r=-1 int r = tryAcquireShared(arg); if (r >= 0) { //state=0的时候,把node置为head,然后去唤醒head的下一个节点 //setHeadAndPropagate方法参照下面的解析 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //如果p的status是signal(-1)的话,则执行parkAndCheckInterrupt方法,将该线程挂起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } 复制代码
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) //调用doReleaseShared,去唤醒新head的下一个节点 doReleaseShared(); } } 复制代码
下面举个例子说明CountDownLatch的用法,而且如果大家是想通过debug的方式跟踪CountDownLatch是如何实现的,那么在断点处的suspend一定要改为Thread,因为在await的时候,线程挂起,而在countDown的时候,首先把head的next节点(暂时称作A节点)唤醒,而此时A节点在await挂起的线程就被唤醒了,继续往下执行,由于await方法会调用setHeadAndPropagate方法,setHeadAndPropagate方法会调用doReleaseShared(countDown也是调用这个方法唤醒线程),所以除了调用countDown的线程,被唤醒的线程也会去唤醒它的下一个节点,所以doReleaseShared方法是被多线程调用的,因此在debug的时候一定要把suspend改为Thread才能看到效果。
public void test() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); for (int i = 0; i < 5; i++) { new Thread(() -> { try { System.out.println("子线程" + Thread.currentThread().getName() + "await 前"); countDownLatch.await(); System.out.println("子线程" + Thread.currentThread().getName() + "await 后"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } Thread.sleep(30000); countDownLatch.countDown(); System.out.println("完成"); } 复制代码