转载

Java AQS 的胡言乱语

前言

适合读者:3 年以上经验的同学

谈到并发编程,基本上都会想到JDK 的 JUC 工具包,它包含 锁,并发工具类,原子类,线程池,还有阻塞队列,这是从网上找的一个大致的知识体系。

Java AQS 的胡言乱语

相信这些工具读者都见过并使用过一部分了,比如 CountDownLatch,线程池,原子类,但是可能不了解其中的原理,而面试可能要求更高一点,要求说出其原理,或者经常有这么一问,如果是你,你会怎么去实现。

本文主要讲 AQS 的实现,需要你有如下基础

  • 队列 使用双向链表实现,添加节点和删除节点的操作
  • 在多线程并发情况下,双向链表的添加或删除节点会有线程安全问题,导致死循环或节点为空,要求能想到这种场景
  • 设计模式,主要是模板方法模式,这个简单
  • volatile 关键字的理解,主要是变量在主存和线程副本之间立即可见,它并不能保证原子性,如果想深究,需要看到 JVM 内存模型
  • 线程的六种状态,主要是要区别 waiting 和 timed-waiting , sleep 方法和 wait 和 Condition 方法的 await 都是使线程进入 timed-waiting ,UNSAFE.park(thread) 是调用本地方法,不需要获取锁即可使线程进入 waiting 状态
  • CAS 全称 compareAndSwap ,它的参数一般是 (对象,比较的属性,预期值,目标值),意思是对这个对象的这个属性做变更操作,如果预期值和内存值一致,则将内存值修改为目标值,它是一条 cpu 原语,能保证原子性
  • 中断 (interrupt) 的意思,它只会中断阻塞,即退出阻塞状态,并不会中断线程的运行,可能我们经常的被它抛出的 InterruptException 给误解了。
  • 最好有使用过 CountDownLatch ReentrantLock 或 AtomicXX 之类的类

主要内容

本文参考自 https://www.cnblogs.com/waterystone/p/4920797.html 个人不太喜欢贴源码的风格,把一个很简单的东西说得很复杂了。

AQS是AbstractQueuedSynchronizer的简称,AQS提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架,如下图所示。

就是多个线程对共享资源 state 的获取与释放,AQS 在顶层维护了一个线程队列,它实现了线程的出队,入队,唤醒,和节点状态的维护,使用模板方法模式,为子类提供了一些有用的方法,子类只要实现资源的获取与释放即可。

它有两对方法分别用于独占资源和共享资源 acquire-release,acquireShared-releaseShared

下面 -- 标记的都是方法名,可以自己去源码中查看,如果当前方法不清楚含义可以来这里看

下面的流程一定要对照源码 AbstractQueuedSynchronizer 来查看,不然可能不知道我在说什么,这里说的是独占模式和共享模式资源的获取和释放,这个框架只会来管你有几个资源,不会管你资源的其它属性。

获取独占资源的流程是这样子的 -- acquire

  1. 先尝试获取资源,如果获取到资源了,直接返回 -- tryAcquire
  2. 如果未获取到资源,则创建一个节点添加到队列尾 -- addWaiter
  3. 以自旋的方式判断是否排队排到了第一个,但虽然你排到了第一个,也要由子类的 tryAcquire 来决定你能不能获取资源 -- acquireQueued
  4. 如果不能获取资源,则先让前面的弄完了通知我一下 -- shouldParkAfterFailedAcquire,我先休息下(这时候线程阻塞处于 waiting 状态) -- parkAndCheckInterrupt

释放独占资源的流程是这样子的 -- release

  1. 先尝试释放资源 -- tryRelease
  2. 如果释放成功,从队列后面往前找,找到队列最前面那个正在等待的家伙,叫醒它; 这里如果认真看了一定会有一个疑问,为什么要从队列后面找,从前面找不是更快吗,文章末尾给出答案 ,这里你理解流程和使用 -- unparkSuccessor

tryAcquireShared 的含义是,如果返回的结果小于 0 ,表示没有可用资源,如果等于 0 表示最后一个可用资源,如果大于 0 ,表示获取成功并且还有可用资源

获取共享资源流程是这样子的 -- acquireShared

  1. 先尝试获取共享资源,如果成功直接返回 -- tryAcquireShared
  2. 如果获取失败,以共享模式添加到队尾 -- addWaiter
  3. 以自旋的方式判断是否排队排到了第一个,但虽然你排到了第一个,也要由子类的 tryAcquireShared 来决定你能不能获取资源 -- doAcquireShared
  4. 如果获取到资源了,但是资源还有剩余,叫醒后面的弟兄 -- setHeadAndPropagate
  5. 如果不能获取资源,则先让前面的弄完了通知我一下 -- shouldParkAfterFailedAcquire,我先休息下(这时候线程阻塞处于 waiting 状态) -- parkAndCheckInterrupt

释放共享资源流程

  1. 先尝试释放资源 -- tryReleaseShared
  2. 如果释放成功,从队列后面往前找,找到队列最前面那个正在等待的家伙,叫醒它 --doReleaseShared

节点状态 waitStatus

​ 这里我们说下Node。Node结点是对每一个等待获取资源的线程的封装,其包含了需要同步的线程本身及其等待状态,如是否被阻塞、是否等待唤醒、是否已经被取消等。变量waitStatus则表示当前Node结点的等待状态,共有5种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。

  • CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
  • SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
  • CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
  • 0:新结点入队时的默认状态。

注意:负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常。

在独占模式中只需要关注 3 个状态 > 0 的为取消,0 是默认就是这个状态 < 0 是可唤醒后继节点

学习这个不能只看这个队列,因为它是抽象出来的一个框架,需要结合具体的使用示例,如 CountDownLatch ,去看它是如何工作的。

CountDownLatch 构造了一个有 n 个资源的同步队列,使用共享模式,因为会有多个线程同时访问资源,每当 countDown 的时候,去释放了一个资源,然后在主线程 await 的时候去获取一个共享资源,但这个获取资源是只有资源量为 0 的时候才是成功的,核心代码就一句话。

protected int tryAcquireShared(int acquires) {
 return (getState() == 0) ? 1 : -1;
}

再看看独占锁的使用 ReentrantLock ,它使用独占资源,同一时刻只能有一个线程访问共享资源,增加了一个属性 OwnerThread ,当再次是同一个线程获取锁的时候是可重入的,有公平锁和非公平锁的实现,非公平锁在进行 lock 的时候 ,会先去试图将 state 设置为 1 ,预期为 0 ,如果成功,则成功插队,公平锁在 tryAcquire 增加了一个判断 hasQueuedPredecessors 如果有线程在排队就乖乖排队吧,并且没有在 lock 的时候尝试去占用资源 。

这里解释下在释放资源的时候 unparkSuccessor 方法为什么要从队尾开始找,从前面开始找不是更快吗

要理解这个,你首先得理解双向队列是如何插入节点的,假设有这样一个队列

head = A <---> B = tail

需要往队列尾加入节点 C ,应该是这样操作的

C.prev = tail;
tail.next = C;
tail = C;

在多线程并发情况下,这里多步操作并不是原子性的,tail 属于临界资源随时可能被其它线程修改成其它指向。

假设这种场景 C 在入队的时候 ,进行 tail.next =C 这一步前 ,D 也在入队,如果 C 先完成,这时队列结构会变成这样子

head = A <---> B <---> C

​ B <---> D = tail

AQS 使用了自旋和 CAS 来解决这个麻烦 ,它的代码是这样子的,在 enq 函数中

for (;;) {
     Node t = tail;
     if (t == null) { // Must initialize
         if (compareAndSetHead(new Node()))
             tail = head;
     } else {
         node.prev = t;
         if (compareAndSetTail(t, node)) {
             t.next = node;
             return t;
         }
     }
 }

这个函数的主要作用是当那[一瞬间]在队列为空的时候进入这个方法创建队列,有可能在正想初始化队列的时候已经被其它线程初始化了,所以初始化时使用了 CAS compareAndSetHead(),然后再把 tail 的指向和 head 的指向一致,这里因为另一个线程不能初始化成功,所以这里来说是正确的。

再看 else ,当其它线程已经初始化完成 ,tail 不为空的时候会进入这里,这个 t 只是表示这一瞬间的尾结点,尾节点可能被其它线程给修改了,使用 CAS compareAndSetTail 保证了只有当尾节点是这个瞬时尾结点的时候才能修改 node.prev=t ,保证了节点的前置结点一定是正确的,但后置节点就不一定了,如果在 t.next=node 赋值之前 tail 被修改,则队列会很可能变成这种样子

假设并发加入了节点 C,D,E

head = A <---> B <---> C <--- D <--- E = tail

​ B ---> D C ---> E

从此图可以看出,最终于,从后往前找链表是连续的,但从前往后找是不一定的,回答了最开始的问题。

写得有点晦涩,我也是想以最通俗易懂的方式写出来,但表达水平有限,望大神指正。

一点小推广

创作不易,希望可以支持下我的开源软件,及我的小工具,欢迎来 gitee 点星,fork ,提 bug 。

Excel 通用导入导出,支持 Excel 公式

博客地址: https://blog.csdn.net/sanri1993/article/details/100601578

gitee: https://gitee.com/sanri/sanri-excel-poi

使用模板代码 ,从数据库生成代码 ,及一些项目中经常可以用到的小工具

博客地址: https://blog.csdn.net/sanri1993/article/details/98664034

gitee: https://gitee.com/sanri/sanri-tools-maven

原文  http://www.cnblogs.com/sanri1993/p/12093697.html
正文到此结束
Loading...