转载

java并发编程 -CountDownLatch和CyclicBarrier在内部实现和场景上的区别

前言

CountDownLatch和CyclicBarrier两个同为java并发编程的重要工具类,它们在诸多多线程并发或并行场景中得到了广泛的应用。但两者就其内部实现和使用场景而言是各有所侧重的。

内部实现差异

前者更多依赖经典的AQS机制和CAS机制来控制器内部状态的更迭和计数器本身的变化,而后者更多依靠可重入Lock等机制来控制其内部并发安全性和一致性。

 public class  {
     //Synchronization control For CountDownLatch.
     //Uses AQS state to represent count.
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;
    ... ...//
 }
 public class CyclicBarrier {
    /**
     * Each use of the barrier is represented as a generation instance.
     * The generation changes whenever the barrier is tripped, or
     * is reset. There can be many generations associated with threads
     * using the barrier - due to the non-deterministic way the lock
     * may be allocated to waiting threads - but only one of these
     * can be active at a time (the one to which {@code count} applies)
     * and all the rest are either broken or tripped.
     * There need not be an active generation if there has been a break
     * but no subsequent reset.
     */
    private static class Generation {
        boolean broken = false;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     */
    private int count;

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    ... ... //
 }

实战 - 展示各自的使用场景

/**
 *类说明:共5个初始化子线程,6个闭锁扣除点,扣除完毕后,主线程和业务线程才能继续执行
 */
public class UseCountDownLatch {
   
    static CountDownLatch latch = new CountDownLatch(6);

    /*初始化线程*/
    private static class InitThread implements Runnable{

        public void run() {
           System.out.println("Thread_"+Thread.currentThread().getId()
                 +" ready init work......");
            latch.countDown();
            for(int i =0;i<2;i++) {
               System.out.println("Thread_"+Thread.currentThread().getId()
                     +" ........continue do its work");
            }
        }
    }

    /*业务线程等待latch的计数器为0完成*/
    private static class BusiThread implements Runnable{

        public void run() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for(int i =0;i<3;i++) {
               System.out.println("BusiThread_"+Thread.currentThread().getId()
                     +" do business-----");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            public void run() {
               SleepTools.ms(1);
                System.out.println("Thread_"+Thread.currentThread().getId()
                     +" ready init work step 1st......");
                latch.countDown();
                System.out.println("begin step 2nd.......");
                SleepTools.ms(1);
                System.out.println("Thread_"+Thread.currentThread().getId()
                     +" ready init work step 2nd......");
                latch.countDown();
            }
        }).start();
        new Thread(new BusiThread()).start();
        for(int i=0;i<=3;i++){
            Thread thread = new Thread(new InitThread());
            thread.start();
        }
        latch.await();
        System.out.println("Main do ites work........");
    }
}
/**
 *类说明:共4个子线程,他们全部完成工作后,交出自己结果,
 *再被统一释放去做自己的事情,而交出的结果被另外的线程拿来拼接字符串
 */
class UseCyclicBarrier {
    private static CyclicBarrier barrier
            = new CyclicBarrier(4,new CollectThread());

    //存放子线程工作结果的容器
    private static ConcurrentHashMap<String,Long> resultMap
            = new ConcurrentHashMap<String,Long>();

    public static void main(String[] args) {
        for(int i=0;i<4;i++){
            Thread thread = new Thread(new SubThread());
            thread.start();
        }

    }

    /*汇总的任务*/
    private static class CollectThread implements Runnable{

        @Override
        public void run() {
            StringBuilder result = new StringBuilder();
            for(Map.Entry<String,Long> workResult:resultMap.entrySet()){
               result.append("["+workResult.getValue()+"]");
            }
            System.out.println(" the result = "+ result);
            System.out.println("do other business........");
        }
    }

    /*相互等待的子线程*/
    private static class SubThread implements Runnable{
        @Override
        public void run() {
           long id = Thread.currentThread().getId();
            resultMap.put(Thread.currentThread().getId()+"",id);
            try {
                   Thread.sleep(1000+id);
                   System.out.println("Thread_"+id+" ....do something ");
                barrier.await();
               Thread.sleep(1000+id);
                System.out.println("Thread_"+id+" ....do its business ");
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

两者总结

1.  Cyclicbarrier 结果汇总的 Runable 线程可以重复被执行,通过多次触发 await() 方法, countdownlatch 可以调用 a wait() 方法多次; cyclicbarrier 若没有结果汇总,则调用一次 await() 就够了;

2.  New cyclicbarrier(threadCount) 的线程数必须与实际的用户线程数一致;

3.  协调线程同时运行: countDownLatch 协调工作线程执行,是由外面线程协调 ;cyclicbarrier 是由工作线程之间相互协调运行;

4.  从构造函数上看出: countDownlatch 控制运行的计数器数量和线程数没有关系; cyclicbarrier 构造中传入的线程数等于实际执行线程数 ;

5.  countDownLatch 在不能基于执行子线程的运行结果做处理,而 cyclicbarrier 可以 ;

6.     就使用场景而言,countdownlatch 更适用于 框架加载前的一系列初始化工作等场景; cyclicbarrier 更适用于需要多个用户线程执行后,将运行结果汇总再计算等典型场景;

原文  https://blog.51cto.com/14815984/2496009
正文到此结束
Loading...