CountDownLatch相当于一个门闩,门闩上挂了N把锁。只有N把锁都解开的话,门才会打开。怎么理解呢?我举一个赛跑比赛的例子,赛跑比赛中必须等待所有选手都准备好了,裁判才能开发令枪。选手才可以开始跑。CountDownLatch当中主要有两个方法,一个是await()会挂上锁阻塞当前线程,相当于裁判站在起始点等待,等待各位选手准备就绪,一个是countDown方法用于解锁,相当于选手准备好了之后调用countDown方法告诉裁判自己准备就绪,当所有人都准备好了之后裁判开发令枪。
public class TestCountDownLatch { public static void main(String[] args) { // 需要等待两个线程,所以传入参数为2 CountDownLatch latch = new CountDownLatch(2); // 该线程运行1秒 new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("1号选手准备就绪!用时1秒!"); latch.countDown(); } }).start(); // 该线程运行3秒 new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("2号选手准备就绪!用时3秒!"); latch.countDown(); } }).start(); try { System.out.println("请1号选手和2号选手各就各位!"); // 主线程在此等待两个线程执行完毕之后继续执行 latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } // 两个线程执行完毕后,主线程恢复运行 System.out.println("裁判发枪,1号选手和2号选手开跑!"); } }
请1号选手和2号选手各就各位! 1号选手准备就绪!用时1秒! 2号选手准备就绪!用时3秒! 裁判发枪,1号选手和2号选手开跑!
如果去掉CountDownLatch的效果呢?运行结果就会变成如下:
请1号选手和2号选手各就各位! 裁判发枪,1号选手和2号选手开跑! 1号选手准备就绪!用时1秒! 2号选手准备就绪!用时3秒!
裁判就会在选手还未准备就绪的时候开发令枪,这就乱套了。
其实CountDownLatch一个最简单的用处就是计算多线程执行完毕时的时间。像刚才的例子当中两个线程并行执行了共花费了3秒钟。
CyclicBarrier就像一个栅栏,将各个线程拦住。Cyclic是循环的英文,表明该工具可以进行循环使用。CyclicBarrier(N)的构造参数表明该一共有几个线程需要互相等待。它相当于N个选手约定进行多次比赛,每次比赛完都要在起跑点互相等待。读者可能会马上疑惑这不是和CountDownLatch一样吗?不一样。因为CountDownLatch是裁判等待选手,是调用await()方法的线程,等待调用countDown()方法的各个线程。而CyclicBarrier是选手等待选手,是调用await()方法的线程互相等待,等待其他线程都运行好之后,再开始下一轮运行。
我们举一个例子,两个选手进行比赛,一共进行三轮比赛。
public class TestCyclicBarrier { // 1号选手跑的轮数 public static int countA = 1; // 2号选手跑的轮数 public static int countB = 1; public static void main(String[] args) { // 填入2,代表2个线程互相等待 CyclicBarrier barrier = new CyclicBarrier(2); new Thread(new Runnable() { @Override public void run() { // 一共跑三轮 for (int i = 0; i < 3; i++) { System.out.println("1号选手开始跑!当前第" + countA++ + "轮比赛!"); // 1号选手跑得慢,每次跑三秒 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } try { System.out.println("1号选手抵达终点!"); // 调用等待方法,在此等待其他选手 barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }).start(); new Thread(new Runnable() { @Override public void run() { // 一共等待三轮 for (int i = 0; i < 3; i++) { System.out.println("2号选手开始跑!当前第" + countB++ + "轮比赛!"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } try { System.out.println("2号选手抵达终点!"); // 调用等待方法,在此等待其他选手 barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }).start(); } }
1号选手开始跑!当前第1轮比赛! 2号选手开始跑!当前第1轮比赛! 2号选手抵达终点! 1号选手抵达终点! 1号选手开始跑!当前第2轮比赛! 2号选手开始跑!当前第2轮比赛! 2号选手抵达终点! 1号选手抵达终点! 1号选手开始跑!当前第3轮比赛! 2号选手开始跑!当前第3轮比赛! 2号选手抵达终点! 1号选手抵达终点!
每轮比赛1号选手和2号选手都会回到起跑线互相等待,再开启下一轮比赛。
如果不加CyclicBarrier呢?
1号选手开始跑!当前第1轮比赛! 2号选手开始跑!当前第1轮比赛! 2号选手抵达终点! 2号选手开始跑!当前第2轮比赛! 2号选手抵达终点! 2号选手开始跑!当前第3轮比赛! 1号选手抵达终点! 1号选手开始跑!当前第2轮比赛! 2号选手抵达终点! 1号选手抵达终点! 1号选手开始跑!当前第3轮比赛! 1号选手抵达终点!
此时2号选手就直接跑完三轮比赛,不等1号选手了。
Semaphore英文的字面意思是信号量。它的工作机制是每个线程想要获取运行的机会的话,都必须获取到信号量。acquire()方法阻塞的获取信号量,release()释放信号量。举个例子,假设我们去迪士尼游玩,但是迪士尼担心游客很多的话,影响大家的游玩体验,于是规定每个小时只能卖出两张门票。这样就可以控制在游乐园当中的游客数量了。
public class TestSemaphore { public static void main(String[] args) { Semaphore semaphore = new Semaphore(0); System.out.println("顾客在售票处等候中"); new Thread(new Runnable() { @Override public void run() { for (; ; ) { try { Thread.sleep(500); // 等待出票 semaphore.acquire(); System.out.println("顾客拿到门票入场!"); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); new Thread(new Runnable() { @Override public void run() { for (int i = 0; i < 3; i++) { try { // 等待一小时再发门票 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } // 一次性发出两张门票 System.out.println("售票处第" + (i + 1) + "小时售出两张票!"); semaphore.release(); semaphore.release(); } } }).start(); System.out.println("售票处开始售票!"); } }
顾客在售票处等候中... 售票处开始售票! 售票处第1小时售出两张票! 顾客拿到门票入场! 顾客拿到门票入场! 售票处第2小时售出两张票! 顾客拿到门票入场! 顾客拿到门票入场! 售票处第3小时售出两张票! 顾客拿到门票入场! 顾客拿到门票入场!
Exchanger提供了让两个线程互相交换数据的同步点。Exchanger有点像2个线程的CyclicBarrier,线程之间都是互相等待,区别在于Exchanger多了交换的操作。举个例子好比毒贩和吸毒者交易一样,在某个时间点一手交钱一手交货。
public class TestExchanger { public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<>(); new Thread(new Runnable() { @Override public void run() { String drug = "毒品"; System.out.println("我是毒贩,我带着" + drug + "过来了!"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("毒贩达到交易地点"); try { System.out.println("我是毒贩,换回了" + exchanger.exchange(drug)); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { String money = "一千块钱"; System.out.println("我是吸毒者,我带着" + money + "过来了"); try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("吸毒者到达交易地点"); try { System.out.println("我是吸毒者,换回了" + exchanger.exchange(money)); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); }
我是毒贩,我带着毒品过来了! 我是吸毒者,我带着一千块钱过来了 毒贩达到交易地点 吸毒者到达交易地点 我是吸毒者,换回了毒品 我是毒贩,换回了一千块钱