屏障是一个多线程协调的基本机制。
A barrier is a way of forcing asynchronous threads to act almost as if they were synchronous.
在Java里屏障的实现是 CyclicBarrier
。它和 CountDownLatch
的区别常常被人津津乐道。实际上, CountDownLatch
只能完成一个原子计数器的作用,正如它的名字一样——门栓。不过,你有没有想过,为什么 CountDownLatch
使用的是AQS,而不是一个原子类 AtomicInteger
呢?
但是,屏障应该被允许多次使用,也就是 CyclicBarrier
提供的 reset
。假设线程在使用 CountDownLatch
时,最后一个线程会重置计数,那么此时就算其他等待的线程被 signal
了也没有意义,因为条件不满足。
解决这个问题的方法是使用反向语义(Sense-Reversing)屏障。简单来说,就是给不同的阶段定一个语义,比如这里需要两个值,可以用布尔值,然后每个线程都会持有一个 ThreadLocal
的本地值。这样一来,不需要在计数器上自旋,只需要在sense上自旋即可。反向语义(Sense-Reversing)屏障很适合在缓存一致架构(cache-coherent architectures)上运行。在这种架构下,所有线程都在本地缓存自旋,直到有线程修改sense并广播到总线。不过,它仍然有之前的缺点,所有线程一开始会争用计数器。
CyclicBarrier
采用的就是这种模式。它用 Generation
表示阶段的语义,同时 Generation.broken
表示当前阶段是否结束。比如说 reset
,就分为结束旧阶段和开始新阶段两步:
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } } 复制代码
减少争用的一种方法是使用类似于 ForkJoinPool
的组合树。但这样做你无法使用一个全局的计数器。每个节点的计数器取决于树的基数,即孩子个数。当所有孩子节点开始等待之后,父节点开始等待。而通知是反向流动的:根发现所有任务已经完成,传播对应的sense给所有子节点直到叶子。
class Node { AtomicInteger count; Node parent; volatile boolean sense; // construct root node public Node() { sense = false; parent = null; count = new AtomicInteger(radix); } public Node(Node parent) { this(); this.parent = parent; } public void await() { boolean mySense = threadSense.get(); int position = count.getAndDecrement(); if (position == 1) { // I'm last if (parent != null) { // root? parent.await(); } count.set(radix); // reset counter sense = mySense; } else { while (sense != mySense) {}; } threadSense.set(!mySense); } } 复制代码
对组合树有一个简单的优化策略:如果是一个二叉树,不需要节点持有计数器,只需要持有另一个节点引用,然后直接修改它的flag。这被叫做竞赛屏障(Tournament Tree Barrier)。
组合树也许可以有效降低争用,但是增加了时延。另外,它的非叶结点都是构造的,不是线程独占的,这可能会影响缓存的效率。解决方法是给每个节点分配一个线程,这叫做静态树屏障。
class Node { final int children; // number of children final Node parent; AtomicInteger childCount; // number of children incomplete public Node(Node parent, int count) { this.children = count; this.childCount = new AtomicInteger(count); this.parent = parent; } public void await() { boolean mySense = threadSense.get(); while (childCount.get() > 0) {}; // spin until children done childCount.set(children); // prepare for next round if (parent != null) { // not root? parent.childDone(); // indicate child subtree completion while (sense != mySense) {}; // wait for global sense to change } else { sense = !sense; // am root: toggle global sense } threadSense.set(!mySense); // toggle sense } public void childDone() { childCount.getAndDecrement(); } } 复制代码
尽管示例中实现为自旋等待,实际中代码还是会选择使用锁——比如 CyclicBarrier
就使用了可重入锁。而屏障确实是一个使用锁的典型场景,因为线程并没有频繁的操作对应的数据结构。