AQS
是什么,相信大家都不陌生这个题目,那么 AQS
到底是什么呢? AQS的全称是 Abstract Queued Synchronizer , 从字面意思理解也就是 抽象队列同步器 ,实际上 AQS
确实就是 排队同步队列 , 也是一个 抽象 类,需要 自定义 同步队列中 可执行权 的 获取和释放中的逻辑(重新定义获取和释放语义) ,也就是重写 tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
等方法,当然也可以 自定义方法 来通过调用 AQS
提供的 判断方法进行逻辑判断 。在 JDK9
之前 AQS
是依赖于 CAS
的,其底层是通过 Unsafe
的 compareAndSwap*
方法实现同步更改,在之后则是使用 VarHandle
, 也替代了 Unsafe
。 说白了 AQS 利用 VarHandle 保证操作的原子性 。
大白话就可以理解为: 表示某件事情同一时间点仅有一人可以进行操作,如有多人则需要排队等待, 等到当前操作人完成后通知下一个人。
在源码中 AbstractQueuedSynchronizer
继承了 AbstractOwnableSynchronizer
, 同时也就继承了 exclusiveOwnerThread
属性,也就是 独占模式同步器的拥有者 ** , 也就意味着 该线程是当前正在执行的线程**。
在 AQS
中有几个重点方法,分别是: acquire
acquireInterruptibly
tryAcquireNanos
release
acquireShared
acquireSharedInterruptibly
tryAcquireSharedNanos
releaseShared
下面逐一分析。
在分析源代码之前,先来看一张图来了解一下 AQS排队同步队列
和 Node
节点中的 waitStatus
状态 。
waitStatus
状态都分为是什么
等待超时
或者 被中断
,需要从同步队列中剔除,节点进入该状态以后不会再发生变化了。 官方解释就是 Acquires in exclusive mode, ignoring interrupts 获取独占模式并忽略interrupt(中断) , 翻译成大白话就是就可以理解为获取 独占模式 , 看一下源码
public final void acquire(int arg) { // 判断线程是否有可继续执行的权限, 如果没有则创建node 加入到队列中 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 复制代码
在 acquire
方法中分别调用了 tryAcquire
acquireQueued
和 addWaiter
方法,其中 tryAcquire
方法是需要自定义(重写) 获取、 执行权限 的逻辑,这里我们以 AbstractQueuedSynchronizer
的实现 ReentrantLock
为例,简单分析一下,先看 tryAcquire
方法
protected final boolean tryAcquire(int acquires) { // 获取当前线程 final Thread current = Thread.currentThread(); // 获取当前线程的重入次数 如果是 0 则代表第一次 int c = getState(); if (c == 0) { // 判断是否存在队列 && 可以获取到可执行权 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // 设置独占模式同步器的拥有者 也就是是哪个线程持有 setExclusiveOwnerThread(current); return true; } } // 如果进入线程是 持有可执行权的线程 则做重入 + 1 操作 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } 复制代码
tryAcquire
方法核心代码就是 判断执行权限 ,这里就不具体分析了,会在下一篇文章中进行ReentrantLock的源码分析,接下来重点看 acquireQueued
和 addWaiter
方法。
private Node addWaiter(Node mode) { // 通过构造方法 新建 Node节点, 根据入参mode指定了 Node的模式,共享或独占 Node node = new Node(mode); for (;;) { Node oldTail = tail; // 如果 tail 不是 null 则 设置 新建Node的前驱节点(prev) 指向 tail节点 反之 初始化同步队列 if (oldTail != null) { // 设置 新建Node的前驱节点(prev) 指向 tail节点 node.setPrevRelaxed(oldTail); // 重新设置 tail 节点 指向 新建的Node节点 // 白话就是 队列中的最后一个节点 == tail节点 if (compareAndSetTail(oldTail, node)) { // 设置 未更改时的 tail节点 中 next 节点, 指向 新建Node节点 oldTail.next = node; return node; } } else { // 初始化同步队列器 initializeSyncQueue(); } } } 复制代码
// 如果未存在同步队列 则初始化同步队列 private final void initializeSyncQueue() { Node h; // 设置 AQS head节点 为一个新建节点 if (HEAD.compareAndSet(this, null, (h = new Node()))) // 赋值操作 tail = h; } 复制代码
在 addWaiter
和 initializeSyncQueue
方法中,核心就是新建 Node 节点并通过 acquireQueued
方法将节点加入到 AQS
中,接下来分析一下 addWaiter
具体做了什么
通过构造方法创建新的Node节点,并通过入参 mode
指定Node节点的模式,共享或独占。当然这里是设置的独占模式。
循环操作新建Node节点并将 新建节点 和 tail
节点建立关系。首先判断 tail
是否是null,如果是则 步骤3
,反之 步骤4
如果 tail
节点不为null, 首先将新建的 Node节点
中的 前驱节(prev)
点设置为当前的 tail
节点,然后通过 VarHandle
将 AQS
的 tail
节点改为 新建的Node
节点,如果修改成功则将上一步 未更改时的 tail
节点 (也就是代码中的oldTail) 中的 next
指向 新建的Node节点
,反之则可能因为并发操作导致 tial
节点已经被其他线程变更,需要再次循环操作直至成功。
如果 tial
节点是null, 则需要实例化同步对列,也就是 AQS
, 通过调用 initializeSyncQueue
进行初始化操作,通过 VarHandle
设置 AQS
的 head
指向一个新建节点 (new Node) , 然后将 head
节点的 引用 赋值给 tail
节点。 这里注意一下,是将 head
节点的 引用
赋值给 tail
节点, 也就是这时候 head
节点 和 tail
节点是同时指向一块内存地址 , 这里的用意就是在新建队列的时候, head
节点和新建节点的 prev
节点要保持是同一个引用 ,因为在后续的判断中, 获取可执行权的条件就是 AQS
的 head
节点是否等于当前节点的 prev
节点。
因为 addWaiter
方法中是一个循环,在 创建队列后 需要将队列新建的Node节点做关联,所以还需要在执行一次 步骤3
addWaiter
方法分析完后,再来看一下 acquireQueued
方法
final boolean acquireQueued(final Node node, int arg) { // 线程中断状态 boolean interrupted = false; try { for (;;) { // 获取通过 addWaiter 创建的Node方法 final Node p = node.predecessor(); // 判断 新建的Node节点是否等于head节点 && 可以获取到 可执行权 if (p == head && tryAcquire(arg)) { // 设置 head 节点为 当前线程的新建的Node节点,也就是线程被唤醒后并获取到了可执行权,则将head // 节点设置为当前线程创建的Node节点,可以保证head节点永远都可以和后续节点有关联关系 setHead(node); // 设置 next p.next = null; // help GC // 返回 return interrupted; } // 判断Node节点的线程是否符合被 wait ,在这里用的是 park if (shouldParkAfterFailedAcquire(p, node)) // 将线程 wait 并且线程被唤醒后 判断线程是否被中断 // |= 操作等于 interrupted = interrupted | parkAndCheckInterrupt() // |(按位或) 会将 | 两边的值进行二进制计算,运算规则一个为真即为真, 例如 1|1 = 1 或 1|0 = 1, interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; } } 复制代码
acquireQueued
也是核心方法,在其中会对线程进行 LockSupport.park
进行控制,其实现方式是 循环 ,下面就具体分析一下
当前线程
所创建的 Node
节点中的 前置节点(prev)
。 前置节点(prev)
是否 等于 AQS
的 head
节点 && 可以获取到 可执行权
,如果这两个条件成立则看 步骤3
,反之看 步骤4
, 如果满足这两个条件,也就代表着 head
节点 所对应的线程
已经执行完成并且做了释放**(release方法)**操作。 步骤2
条件成立,也就是 线程被唤醒后并获取到了可执行权 ,则将 head
节点设置为 当前线程创建的Node节点 。 步骤2
条件不成立,则判断 Node
节点所对应的线程的状态是否符合改为 wait
状态。这个逻辑在 shouldParkAfterFailedAcquire
方法中, 接下来看一下。 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 如果前置节点的 waitStatus == Signal 也就是 == -1 // 如果是 满足 线程 wait 条件 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; // 如果状态 > 0 也就是1 也就是线程已经被中断了 // 在这里就会判断 前置节点的前置节点 是否还是被中断,如果是 循环继续判断前置节点, // 如果不是 则将前置节点的next节点改为 入参的 node 节点 然后 返回false 继续循环判断 // 这里的作用就是 排除掉已经被中断的线程 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 否则设置 状态为 -1 等待唤醒状态 再次进来以后就会被 wait pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } return false; } 复制代码
在 shouldParkAfterFailedAcquire
方法中主要就是判断节点所属的线程是否符规则,也就是更改为 wait
状态
waitStatus
是否是 SIGNAL,如果是满足条件,返回 true
,线程将会 wait
。 waitStatus
是否大于 0, 也就是1 ,如果条件成立,则代表 当前线程节点 的 前置节点 所对应的线程已经被中断了,需要重新指定当前线程节点的前置节点(prev),通过循环的方式找到前置节点的节点,如果依然被中断,则继续循环,直到找到未中断线程所对应的Node节点为止。如果条件不成立则将 waitStatus
状态改为 SIGNAL
返回false, 再通过 acquireQueued
方法中的循环在执行一次 。 prev
节点中的 waitStatus
状态,是因为只有 前置节点(prev)
的 waitStatus
等于 SIGNAL 也就是 -1
时, 就代表当前线程新建的Node节点的线程处于等待状态 , 在当前节点的前置节点(prev)
的线程释放了同步状态或被取消,将会通知当前节点,使当前节点的线程得以运行 到这里我们整个的 acquire
方法就解析完了,接下来分享 release,有获取才有释放,会在release讲完后为大家分享一下 acquire 到 release的整个流程。
release
字面一次就是释放,释放通过 acquire
获取的独占模式,可以让 AQS
后续节点所对应的线程可以得到执行权,下面就看一下 release 方法
public final boolean release(int arg) { // 步骤 1 2 3 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 复制代码
在 release
方法中首先会调用 tryRelease
方法,这里 tryRelease
方法将会有子类实现,先以 RenntrantLook
为例,这里就不展示代码了,就简单描述一下逻辑
state
减去 arg
, state
代表重入次数。 步骤1
结果是0,则将 独占模式同步器的拥有者 改为null并返回true。 步骤1
结果不是0, 则重新设置 state,返回false,表示还不可以释放。 接下来判断 AQS
的 head
节点不是null并且 waitStatus
状态不等于0,代表 释放成功 ,然后进入 unparkSuccessor 方法,进行对下一个Node节点所对应的线程进行唤醒操作。
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ // 如果head的waitStatus<0 则将head的waitStatus改为0 int ws = node.waitStatus; if (ws < 0) node.compareAndSetWaitStatus(ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 如果 head 节点的 next 节点 == null 或者 节点 的状态 大于0 也就是1 也就是 下一个节点所对应的线程被中断了 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 将会循环整个同步队列,从tail节点开始 往前循环,直到只找到 waitStatus <= 0 的Node节点 for (Node p = tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) s = p; } // 如果节点不是 null 则唤醒该节点的线程 if (s != null) LockSupport.unpark(s.thread); } 复制代码
接下来分析一下在 unparkSuccessor 方法中都做了什么
waitStatus
<0, 则将 head
的 waitStatus
改为0。 head
节点的 next
节点等于null 或者 waitStatus
状态 大于0也就是1, 表示 head
节点所对应的 next
节点所对应的线程已经被中断了,将会循环整个同步队列,从 tail
节点开始往前循环,直到找到 最前面的一个 waitStatus <= 0 的Node节点 。 步骤2
条件不满足 则代表 head 的 next 节点不是null 或 waitStatus状态不等于1,调用 unpark
方法唤醒线程。 至此 release
方法就解析完成了,很简单,核心功能仅仅是如果符合规则,则调用 unpark
方法唤醒 AQS
队列中下一个节点所对应的线程。下面就分析一下 acquire 和 releae 整个流程。
分析一次 acquire
和 release
的整体流程
接下来分析 acquireInterruptibly
方法, acquireInterruptibly
方法和 acquire
其实是一样的,只不过多判断了一下是否被中断。
acquireInterruptibly
方法就是 可中断的获取可执行权 ,具体流程和 acquire
相似。
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } 复制代码
在 acquireInterruptibly
方法中,首先会通过 Thread.interrupted()
方法判断线程是否被中断,如已经被中断,则抛出 InterruptedException
, 反之则调用 tryAcquire
方法,判断是否 获取到执行权 ,如果未获取到则调用 doAcquireInterruptibly
方法进行创建 AQS
和 新的Node节点
,并将 新建的Node节点
和 AQS
的 head
节点进行关联。 到这里可能就会想到,这不是和 acquire
方法是一样的嘛,没错,就是一样。看一下源码
private void doAcquireInterruptibly(int arg) throws InterruptedException { // 新建 Node 节点 并将节点 和 AQS 队列简历关联 final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } } 复制代码
看到源码是不是很熟悉,这不就是上边我们分析过的 acquire
方法嘛,唯一和 acquire
方法不同的就是,如果线程在被唤醒以后,也就是 head
节点的线程调用了 release
释放了可执行权,并且通过 LockSupport.park
方法唤醒了 head 的 next节点所属的线程时, head
的 next
节点所属的线程已经被中断了就会抛出 InterruptedException
异常。
这里就不进行 addWaiter 方法 和 parkAndCheckInterrupt 方法的源码展示了,如果还不明白就看一下上边 acquire
方法的源码分析。
tryAcquireNanos
方法的含义就是 可超时的获取执行权 ,如果设置的 超时时间 到了,还未获取到可执行权,则直接返回 false 。这里的超时时间单位是 纳秒 ns
, 1秒(s)=1000000000纳秒(ns)
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } 复制代码
看到 tryAcquireNanos
方法会想到什么? 看到方法上的 throws InterruptedException
就一下想到了上面刚刚刚说的 acquireInterruptibly
方法, 支持可中断的获取执行权 。首先这里会先调用 tryAcquire
方法获取执行权,如果可以获取到执行权则直接返回,反之则调用 doAcquireNanos(arg, nanosTimeout)
方法进行 新建 Node 节点
并和 AQS
的 head
节点进行关联, 并且会将节点加入到 AQS的队列中 , 然后将节点所属的线程放入等待队列中 。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 判断超时时间 是否小于等于 0 如果这 则直接返回 false if (nanosTimeout <= 0L) return false; // 使用当前时间的 纳秒 + 超时时间的纳秒 = 未来超时的超时时间,用来做parkNanos, // 就相当于 Object.wait(long timeoutMillis) 方法 final long deadline = System.nanoTime() + nanosTimeout; //通过构造方法 新建 Node节点, 根据入参mode指定了 Node的模式,共享或独占 final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { // 判断 新建的Node节点是否等于head节点 && 可以获取到'可执行权' final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 设置 head 节点为 当前线程的新建的Node节点,也就是线程被唤醒后并获取到了可执行权,则将head // 节点设置为当前线程创建的Node节点,可以保证head节点永远都可以和后续节点有关联关系 setHead(node); p.next = null; // help GC return true; } // 判断计算过的 deadline 时间 - 当前时间 是否小于0 是则 超时时间已过,返回false nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) { cancelAcquire(node); return false; } // 判断Node节点的线程是否符合被 wait ,在这里用的是 park 并且 纳秒必须大于 1000 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) LockSupport.parkNanos(this, nanosTimeout); // 判断线程是否被中断 如果中断则抛出 InterruptedException 异常 if (Thread.interrupted()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } } 复制代码
如果在 acquire
和 release
分析中理解其中原理是不是觉得这里很简单,这里也不列举已经分析过的方法了,直接说出不同点
LockSupport.parkNanos(this, nanosTimeout)
方法,也就相当于 Object.wait(long timeoutMillis)
方法,等待的线程的状态会在超时时间失效从 wait
变为 run
deadline时间
- 当前时间
是否小于0, 若果是则代表超时时间已过,直接返回false,反之则继续执行。 InterruptedException
异常。 是不是很简单,读者要把重点放到 acquire
和 release
上,其他的就很容易了。上面的内容均是获取的独占模式,下面来讲解一下 共享模式。
public final void acquireShared(int arg) { // 通过调用 tryAcquireShared 方法获取可执行权,如果未获取到则调用 doAcquireShared // 方法进行 新建 Node节点 并和 AQS的 head 节点建立关系, if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } 复制代码
通过 acquireShared
方法可以看到和 acquire
并没有什么区别,获取可执行权的代码需要 自定义同步器
实现,在共享模式分析中就不对 ReentrantReadWriteLock
源码进行分析了 ,会在后面对 ReentrantLock
和 ReentrantReadWriteLock
进行源码分析,接下来看一下 doAcquireShared
,看它是不是和 acquireQueued
方法也是一样的逻辑呢?
private void doAcquireShared(int arg) { // 获取通过 addWaiter 创建的Node方法 final Node node = addWaiter(Node.SHARED); boolean interrupted = false; try { for (;;) { // 获取新建节点的前置节点 final Node p = node.predecessor(); // 判断 新建的Node节点是否等于head节点 if (p == head) { // 如果上边的 p==head 需要在此判断是否可以获取到'可执行权' int r = tryAcquireShared(arg); if (r >= 0) { // 如果获取到了可执行权 setHeadAndPropagate(node, r); p.next = null; // help GC return; } } // 判断Node节点的线程是否符合被 wait ,在这里用的是 park if (shouldParkAfterFailedAcquire(p, node)) // 将线程 wait 并且线程被唤醒后 判断线程是否被中断 interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); throw t; } finally { if (interrupted) selfInterrupt(); } } 复制代码
在 doAcquireShared
方法中,我们看到,首先依然是调用 addWaiter
方法进行新建Node,这里就不多说,可以看一下上边的方法详解, doAcquireShared
也是核心方法,在其中会对线程进行 LockSupport.park
进行控制,其实现方式是 循环 ,下面就具体分析一下
当前线程
所创建的 Node
节点中的 前置节点(prev)
。 前置节点(prev)
是否 等于 AQS
的 head
节点,如果条件成立则看 步骤3
,反之看 步骤4
, 如果满足条件,也就代表着 head
节点 所对应的线程
已经执行完成并且做了释放**(release方法)**操作。 步骤2
条件成立,则再次判断 当前线程是否可以获取到可执行权 ,如果可以则设置 AQS
的 head
节点为当前线程的 新建的Node节点
, 反之则看 步骤3
。 步骤2
或 步骤3
条件不成立,则判断 Node
节点所对应的线程的状态是否符合改为 wait
状态,也就是是否可以加入到等待队列中。这个逻辑在 shouldParkAfterFailedAcquire
方法中,可以看一下上边的方法详解。 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } 复制代码
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 获取通过 addWaiter 创建的Node方法 final Node node = addWaiter(Node.SHARED); try { for (;;) { // 获取新建节点的前置节点 final Node p = node.predecessor(); // 判断 新建的Node节点是否等于head节点 if (p == head) { // 如果获取到了可执行权 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC return; } } // 判断Node节点的线程是否符合被 wait && 将线程 wait 并且线程被唤醒后判断线程是否被中断 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } } 复制代码
可以看到 acquireSharedInterruptibly
和 acquireShared
方法并没有什么太大区别,唯一的区别就是在调用 parkAndCheckInterrupt
线程状态被 wait ,等到当前节点 prev
节点的所属线程调用了 release
方法后,唤醒当前节点所属线程时,如果当前线程被中断了会抛出 InterruptedException
异常。
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } 复制代码
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { // 如果超时时间 小于等于0 则直接 返回失败 if (nanosTimeout <= 0L) return false; // 使用当前时间的 纳秒 + 超时时间的纳秒 = 未来超时的超时时间,用来做parkNanos, // 就相当于 Object.wait(long timeoutMillis) 方法 final long deadline = System.nanoTime() + nanosTimeout; //通过构造方法 新建 Node节点, 根据入参mode指定了 Node的模式,共享或独占。这里是共享 final Node node = addWaiter(Node.SHARED); try { for (;;) { // 判断 新建的Node节点是否等于head节点 final Node p = node.predecessor(); if (p == head) { // 是否可以获得可执行权 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC return true; } } // 判断计算过的 deadline 时间 - 当前时间 是否小于或等于0 是则超时时间已过,返回false nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) { cancelAcquire(node); return false; } // 判断Node节点的线程是否符合被 wait ,在这里用的是 park 并且 纳秒必须大于 1000 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) LockSupport.parkNanos(this, nanosTimeout); // // 判断线程是否被中断 如果中断则抛出 InterruptedException 异常 if (Thread.interrupted()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } } 复制代码
有没有发现, doAcquireSharedNanos
方法和 doAcquireNanos
方法很相似呢,如果在 acquireShared
分析中理解其原理是不是觉得这里很简单,这里也不列举已经分析过的方法了,直接说出不同点
LockSupport.parkNanos(this, nanosTimeout)
方法,也就相当于 Object.wait(long timeoutMillis)
方法,等待的线程的状态会在超时时间失效从 wait
变为 run
deadline时间
- 当前时间
是否小于0, 若果是则代表超时时间已过,直接返回false,反之则继续执行。 InterruptedException
异常。 是不是很简单,读者要把重点放到 acquire
和 acquireShared
上,其他的就很容易了。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } 复制代码
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { // 如果更新失败则循环 if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒 head 节点的 next 节点所属的线程 unparkSuccessor(h); } // 如果更新失败则循环 else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; // loop on failed CAS } // 如果 head 改变了则再次循环 if (h == head) // loop if head changed break; } } 复制代码
tryReleaseShared 方法和 release 方法稍微有一点区别,下面我们就具体分析一下
tryReleaseShared
方法,如果释放成功了,就代表有资源空闲出来,那么就看 步骤2 doReleaseShared
去唤醒后续结点, 在 doReleaseShared
方法中采用了 loop
,每一次循环的过程都是首先获得 head
节点,如果 head
结点不为空且不等于 tail
结点,那么先获得该节点的状态,如果是SIGNAL的状态,则代表它需要有后继结点去唤醒,首先将其的状态变为0(初始状态),然后通过 unparkSuccessor
方法唤醒后续节点所属的线程,如果结点状态一开始就是0,那么就给他转换成 PROPAGATE 状态,保证在后续获取资源的时候,还能够向后面传播。 至此我们已经分析完了 AbstractQueuedSynchronizer 的源码,是不是很简单呢?最主要的还是要理解AQS的整体流程,说白了AQS是依赖两大利器,也就是 VarHandle 和 LockSupport。
博客地址:lantaoblog.site