前言
前面四节学完了AQS最难的两种重入锁应用,下面两节进入实战学习,看看JUC包中其他的工具类是如何运用AQS实现特定功能的。今天一起看一下CountDownLatch。
CountDownLatch可以用来实现多个线程执行完一个功能后让另一个线程继续执行的功能。常见的场景比如大文件的处理,我们需要对一个或多个文件进行处理,处理完之后再统一入库,这时我们就可以用到CountDownLatch了。
一、使用样例
1 public static void main(String[] args) { 2 // 指定初始容量 3 CountDownLatch latch = new CountDownLatch(3); 4 // 启动三个线程,每个线程独自处理文件 5 for (int i = 0;i < 3; i++) { 6 new Thread(() -> { 7 System.out.println(Thread.currentThread().getName() + " 正在处理文件"); 8 try { 9 Thread.sleep(2000); 10 } catch (InterruptedException e) { 11 e.printStackTrace(); 12 } 13 System.out.println(Thread.currentThread().getName() + " 处理完毕"); 14 latch.countDown(); 15 }).start(); 16 } 17 try { 18 latch.await(); 19 } catch (Exception e) { 20 e.printStackTrace(); 21 } 22 System.out.println("所有文件处理完成后,统一入库"); 23 }
运行结果:
1 Thread-0 正在处理文件 2 Thread-2 正在处理文件 3 Thread-1 正在处理文件 4 Thread-0 处理完毕 5 Thread-2 处理完毕 6 Thread-1 处理完毕 7 所有文件处理完成后,统一入库
效果就是这样,下面我们一起看看它是如何实现的这种功能。
二、源码学习
1 public CountDownLatch(int count) { 2 if (count < 0) throw new IllegalArgumentException("count < 0"); 3 this.sync = new Sync(count); 4 }
继续追踪可以发现,就是将count赋值给AQS中的成员变量state,表示已经有3个线程占用了锁。
1 public void countDown() { 2 sync.releaseShared(1); 3 }
可以看到,countDown走的是释放共享锁的逻辑,从给state赋值也可以猜到用的是共享锁-有多个线程且state可赋大于0的值。继续看releaseShared逻辑:
1 public final boolean releaseShared(int arg) { 2 if (tryReleaseShared(arg)) { 3 doReleaseShared(); 4 return true; 5 } 6 return false; 7 }
可以看到就是读锁释放的逻辑,其中doReleaseShared方法实现逻辑相同就不看了,不同的是tryReleaseShared方法,下面跟进:
1 protected boolean tryReleaseShared(int releases) { 2 // Decrement count; signal when transition to zero 3 for (;;) { 4 int c = getState(); 5 if (c == 0) 6 return false; 7 int nextc = c-1; 8 if (compareAndSetState(c, nextc)) 9 return nextc == 0; 10 } 11 }
此方法在CountDownLatch中的内部类Sync中得到实现,逻辑为将state-1,并且如果是0的话返回true。返回true后在releaseShared方法中会进入if里面,走唤醒后续节点的逻辑doReleaseShared方法,在该方法中唤醒的main线程。main线程什么时候被挂起的?且看下面。
1 public void await() throws InterruptedException { 2 sync.acquireSharedInterruptibly(1); 3 }
await调用了可响应中断的获取共享锁方法,继续查看:
1 public final void acquireSharedInterruptibly(int arg) 2 throws InterruptedException { 3 if (Thread.interrupted()) 4 throw new InterruptedException(); 5 if (tryAcquireShared(arg) < 0) 6 doAcquireSharedInterruptibly(arg); 7 }
此方法是AQS中的公用模板方法,不同点在于各实现类的实现逻辑,在CountDownLatch中对tryAcquireShared方法进行了实现,实现逻辑如下:
1 protected int tryAcquireShared(int acquires) { 2 return (getState() == 0) ? 1 : -1; 3 }
即如果state==0则能获取到锁,否则获取不到。获取不到进入下面的doAcquireSharedInterruptibly方法,最终会将head的waitStatus设置为-1,自己挂起等待唤醒。
三、总结
CountDownLatch是基于共享锁实现的并发控制功能,现在对总的实现逻辑做个梳理:首先在构造器初始化CountDownLatch的时候,就会给AQS中的state赋值,表示共享锁已经被获取了N次;然后每执行一次countDown则共享锁释放一次,直到释放完;await方法是加锁的逻辑,但加锁条件是state==0时才会加锁成功,否则挂起;最后,当通过countDown的调用将state减为0后,会唤醒处于阻塞状态的主线程,让其 获取到锁并执行。