假设有5个人约好一起去旅行,那么一般来说有2种组织出行方式,一种是自己组织自由行,另一种是跟团旅行。
在Java的JUC工具包中,有两个工具类可以类比这两种旅行方式,分别是CyclicBarrier和CountDownLatch。
旅行出发当天,5个人(按照约好的时间和地点)相互等待。先到的人,等待未到的人,一旦人齐了,集体成员就同时出发。
有多个工作者线程,其中先准备好的线程,阻塞等待(await())未准备好的线程。一旦所有线程都准备好了,各个线程就同时执行任务。
CyclicBarrier一般应用于对公平性要求比较高,特别是需要同时执行的场景。
游戏类有很多适合的场景。例如剪子石头布比输赢,掷骰子比大小。
以掷骰子为例,先点击开始按钮一方的投掷线程,将会等待其他投掷线程。一旦所有游戏方都点击了开始按钮,所有投掷线程就同时运行并投掷出骰子。
旅行出发或结束当天,司机(按照约好的时间和地点)等待5名团员。每名团员到达后都进行签到(意味着未到人数减一),一旦人齐了,司机就立马开车。
有一到多个等待者线程,等待(await())一到多个被等待者线程做好准备或完成任务(countDown())。一旦所有被等待者线程都准备好或完成任务了,等待者线程就马上执行任务。
由于CountDownLatch多了一个countDown()方法,可以更灵活的使用。因此,其应用场景也相对广泛一些。
通常来说,CountDownLatch更适合用于等待一批线程准备好或完成指定任务,然后再进行下一步操作的场景。也就是说,等待者线程与被等待者线程之间存在 先后依赖 的执行顺序。
数据聚合。启动多个线程去抓取网页数据、查询接口数据或执行计算任务,另一个线程等待前者完成后,进行数据聚合相关操作。
投票选举主节点。候选节点启动多个线程,请求其他节点同意选自己为主节点。候选节点等待所有线程返回投票结果,如果收到半数以上同意选票则提升自己为主节点。
public class CyclicBarrierTest1 { private static int SIZE = 5; private static CyclicBarrier cb; public static void main(String[] args) { cb = new CyclicBarrier(SIZE); // 新建5个任务 for (int i = 0; i < SIZE; i++) new InnerThread().start(); } static class InnerThread extends Thread { public void run() { try { System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier."); Thread.sleep(2000); // 将cb的参与者数量加1 cb.await(); // cb的参与者数量等于5时,才继续往后执行 System.out.println(Thread.currentThread().getName() + " continued."); } catch (BrokenBarrierException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } } 执行结果: Thread-1 wait for CyclicBarrier. Thread-4 wait for CyclicBarrier. Thread-2 wait for CyclicBarrier. Thread-0 wait for CyclicBarrier. Thread-3 wait for CyclicBarrier. Thread-3 continued. Thread-4 continued. Thread-0 continued. Thread-2 continued. Thread-1 continued. 复制代码
CyclicBarrier支持在所有线程都准备好之后,执行特定的操作(构造函数的第2个参数指定的操作)。
public class CyclicBarrierTest2 { private static int SIZE = 5; private static CyclicBarrier cb; public static void main(String[] args) { cb = new CyclicBarrier(SIZE, new Runnable() { public void run() { System.out.println("CyclicBarrier's parties is: " + cb.getParties()); } }); // 新建5个任务 for (int i = 0; i < SIZE; i++) new InnerThread().start(); } static class InnerThread extends Thread { public void run() { try { System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier."); Thread.sleep(2000); // 将cb的参与者数量加1 cb.await(); // cb的参与者数量等于5时,才继续往后执行 System.out.println(Thread.currentThread().getName() + " continued."); } catch (BrokenBarrierException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } } 执行结果: Thread-2 wait for CyclicBarrier. Thread-4 wait for CyclicBarrier. Thread-1 wait for CyclicBarrier. Thread-0 wait for CyclicBarrier. Thread-3 wait for CyclicBarrier. CyclicBarrier's parties is: 5 Thread-4 continued. Thread-2 continued. Thread-3 continued. Thread-0 continued. Thread-1 continued. 复制代码
public class CountDownLatchTest1 { private static int LATCH_SIZE = 5; private static CountDownLatch doneSignal; public static void main(String[] args) { try { doneSignal = new CountDownLatch(LATCH_SIZE); // 创建5个任务,即被等待线程 for (int i = 0; i < LATCH_SIZE; i++) new InnerThread().start(); new Thread(new Runnable() { public void run() { System.out.println("submain await begin."); try { // 子线程等待 doneSignal.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("submain await finished."); } }).start(); // "主线程"等待线程池中5个任务的完成 System.out.println("main await begin."); doneSignal.await(); System.out.println("main await finished."); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 被等待线程 * */ static class InnerThread extends Thread { public void run() { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " sleep 1000ms."); // 线程签到,计数器减1 doneSignal.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } } } 执行结果: main await begin. submain await begin. Thread-2 sleep 1000ms. Thread-0 sleep 1000ms. Thread-3 sleep 1000ms. Thread-1 sleep 1000ms. Thread-4 sleep 1000ms. main await finished. submain await finished. 复制代码
想像一种场景,当线程即是等待者线程,同时又是被等待者线程时,会是什么效果。答案是,可以实现CyclicBarrier的相互等待,线程同时执行的效果。
public class CountDownLatchTest3 { private static int SIZE = 5; private static CountDownLatch doneSignal; public static void main(String[] args) { doneSignal = new CountDownLatch(SIZE); // 新建5个任务 for (int i = 0; i < SIZE; i++) new InnerThread().start(); } static class InnerThread extends Thread { public void run() { try { System.out.println(Thread.currentThread().getName() + " wait for CountDownLatch."); Thread.sleep(2000); // 签到,计数器减1 doneSignal.countDown(); // 等待所有线程就绪 doneSignal.await(); // cb的参与者数量等于5时,才继续往后执行 System.out.println(Thread.currentThread().getName() + " continued."); } catch (InterruptedException e) { e.printStackTrace(); } } } } 执行结果: Thread-0 wait for CountDownLatch. Thread-3 wait for CountDownLatch. Thread-1 wait for CountDownLatch. Thread-4 wait for CountDownLatch. Thread-2 wait for CountDownLatch. Thread-2 continued. Thread-1 continued. Thread-4 continued. Thread-0 continued. Thread-3 continued. 复制代码
因此,如果要实现多个线程同时开始执行,可以使用CyclicBarrier或CountDownLatch来实现,前者显然更加直接明了。同时,还可以使用锁(synchronized、Lock)结合计数器进行实现。思路都是类似的:线程先相互等待,如果都就绪了,就开始执行。
最后,还要再说明一下,文章中提到的 旅行模式 ,其实是我为了方便理解记忆而引入的一个概念,重在理解。
题图:timgsa.baidu.com