同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程的控制流。在容器中,有些也可以作为同步工具类,其它类型的同步工具类还包括闭锁(Latch)、信号量(Semaphore)以及)栅栏(Barrier)。阻塞队列(eg: BlockQueue)是一种独特的类:它们不仅能作为保存对象的容器,还能协调生产者和消费者之间的控制流,因为它提供的 take
和 put
等方法将会阻塞,直到队列达到期望的状态。所有的同步工具类都包含一些特定的属性:它们封装了一些状态,这些状态将决定同步工具类的线程是继续执行还是等待,此外还提供了一些方法对其状态进行操作,以及另一些方法用于高效地等待同步工具类进入到预期状态。
闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有线程通过。当闭锁到达结束状态后,将不会再次改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其它活动都完成后才继续执行。比如:
CountDownLatch
是一种灵活的闭锁实现,可以在上述各种情况中使用,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待事件的数量。 countDown()
方法递减计数器,表示有一个事件已经发生了,而 await()
方法等待计数器达到 0 ,这表示所有需要等待的事件都已经发生。如果计数器的值非 0 ,那么 await()
方法会一直阻塞到计数器的值为 0 ,或者等待线程中断,或者等待超时。
CountDownLatch
被用来同步一个或多个任务,强制它们等待由其它任务执行的一组操作完成。你可以向 CountDownLatch
对象设置一个初始计数值,任何在这个对象上调用 await()
的方法都将阻塞,直到这个计数值到达 0。其它任务在结束工作时,可以在该对象上调用 countDown()
方法来减小这个计数值。 CountDownLatch
被设计为只触发一次,计数值不能重置。如果你需要重置计数值的版本,请看下文的 CyclicBarrier
。把大象放入冰箱的例子:
/** * @author mghio * @date: 2019-11-03 * @version: 1.0 * @description: 同步工具类 —— CountDownLatch * @since JDK 1.8 */ public class CountDownLatchDemo { private static CountDownLatch countDownLatch1 = new CountDownLatch(1); private static CountDownLatch countDownLatch2 = new CountDownLatch(1); public static void main(String[] args) { final Thread thread1 = new Thread(() -> { System.out.println("step 1:打开冰箱门..."); // 对 countDownLatch1 倒计时 -1 countDownLatch1.countDown(); }); final Thread thread2 = new Thread(() -> { try { // 等待 countDownLatch1 倒计时,计时为 0 则往下运行 countDownLatch1.await(); System.out.println("step 2:把大象放入冰箱..."); // 对 countDownLatch2 倒计时 -1 countDownLatch2.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }); final Thread thread3 = new Thread(() -> { try { // 对 countDownLatch2 倒计时,计时为 0 则往下进行 countDownLatch2.await(); System.out.println("step 3:关上冰箱门..."); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println("------- 把大象放入冰箱 --------"); thread3.start(); thread1.start(); thread2.start(); } }
以上代码输出结果:
------- 把大象放入冰箱 -------- step 1:打开冰箱门... step 2:把大象放入冰箱... step 3:关上冰箱门...
FutureTask
也可以用作闭锁。它实现了 Future
的语义,表示一种抽象可生成结果的计算。 FutureTask
表示的计算是通过 Callable
来实现的,相当于一种可生成结果的 Runnable
,并且可以处于这三种状态: 等待运行(Waiting to run)
、 正在运行(Running)
和 运行完成(Completed)
。其中 执行完成
表示计算的所有可能结束方式,包括正常结束、由于取消结束和由于异常结束等。当 FutureTask
进入完成状态后,它就会永远停在这个状态上。 get()
方法的行为取决于任务的状态。如果此时任务已经完成,那么 get()
方法会立即返回结果,否则将会阻塞直到任务进入到完成状态,然后返回结果或者抛出异常。 FutureTask
将计算结果从执行计算的线程传递到获取这个结果的线程,而 FutureTask
的规范确保了这种传递过程能实现结果的安全发布。
FutureTask
在 Executor
框架中表示异步任务,除此之外还可以用来表示一些耗时比较长的计算,这些计算可以在使用计算结果之前启动。以下示例使用其执行一个异步任务:
/** * @author mghio * @date: 2019-11-03 * @version: 1.0 * @description: 同步工具类 —— FutureTask * @since JDK 1.8 */ public class FutureTaskDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { System.out.println("--------- 进入主线程执行任务"); ExecutorService threadPool = Executors.newCachedThreadPool(); System.out.println("--------- 提交异步任务"); FutureTask<String> future = new FutureTask<>(() -> "成功获取 future 异步任务结果"); threadPool.execute(future); System.out.println("--------- 提交异步任务之后,立马返回到主线程继续往下执行"); Thread.sleep(1000); System.out.println("--------- 此时需要获取上面异步任务的执行结果"); boolean flag = true; while (flag) { if (future.isDone() && !future.isCancelled()) { String futureResult = future.get(); System.out.println("--------- 异步任务返回的结果是:" + futureResult); flag = false; } } if (!threadPool.isShutdown()) { threadPool.shutdown(); } } }
以上代码输出结果为:
--------- 进入主线程执行任务 --------- 提交异步任务 --------- 提交异步任务之后,立马返回到主线程继续往下执行 --------- 此时需要获取上面异步任务的执行结果 --------- 异步任务返回的结果是:成功获取 future 异步任务结果
计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量,或者同时执行指定操作的数量。计数信号量还可以用来实现某种资源池或者对容器施加边界。 Semaphore
中管理着一组虚拟的许可( permit
),许可的初始数量可以通过构造函数来指定,在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么 acquire()
将阻塞直到有许可或者直到终端或者直到超时。 release()
方法将返回一个许可给信号量。 Semaphore
可以用于实现资源池,例如数据库连接池。我们可以构造一个固定长度的资源池,当池为空时,请求资源将会失败,但你真正希望看到的行为是阻塞而不是失败,并且当池非空时解除阻塞。如果将 Semaphore
的计数值初始化为池的大小,并在从池中获取一个资源之前首先调用 acquire()
方法获取一个许可,在将资源返回给池之后调用 release()
方法释放许可,那么 acquire()
方法将一直阻塞直到资源池不为空。以下示例将使用 Semaphore
将 HashSet
容器变成有界的阻塞容器,信号量的计数值会初始化为容器容量的最大值。 add
操作在向底层容器添加一个元素之前,首先要获取一个许可。如果 add
操作没有添加任何元素,那么会立刻释放许可。同样 remove
操作会释放一个许可,使更多的元素能够添加到容器中。底层的 Set
实现并不知道关于边界的任何信息。
/** * @author maguihai * @date: 2019-11-03 * @version: 1.0 * @description: 同步工具类 —— Semaphore * @since JDK 1.8 */ public class BoundedHashSet<T> { private final Set<T> set; private final Semaphore sem; public BoundedHashSet(int bound) { this.set = Collections.synchronizedSet(new HashSet<>()); this.sem = new Semaphore(bound); } public boolean add(T o) throws InterruptedException { sem.acquire(); boolean wasAdded = false; try { wasAdded = set.add(o); return wasAdded; } finally { if (!wasAdded) { sem.release(); } } } public boolean remove(T o) { boolean wasRemoved = set.remove(o); if (wasRemoved) { sem.release(); } return wasRemoved; } }
我们已经看到通过 闭锁
来启动一组相关的操作,或者等待一组相关的操作结束。闭锁是一次性对象,一旦进入终止状态,就不能被重置。栅栏(Barrier)类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于:所有线程都必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其它线程。栅栏用于实现一些协议,例如几个家庭决定在某个地方集合:“所有人 6:00 在 KFC 碰头,到了以后要等其它人,之后再讨论下一步要做的事情”。 CyclicBarrier
适用于这样的情况:你希望创建一组任务,他们并行执行工作,然后再运行下一个步骤之前等待,知道所有任务都完成(有点儿像线程的 join
方法)。它使得所有的并行任务都将处于栅栏处列队,因此可以一致的向前移动。这和上文的 CountDownLatch
非常像,只是 CountDownLatch
只是触发一次的事件,而 CyclicBarrier
可以重复使用。
CyclicBarrier
可以使一定数量的参与方反复地在栅栏位置汇聚,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分成一系列相互独立的子问题。当线程达到栅栏位置时将调用 await()
方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有栅栏都到达栅栏了位置,那么栅栏将打开,此时所有的线程都被释放,而栅栏将被重置以便下次使用。如果对 await()
方法调用超时,或者线程被中断,那么栅栏就认为是被打破了,所有阻塞 await()
的调用都将终止并抛出 BrokenBarrierException
。如果成功通过栅栏,那么 await()
将为每一个线程返回一个唯一的到达索引号,我们可以利用这些索引来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。 CyclicBarrier
还可以使你将一个栅栏操作传递给构造函数,这个一个 Runnable
,当成功通过栅栏时会(在一个子任务线程中)执行它,但是它在阻塞线程被释放前是不能执行的。使用示例:
/** * @author mghio * @date: 2019-11-03 * @version: 1.0 * @description: 同步工具类 —— CyclicBarrier * @since JDK 1.8 */ public class CyclicBarrieDemo { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); // 创建 CyclicBarrier 对象并设置 3 个公共屏障点 final CyclicBarrier cb = new CyclicBarrier(3); for (int i = 0; i < 3; i++) { Runnable runnable = () -> { try { Thread.sleep((long) (Math.random() * 10000)); System.out.println("线程 " + Thread.currentThread().getName() + " 即将到达集合地点1,当前已有 " + cb.getNumberWaiting() + " 个已经到达,正在等候"); // 到此如果没有达到公共屏障点,则该线程处于等待状态,如果达到公共屏障点则所有处于等待的线程都继续往下运行 cb.await(); Thread.sleep((long) (Math.random() * 10000)); System.out.println("线程 " + Thread.currentThread().getName() + " 即将到达集合地点2,当前已有 " + cb.getNumberWaiting() + " 个已经到达,正在等候"); cb.await(); Thread.sleep((long) (Math.random() * 10000)); System.out.println("线程 " + Thread.currentThread().getName() + " 即将到达集合地点3,当前已有 " + cb.getNumberWaiting() + " 个已经到达,正在等候"); cb.await(); } catch (Exception e) { e.printStackTrace(); } }; service.execute(runnable); } service.shutdown(); } }
以上代码运行结果:
线程 pool-1-thread-3 即将到达集合地点1,当前已有 0 个已经到达,正在等候 线程 pool-1-thread-1 即将到达集合地点1,当前已有 1 个已经到达,正在等候 线程 pool-1-thread-2 即将到达集合地点1,当前已有 2 个已经到达,正在等候 线程 pool-1-thread-3 即将到达集合地点2,当前已有 0 个已经到达,正在等候 线程 pool-1-thread-2 即将到达集合地点2,当前已有 1 个已经到达,正在等候 线程 pool-1-thread-1 即将到达集合地点2,当前已有 2 个已经到达,正在等候 线程 pool-1-thread-3 即将到达集合地点3,当前已有 0 个已经到达,正在等候 线程 pool-1-thread-2 即将到达集合地点3,当前已有 1 个已经到达,正在等候 线程 pool-1-thread-1 即将到达集合地点3,当前已有 2 个已经到达,正在等候