CountdownLatch 是 JDK 并发包中提供的并发工具类,其允许一个或多个线程等待其他线程完成操作。常用作将一个任务拆分成多个子任务同时执行,只有子任务都执行完毕主线程才往下执行。
public class App implements Runnable { private CountDownLatch countDownLatch; public App (CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } public static void main( String[] args ) { // 指定同时运行3个子任务 int count = 3; CountDownLatch countDownLatch = new CountDownLatch(count); for (int i = 0; i < count; i++) { new Thread(new App(countDownLatch), "Thread-" + i).start(); } try { countDownLatch.await(); } catch (InterruptedException e) { } System.out.println("count down over !"); } @Override public void run() { try { TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() + " - 执行完毕." ); } catch (InterruptedException e) { } finally { countDownLatch.countDown(); } } } 复制代码
运行结果如下:
Thread-2 - 执行完毕. Thread-0 - 执行完毕. Thread-1 - 执行完毕. count down over ! 复制代码
从结果中可以看出 main 主线程会在 3 个子线程处理完毕之后才继续执行。
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } 复制代码
CountDownLatch 与其他同步组件一样,内部类 Sync 继承了 AQS,构造的时候会指定子任务个数 count , 也即是同步状态初始值。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } 复制代码
接下来看下 sync 获取共享同步状态的实现
protected int tryAcquireShared(int acquires) { // state == 0 的时候返回 1,反之返回 -1 // state != 0 说明还有子任务未处理完 return (getState() == 0) ? 1 : -1; } 复制代码
从实现可以看出 await() 方法执行时,当子任务未处理完毕时(state != 0),调用线程会被添加到同步队列而阻塞等待。
public void countDown() { sync.releaseShared(1); } 复制代码
接下来看下 sync 的共享同步状态值释放
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; // 每次释放的时候,也就是子任务完成的时候计数值减一 int nextc = c-1; // 更新 state 值 if (compareAndSetState(c, nextc)) // 子任务均处理完毕后,返回 true; 也就是真正的释放 // 将唤醒阻塞在同步队列的线程 return nextc == 0; } } 复制代码
从实现可以看出,每次子任务在调用 countDown 时,会将同步状态值减一,当所有子任务均完成时 (state = 0) 此时会唤醒阻塞在同步队列的节点。
子任务在进行 countDown 操作时,最好是在 finally 块处理; 避免出现子任务处理异常,导致主线程一直阻塞的问题。