CountDownLatch闭锁相当于一扇门,在闭锁到达结束状态之前,这扇门 一直是关闭的 ,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态, 门永远保持打开状态
CountDownLatch通过内部类Sync实现方法,sync继承AQS重写模板中的方法。sync内部定义:
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } /** * 获取同步状态 */ int getCount() { return getState(); } /** * 获取同步状态 */ protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } /** * 释放同步状态 */ protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } 复制代码
从源码中重写的方法可以得知,CountDownLatch中的sync采用 共享模式 。CountDownLatch示例:
public class TestHarness { public static long timeTasks(int nThreads, final Runnable task) throws InterruptedException { final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(nThreads); for (int i = 0; i < nThreads; i++) { Thread t = new Thread() { @Override public void run() { try { startGate.await(); try { System.out.println(Thread.currentThread().getName() + "开始执行"); task.run(); } finally { endGate.countDown(); System.out.println(Thread.currentThread().getName() + "执行结束"); } } catch (InterruptedException e) { e.printStackTrace(); } } }; t.start(); } long start = System.nanoTime(); startGate.countDown(); endGate.await(); long end = System.nanoTime(); System.out.println("所有线程执行完毕,耗时:" + (end-start)); return end - start; } public static void main(String[] args) throws InterruptedException { System.out.println(timeTasks(10, new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + "————————work"); } })); } } 复制代码
运行结果:
Thread-0开始执行 Thread-3开始执行 Thread-0————————work Thread-0执行结束 Thread-1开始执行 Thread-2开始执行 Thread-7开始执行 Thread-7————————work Thread-7执行结束 Thread-9开始执行 Thread-9————————work Thread-9执行结束 Thread-8开始执行 Thread-8————————work Thread-8执行结束 Thread-2————————work Thread-2执行结束 Thread-6开始执行 Thread-1————————work Thread-6————————work Thread-6执行结束 Thread-5开始执行 Thread-5————————work Thread-5执行结束 Thread-3————————work Thread-3执行结束 Thread-4开始执行 Thread-1执行结束 Thread-4————————work Thread-4执行结束 所有线程执行完毕,耗时:2794976 2794976 复制代码
相对于CountDownLatch是 一次性对象,一旦进入终止状态,就不能被重置 ,CyclicBarrier可以反复使用。CyclicBarrier类似于闭锁,与 闭锁的关键区别在于,闭锁用于等待事件,栅栏用于等待其他线程 ,其作用是让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } 复制代码
参数parties指栅栏拦截的线程数量
参数barrierAction指当这些线程都到达栅栏时优先会执行的线程
await()方法
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 获取锁 lock.lock(); try { final Generation g = generation; // 若栅栏处于断开状态,抛出异常 if (g.broken) throw new BrokenBarrierException(); // 若线程中断,断开CyclicBarrier if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; // count为0表明所有线程到达栅栏位置 if (index == 0) { // tripped boolean ranAction = false; try { // 若初始化时指定了所有线程到达栅栏时的任务,执行它 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; // 唤醒所有等待线程,开始新的generation nextGeneration(); return 0; } finally { // 若任务执行异常,断开CyclicBarrier if (!ranAction) breakBarrier(); } } // 循环所有线程到达栅栏或栅栏断开或线程中断或超时 for (;;) { try { // 一直等待 if (!timed) trip.await(); // 限时等待 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 若线程中断且栅栏没有断开,断开CyclicBarrier if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; // 若等待超时,断开CyclicBarrier if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { // 释放锁 lock.unlock(); } } 复制代码
其主要逻辑:若有线程未到达栅栏位置,到达栅栏位置的线程一直等待状态,直至发生以下场景:
①. 所有线程都到达栅栏位置
②. 有线程被中断
③. 线程等待超时
④. 有线程调用reset()方法,断开当前栅栏,将栅栏重置为初始状态
reset方法:
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { // 断开当前栅栏 breakBarrier(); // break the current generation // 开始新的generation nextGeneration(); // start a new generation } finally { lock.unlock(); } } 复制代码
public class CyclicBarrierTest { private static CyclicBarrier cyclicBarrier; static class CyclicBarrierThread extends Thread{ public void run() { System.out.println("运动员:" + Thread.currentThread().getName() + "到场"); try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args){ cyclicBarrier = new CyclicBarrier(5, new Runnable() { @Override public void run() { System.out.println("运动员全部到齐,比赛开始"); } }); for(int i = 0 ; i < 5 ; i++){ new CyclicBarrierThread().start(); } } } 复制代码
运动员:Thread-0到场 运动员:Thread-1到场 运动员:Thread-2到场 运动员:Thread-3到场 运动员:Thread-4到场 运动员全部到齐,比赛开始 复制代码
①.CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次
②.CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得Cyclic-Barrier阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断
③.CountDownLatch倾向于一个线程等多个线程,CyclicBarrier倾向于多个线程互相等待