J.U.C包中提供了一些非常有用的工具类。在合适的场景下使用它们往往能够达到事半功倍的效果。比如Atomic工具类、Exchanger、CountDownLatch、CyclicBarrier、Semaphore这些。
Atomic工具类能够实现原子操作数据。从数据类型的角度来看,可以分为:基本数据类型、数组、引用类型、引用类型属性的原子更新操作。它的底层原理其实就是对于Unsafe类和volatile的封装。
像AtomicInteger、AtomicLong等内部源码都是差不多的。值得注意的是像AtomicBoolean/AtomicChar等的实现原理其实是内部将boolean、char、byte、等数据类型向上转型为了int类型,本质上和AtomicInteger差别不大。
Exchanger可以交换线程间的数据,是一个线程间协作工作的工具类。它提供了一个同步点,用于线程间交换数据。Exchanger只能交换2个线程间的数据。理应如此:生活中,不论是交换利益、交换物品,这些行为都是发生在两者之间。
在实际的场景中,Exchanger十分适用于两个任务之间有数据上的相互依赖的场景。比如交易系统中的对账功能。我们以一个实际例子来看一看Exchanger的使用:
public class ExchangerTest { public static final Exchanger<Integer> transExchanger = new Exchanger<>(); public static void main(String[] args) { Thread th1 = new Thread(() -> { // 伪代码 Integer transA = 1000; try { Integer transFromB = transExchanger.exchange(transA); System.out.println("我是系统A:我系统中统计的交易额为:" + transA + " 我在交易系统中产生的交易额为:" + transFromB); } catch (InterruptedException e) { e.printStackTrace(); } }); Thread th2 = new Thread(() -> { // 伪代码 Integer transB = 1001; try { Integer transFromA = transExchanger.exchange(transB); System.out.println("我是交易系统:系统A统计出的交易额为:" + transB + " 系统A实际在我这里产生的交易额为:" + transFromA); } catch (InterruptedException e) { e.printStackTrace(); } }); th1.start(); th2.start(); } } 复制代码
执行结果如下:
我是交易系统:系统A统计出的交易额为:1001 系统A实际在我这里产生的交易额为:1000
我是系统A:我系统中统计的交易额为:1000 我在交易系统中产生的交易额为:1001
CountDownLatch的作用是用于多线程间同步完成任务。值得注意的是它只能使用一次,原因在于CountDownLatch设置锁资源之后,只提供了countDown()方法来释放锁资源,和await()方法来等待锁资源释放完毕。并没有重置锁资源的功能。如果我们理解AQS源码的话,那么读源码和对于CountDownLatch的使用就再简单不过了。
public class CountDownLatchTest { public static final CountDownLatch countDownLatch = new CountDownLatch(2); public static void main(String[] args) { Thread th1 = new Thread(() -> { System.out.println("th1 run."); countDownLatch.countDown(); }); Thread th2 = new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("th2 run."); countDownLatch.countDown(); }); Thread th3 = new Thread(() -> { System.out.println("th3 waiting until count down 0."); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("th3 last to run."); }); th1.start(); th2.start(); th3.start(); } } 复制代码
执行结果:
th1 run. th3 waiting until count down 0. th2 run. th3 last to run.
由于CountDownLatch只能使用一次,因此它适用于那些一次性任务。比如一些框架的初始化。
CyclicBarrier通常被翻译为回栏栅。它的功能其实是类似于CountDownLatch。不同之处在于:
我们分析一个例子如下:
public class CyclicBarrierTest { // 回栏珊,注册一个额外的线程 public static final CyclicBarrier cyclicBarrier = new CyclicBarrier(4, () -> { System.out.println("令牌已经为空了,准备唤醒等待中的线程."); }); public static void main(String[] args) { // 创建一个固定大小的线程池 ExecutorService executorService = Executors.newFixedThreadPool(4); for (int i = 0; i < 4; i++) { executorService.submit(() -> { try { String thName = Thread.currentThread().getName(); System.out.println(thName + " 执行完毕,等待通知..."); cyclicBarrier.await(); System.out.println(thName + " 收到通知."); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { // ignore } }); } executorService.shutdown(); } } 复制代码
执行结果如下:
网络上有很多针对于CyclicBarrier和CountDownLatch的区别的文章,说法总体相近。其实这二者最大的区别就在于前者支持重置资源,而后置不支持。前者提供的功能更丰富。因此前者更加适用于复杂的业务场景。如果Java并发的基础牢固,那么阅读具体的源码也是比较简单的!
Semaphore翻译过来是信号量。它的功能主要是控制并发线程数,特别适用于流量的控制,尤其是那些比较珍贵的公共资源。比如:数据库的连接,IO操作等。
我们举个例子如下:
public class SemaphoreTest { public static void main(String[] args) { Semaphore semaphore = new Semaphore(1); for (int i = 0; i < 10; i++) { Thread thread = new Thread(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "获取令牌成功"); Thread.sleep(100); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }); thread.start(); } } } 复制代码
Semaphore提供了acquire()方法来获取令牌,使用release来释放令牌。它也是基于AQS实现的。
本篇文章并没有过多的去解读源码层面。截止到本篇文章,其实不难发现,如果掌握了Java并发的基本思想和底层基础知识后,对于应用层面的解读阅读大多数其实是比较简单的。这也许就是 知其所以然 的好处罢!