转载

JDK源码分析-AbstractQueuedSynchronizer(3)

概述

前文「 JDK源码分析-AbstractQueuedSynchronizer(2) 」分析了 AQS 在独占模式下获取资源的流程,本文分析共享模式下的相关操作。

其实二者的操作大部分是类似的,理解了前面对独占模式的分析,再分析共享模式就相对容易了。

共享模式

方法概述

与独占模式类似,共享模式下也有与之类似的相应操作,分别如下:

1. acquireShared(int arg):  以共享模式获取资源,忽略中断;

2. acquire Shared Interruptibly(int arg):  以共享模式获取资源,响应中断;

3. tryAcquire Shared Nanos(int arg, long nanosTimeout):  以共享模式获取资源, 响应中断,且有超时等待;

4. release Shared (int arg):  释放资源,唤醒后继节点,并确保传播

它们的操作与独占模式也比较类似,下面具体分析。

方法分析

1. 共享模式获取资源(忽略中断)

acquireShared:


 

public final void acquireShared(int arg) {

// 返回值小于 0,表示获取失败

if (tryAcquireShared(arg) < 0)

doAcquireShared(arg);

}


// 尝试以共享模式获取资源(返回值为 int 类型)

protected int tryAcquireShared(int arg) {

throw new UnsupportedOperationException();

}

与独占模式的 tryAcquire 方法 类似,tryAcquireShared 方法在 AQS 中也抛出异常,由子类实现其逻辑。

不同的地方在于, tryAcquire 方法的返回结果是 boolean 类型,表示获取成功与否;而  tryAcquireShared 的返回结果是 int 类型,分别为:

1) 负数:表示获取失败;

2) 0:表示获取成功,但后续共享模式的获取会失败;

3) 正数:表示获取成功,后续共享模式的获取可能会成功(需要进行检测)。

tryAcquireShared 获取成功,则直接返回;否则执行  doAcquireShared 方法:


 

private void doAcquireShared(int arg) {

// 把当前线程封装成共享模式的 Node 节点,插入主队列末尾

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {

// 中断标志位

boolean interrupted = false;

for (;;) {

final Node p = node.predecessor();

// 若前驱节点为头节点,则尝试获取资源

if (p == head) {

int r = tryAcquireShared(arg);

// 这里表示当前线程成功获取到了资源

if (r >= 0) {

// 设置头节点,并传播状态(注意这里与独占模式不同)

setHeadAndPropagate(node, r);

p.next = null; // help GC

if (interrupted)

selfInterrupt();

failed = false;

return;

}

}

// 是否应该休眠(与独占模式相同,不再赘述)

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

// 取消操作(与独占模式相同)

cancelAcquire(node);

}

}

doAcquireShared  方法会把当前线程封装成一个共享模式(SHARED)的节点,并插入主队列末尾。addWaiter(Node mode) 方法前文已经分析过,不再赘述。

该方法与  acquireQueued 方法的区别在于 setHeadAndPropagate 方法,把当前节点设置为头节点之后,还会有传播(propagate)行为:


 

private void setHeadAndPropagate(Node node, int propagate) {

// 记录旧的头节点

Node h = head; // Record old head for check below

// 将 node 设置为头节点

setHead(node);

/*

* Try to signal next queued node if:

* Propagation was indicated by caller,

* or was recorded (as h.waitStatus either before

* or after setHead) by a previous operation

* (note: this uses sign-check of waitStatus because

* PROPAGATE status may transition to SIGNAL.)

* and

* The next node is waiting in shared mode,

* or we don't know, because it appears null

*

* The conservatism in both of these checks may cause

* unnecessary wake-ups, but only when there are multiple

* racing acquires/releases, so most need signals now or soon

* anyway.

*/

if (propagate > 0 || h == null || h.waitStatus < 0 ||

(h = head) == null || h.waitStatus < 0) {

Node s = node.next;

// 后继节点为空或共享模式唤醒

if (s == null || s.isShared())

doReleaseShared();

}

}

doReleaseShared:


 

private void doReleaseShared() {

/*

* Ensure that a release propagates, even if there are other

* in-progress acquires/releases. This proceeds in the usual

* way of trying to unparkSuccessor of head if it needs

* signal. But if it does not, status is set to PROPAGATE to

* ensure that upon release, propagation continues.

* Additionally, we must loop in case a new node is added

* while we are doing this. Also, unlike other uses of

* unparkSuccessor, we need to know if CAS to reset status

* fails, if so rechecking.

*/

for (;;) {

// 这里的头节点已经是上面设置后的头节点了

Node h = head;

// 由于该方法有两个入口(setHeadAndPropagate 和 releaseShared),需考虑并发控制

if (h != null && h != tail) {

int ws = h.waitStatus;

if (ws == Node.SIGNAL) {

if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

continue; // loop to recheck cases

// 唤醒后继节点

unparkSuccessor(h);

}

else if (ws == 0 &&

!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

continue; // loop on failed CAS

}

// 若头节点不变,则跳出循环;否则继续循环

if (h == head) // loop if head changed

break;

}

}

该方法与独占模式下的获取方法 acquire 大体相似,不同在于该方法中,节点获取资源后会传播状态,即,有可能会继续唤醒后继节点。 值得注意的是:该方法有两个入口 setHeadAndPropagate 和 releaseShared, 可能有多个线程操作, 需考虑并发控制。

此外,本人对于将节点设置为 PROPAGATE 状态的理解还不是很清晰,网上说法也不止一种,待后续研究明白再补充。

2. 以共享模式获取资源(响应中断)

该方法与  acquireShared 类似:


 

public final void acquireSharedInterruptibly(int arg)

throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

if (tryAcquireShared(arg) < 0)

doAcquireSharedInterruptibly(arg);

}

tryAcquireShared 方法前面已分析,若获取资源失败,则会执行 doAcquireSharedInterruptly 方法:


 

private void doAcquireSharedInterruptibly(int arg)

throws InterruptedException {

// 把当前线程封装成共享模式节点,并插入主队列

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {

for (;;) {

final Node p = node.predecessor();

if (p == head) {

int r = tryAcquireShared(arg);

if (r >= 0) {

setHeadAndPropagate(node, r);

p.next = null; // help GC

failed = false;

return;

}

}

// 与 doAcquireShared 相比,区别在于这里抛出了异常

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

throw new InterruptedException();

}

} finally {

if (failed)

cancelAcquire(node);

}

}

从代码可以看到,acquireSharedInterruptibly 方法与 acquireShared 方法几乎完全一样,不同之处仅在于前者会抛出 InterruptedException 异常响应中断;而后者仅记录标志位,获取结束后才响应。

3. 以共享模式获取资源(响应中断,且有超时)

代码如下( 该方法可与前文独占模式下的超时获取方法比较分析)


 

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)

throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

return tryAcquireShared(arg) >= 0 ||

doAcquireSharedNanos(arg, nanosTimeout);

}

doAcquireSharedNanos: 


 

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)

throws InterruptedException {

if (nanosTimeout <= 0L)

return false;

final long deadline = System.nanoTime() + nanosTimeout;

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {

for (;;) {

final Node p = node.predecessor();

if (p == head) {

int r = tryAcquireShared(arg);

if (r >= 0) {

setHeadAndPropagate(node, r);

p.next = null; // help GC

failed = false;

return true;

}

}

nanosTimeout = deadline - System.nanoTime();

if (nanosTimeout <= 0L)

return false;

if (shouldParkAfterFailedAcquire(p, node) &&

nanosTimeout > spinForTimeoutThreshold)

LockSupport.parkNanos(this, nanosTimeout);

if (Thread.interrupted())

throw new InterruptedException();

}

} finally {

if (failed)

cancelAcquire(node);

}

}

该方法可与独占模式下的超时等待方法 tryAcquireNanos(int arg, long nanosTimeout) 进行对比,二者操作基本一致,不再详细分析。

4. 释放资源,唤醒节点,传播状态

如下:


 

public final boolean releaseShared(int arg) {

if (tryReleaseShared(arg)) {

doReleaseShared();

return true;

}

return false;

}

tryReleaseShared:


 

protected boolean tryReleaseShared(int arg) {

throw new UnsupportedOperationException();

}

doReleaseShared() 方法前面已经分析过了。本方法与独占模式的 release 方法类似,不同的地方在于“传播”二字。

场景分析

为了便于理解独占模式和共享模式下队列和节点的状态,下面简要举例分析。

场景如下:有 T0~T4 共 5 个线程按先后顺序获取资源,其中 T2 和 T3 为共享模式,其他均为独占模式。

就此场景分析:T0 先获取到资源(假设占用时间较长),而后 T1~T4 再获取则失败,会依次进入主队列。 此时主队列中各个节点的状态示意图如下:

JDK源码分析-AbstractQueuedSynchronizer(3)

之后,T0 操作完毕并释放资源,会将 T1 唤醒。T1(独占模式) 会从 acquireQueued(final Node node, int arg) 方法的循环中继续获取资源,这时会获取成功,并将 T1 设置为头节点 (T 被移除) 。此时主队列节点示意图如下:

JDK源码分析-AbstractQueuedSynchronizer(3)

此时,T1 获取到资源并进行相关操作。

而后,T1 操作完释放资源,并唤醒下一个节点 T2,T2(共享模式) 继续从 doAcquireShared(int) 方法的循环中执行。此时 T2 获取资源成功,将自身设为头节点(T1 被移除),由于后继节点 T3 也是共享模式,因此 T1 会继续唤醒 T3;T3 唤醒后的操作与 T2 相同,但后继节点 T4 不是共享模式,因此不再继续唤醒。此时队列节点状态示意图如下:

JDK源码分析-AbstractQueuedSynchronizer(3)

此时,T2 和 T3 同时获取到资源。

之后,当二者都释放资源后会唤醒 T4:

JDK源码分析-AbstractQueuedSynchronizer(3)

T4 获取资源的与 T1 类似。

PS: 该场景仅供参考,只为便于理解,若有不当之处敬请指正。

小结

本文分析了以共享模式获取资源的三种方式,以及释放资源的操作。 分别为:

1. acquireShared: 共享模式获取资源,忽略中断;

2.  acquire Shared Interruptibly: 共享模式获取资源,响应中断;

3.  tryAcquire Shared Nanos: 共享模式获取资源,响应中断,有超时;

4. release Shared : 释放资源,唤醒后继节点,并确保传播。

并简要分析一个场景下主队列中各个节点的状态。此外,AQS 中还有嵌套类 ConditionObject 及条件队列的相关操作,后面涉及到的时候再进行分析。

单独去分析 AQS 的源码比较枯燥,后文会结合 ReentrantLock、CountdownLatch 等常用并发工具类的源码进行分析。

上述解析是参考其他资料及个人理解,若有不当之处欢迎指正。

相关阅读:

JDK源码分析-AbstractQueuedSynchronizer(2)

JDK源码分析-AbstractQueuedSynchronizer(1)

Stay hungry, stay foolish.

JDK源码分析-AbstractQueuedSynchronizer(3)

原文  http://mp.weixin.qq.com/s?__biz=MzU4NzYyMDE4MQ==&mid=2247483913&idx=1&sn=83937be40624447885473a31f0f7b072
正文到此结束
Loading...