public class CyclicBarrier 复制代码
//独占锁 private final ReentrantLock lock = new ReentrantLock(); //等待的条件 private final Condition trip = lock.newCondition(); //线程等待的数量,重置count private final int parties; //被唤醒时,优先执行的任务 private final Runnable barrierCommand; //描述更新换代,重置? //Generation中的broken表示这一次是否完成,默认false。count为0时设置为true private Generation generation = new Generation(); //记录当前需要等待到来的线程数,等于0表示到下一代,通过parties来重置 private int count; 复制代码
从字段属性可以看出
//传入线程数量 public CyclicBarrier(int parties) { //调用下面的构造方法 this(parties, null); } //传入线程数量和要处理的业务 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } 复制代码
从构造方法可以看出
//设置为新生代,唤醒线程并重置状态 private void nextGeneration() { //唤醒所有等待condition的线程 trip.signalAll(); //重置count count = parties; //重置generation,新生代 generation = new Generation(); } 复制代码
//设置当前代为完成状态 private void breakBarrier() { //设置当前代为完成状态 generation.broken = true; //重置count count = parties; //唤醒所有等待condition的线程 trip.signalAll(); } 复制代码
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { //加锁 final ReentrantLock lock = this.lock; lock.lock(); try { //获取当前代 final Generation g = generation; if (g.broken) //如果当前代已经处于完成状态,抛出异常 throw new BrokenBarrierException(); if (Thread.interrupted()) { //如果当前线程已经被中断 //设置当前代为完成状态 breakBarrier(); //抛出异常 throw new InterruptedException(); } //count减1 int index = --count; if (index == 0) { // tripped //如果count为0,表示所有线程已经就绪 boolean ranAction = false; try { //先运行优先执行的任务 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //设置为新生代,唤醒线程并重置状态 nextGeneration(); return 0; } finally { if (!ranAction) //运行优先执行的任务抛出异常的情况 //设置当前代为完成状态 breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) //未设置超时 //直接进入条件等待 trip.await(); else if (nanos > 0L) //设置超时,进入有超时时间的条件等待 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //抛出异常 if (g == generation && ! g.broken) { //当前代不变且为完成 //设置当前代为完成状态 breakBarrier(); throw ie; } else { //当前代已经改变,中断当前线程 Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { //超时 //设置当前代为完成状态 breakBarrier(); throw new TimeoutException(); } } } finally { //释放锁资源 lock.unlock(); } } 复制代码
//获取设置等待的线程数 public int getParties() { return parties; } 复制代码
//进入条件等待 public int await() throws InterruptedException, BrokenBarrierException { try { //调用dowait方法 return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } //进入有超时时间的条件等待 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { //调用dowait方法 return dowait(true, unit.toNanos(timeout)); } 复制代码
//获取当前代状态 public boolean isBroken() { //上锁 final ReentrantLock lock = this.lock; lock.lock(); try { //返回当前代状态 return generation.broken; } finally { //释放锁资源 lock.unlock(); } } 复制代码
//重置当前代状态 public void reset() { //上锁 final ReentrantLock lock = this.lock; lock.lock(); try { //设置当前代为完成状态 breakBarrier(); // break the current generation //设置为新生代代,唤醒线程并重置状态 nextGeneration(); // start a new generation } finally { //释放锁 lock.unlock(); } } 复制代码
//获取等待中的线程数 public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { //parties 需要的线程总数 //count 还需要等待到来的线程数 return parties - count; } finally { lock.unlock(); } } 复制代码