转载

AbstractQueuedSynchronizer源码解析

AQS 是什么,相信大家都不陌生这个题目,那么 AQS 到底是什么呢? AQS的全称是 Abstract Queued Synchronizer , 从字面意思理解也就是 抽象队列同步器 ,实际上 AQS 确实就是 排队同步队列 , 也是一个 抽象 类,需要 自定义 同步队列中 可执行权获取和释放中的逻辑(重新定义获取和释放语义) ,也就是重写 tryAcquire tryRelease tryAcquireShared tryReleaseShared 等方法,当然也可以 自定义方法 来通过调用 AQS 提供的 判断方法进行逻辑判断 。在 JDK9 之前 AQS 是依赖于 CAS 的,其底层是通过 UnsafecompareAndSwap* 方法实现同步更改,在之后则是使用 VarHandle , 也替代了 Unsafe说白了 AQS 利用 VarHandle 保证操作的原子性

大白话就可以理解为: 表示某件事情同一时间点仅有一人可以进行操作,如有多人则需要排队等待, 等到当前操作人完成后通知下一个人。

AQS源码解析

前言

在源码中 AbstractQueuedSynchronizer 继承了 AbstractOwnableSynchronizer , 同时也就继承了 exclusiveOwnerThread 属性,也就是 独占模式同步器的拥有者 ** , 也就意味着 该线程是当前正在执行的线程**。

AQS 中有几个重点方法,分别是: acquire acquireInterruptibly tryAcquireNanos release acquireShared acquireSharedInterruptibly tryAcquireSharedNanos releaseShared 下面逐一分析。

在分析源代码之前,先来看一张图来了解一下 AQS排队同步队列Node 节点中的 waitStatus 状态 。

AbstractQueuedSynchronizer源码解析

waitStatus 状态都分为是什么

  • CANCELLED ,值为1,代表同步队列中等待的线程 等待超时 或者 被中断 ,需要从同步队列中剔除,节点进入该状态以后不会再发生变化了。
  • SIGNAL ,值为-1,代表后继节点的线程处于等待状态,而如果当前节点的线程如果释放了同步状态或被取消,将会通知后继结点,使后继节点的线程得以运行。
  • CONDITION , 值为-2,节点在等待队列,节点线程等待在Condition上,当其他线程调用Condition的signal方法后,该节点将会从等待队列中转移到同步队列中。
  • PROPAGATE , 值为-3,表示共享式同步状态回去将会无条件的被传播下去,
  • INITAL , 值为0,初始状态。

acquire(获取)

官方解释就是 Acquires in exclusive mode, ignoring interrupts 获取独占模式并忽略interrupt(中断) , 翻译成大白话就是就可以理解为获取 独占模式 , 看一下源码

public final void acquire(int arg) {
    // 判断线程是否有可继续执行的权限, 如果没有则创建node 加入到队列中
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
复制代码

acquire 方法中分别调用了 tryAcquire acquireQueuedaddWaiter 方法,其中 tryAcquire 方法是需要自定义(重写) 获取、 执行权限 的逻辑,这里我们以 AbstractQueuedSynchronizer 的实现 ReentrantLock 为例,简单分析一下,先看 tryAcquire 方法

protected final boolean tryAcquire(int acquires) {
    // 获取当前线程
    final Thread current = Thread.currentThread();
    // 获取当前线程的重入次数 如果是 0 则代表第一次
    int c = getState();
    if (c == 0) {
        // 判断是否存在队列 && 可以获取到可执行权
        if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
            // 设置独占模式同步器的拥有者 也就是是哪个线程持有
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果进入线程是 持有可执行权的线程 则做重入 + 1 操作
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
复制代码

tryAcquire 方法核心代码就是 判断执行权限 ,这里就不具体分析了,会在下一篇文章中进行ReentrantLock的源码分析,接下来重点看 acquireQueuedaddWaiter 方法。

private Node addWaiter(Node mode) {
    // 通过构造方法 新建 Node节点, 根据入参mode指定了 Node的模式,共享或独占
  	Node node = new Node(mode);
    for (;;) {
        Node oldTail = tail;
      	// 如果 tail 不是 null 则 设置 新建Node的前驱节点(prev) 指向 tail节点 反之 初始化同步队列
        if (oldTail != null) {
            // 设置 新建Node的前驱节点(prev) 指向 tail节点
            node.setPrevRelaxed(oldTail);
          	// 重新设置 tail 节点 指向 新建的Node节点
            // 白话就是 队列中的最后一个节点 == tail节点
            if (compareAndSetTail(oldTail, node)) {
                // 设置 未更改时的 tail节点 中 next 节点, 指向 新建Node节点
                oldTail.next = node;
                return node;
            }
        } else {
            // 初始化同步队列器
            initializeSyncQueue();
        }
    }
}
复制代码
// 如果未存在同步队列 则初始化同步队列
private final void initializeSyncQueue() {
    Node h;
  	// 设置 AQS head节点 为一个新建节点
    if (HEAD.compareAndSet(this, null, (h = new Node())))
      	// 赋值操作
        tail = h;
}
复制代码

addWaiterinitializeSyncQueue 方法中,核心就是新建 Node 节点并通过 acquireQueued 方法将节点加入到 AQS 中,接下来分析一下 addWaiter 具体做了什么

  1. 通过构造方法创建新的Node节点,并通过入参 mode 指定Node节点的模式,共享或独占。当然这里是设置的独占模式。

  2. 循环操作新建Node节点并将 新建节点tail 节点建立关系。首先判断 tail 是否是null,如果是则 步骤3 ,反之 步骤4

  3. 如果 tail 节点不为null, 首先将新建的 Node节点 中的 前驱节(prev) 点设置为当前的 tail 节点,然后通过 VarHandleAQStail 节点改为 新建的Node 节点,如果修改成功则将上一步 未更改时的 tail 节点 (也就是代码中的oldTail) 中的 next 指向 新建的Node节点 ,反之则可能因为并发操作导致 tial 节点已经被其他线程变更,需要再次循环操作直至成功。

  4. 如果 tial 节点是null, 则需要实例化同步对列,也就是 AQS , 通过调用 initializeSyncQueue 进行初始化操作,通过 VarHandle 设置 AQShead 指向一个新建节点 (new Node) , 然后将 head 节点的 引用 赋值给 tail 节点。 这里注意一下,是将 head 节点的 引用 赋值给 tail 节点, 也就是这时候 head 节点 和 tail 节点是同时指向一块内存地址 , 这里的用意就是在新建队列的时候, head 节点和新建节点的 prev 节点要保持是同一个引用 ,因为在后续的判断中, 获取可执行权的条件就是 AQShead 节点是否等于当前节点的 prev 节点。

    因为 addWaiter 方法中是一个循环,在 创建队列后 需要将队列新建的Node节点做关联,所以还需要在执行一次 步骤3

addWaiter 方法分析完后,再来看一下 acquireQueued 方法

final boolean acquireQueued(final Node node, int arg) {
    // 线程中断状态
    boolean interrupted = false;
    try {
        for (;;) {
          	// 获取通过 addWaiter 创建的Node方法
            final Node p = node.predecessor();
          	// 判断 新建的Node节点是否等于head节点 && 可以获取到 可执行权
            if (p == head && tryAcquire(arg)) {
              	// 设置 head 节点为 当前线程的新建的Node节点,也就是线程被唤醒后并获取到了可执行权,则将head
                // 节点设置为当前线程创建的Node节点,可以保证head节点永远都可以和后续节点有关联关系
                setHead(node);
                // 设置 next 
                p.next = null; // help GC
                // 返回
                return interrupted;
            }
            // 判断Node节点的线程是否符合被 wait ,在这里用的是 park  
            if (shouldParkAfterFailedAcquire(p, node))
                // 将线程 wait 并且线程被唤醒后 判断线程是否被中断
                // |= 操作等于 interrupted = interrupted | parkAndCheckInterrupt()
              	// |(按位或) 会将 | 两边的值进行二进制计算,运算规则一个为真即为真, 例如 1|1 = 1 或 1|0 = 1,
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}
复制代码

acquireQueued 也是核心方法,在其中会对线程进行 LockSupport.park 进行控制,其实现方式是 循环 ,下面就具体分析一下

  1. 首先会获取 当前线程 所创建的 Node 节点中的 前置节点(prev)
  2. 判断 前置节点(prev) 是否 等于 AQShead 节点 && 可以获取到 可执行权 ,如果这两个条件成立则看 步骤3 ,反之看 步骤4 , 如果满足这两个条件,也就代表着 head 节点 所对应的线程 已经执行完成并且做了释放**(release方法)**操作。
  3. 如果 步骤2 条件成立,也就是 线程被唤醒后并获取到了可执行权 ,则将 head 节点设置为 当前线程创建的Node节点
  4. 如果 步骤2 条件不成立,则判断 Node 节点所对应的线程的状态是否符合改为 wait 状态。这个逻辑在 shouldParkAfterFailedAcquire 方法中, 接下来看一下。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 如果前置节点的 waitStatus == Signal 也就是 == -1 
    // 如果是 满足 线程 wait 条件
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    // 如果状态 > 0  也就是1  也就是线程已经被中断了
    // 在这里就会判断 前置节点的前置节点 是否还是被中断,如果是 循环继续判断前置节点, 
    // 如果不是 则将前置节点的next节点改为 入参的 node 节点 然后 返回false 继续循环判断 
    // 这里的作用就是 排除掉已经被中断的线程
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // 否则设置 状态为 -1 等待唤醒状态 再次进来以后就会被 wait 
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}
复制代码

shouldParkAfterFailedAcquire 方法中主要就是判断节点所属的线程是否符规则,也就是更改为 wait 状态

  1. 判断 当前线程节点前置节点waitStatus 是否是 SIGNAL,如果是满足条件,返回 true ,线程将会 wait
  2. 判断 当前线程节点前置节点waitStatus 是否大于 0, 也就是1 ,如果条件成立,则代表 当前线程节点前置节点 所对应的线程已经被中断了,需要重新指定当前线程节点的前置节点(prev),通过循环的方式找到前置节点的节点,如果依然被中断,则继续循环,直到找到未中断线程所对应的Node节点为止。如果条件不成立则将 waitStatus 状态改为 SIGNAL 返回false, 再通过 acquireQueued 方法中的循环在执行一次 。
  3. 这里要说一下为什么要更改当前节点的 prev 节点中的 waitStatus 状态,是因为只有 前置节点(prev)waitStatus 等于 SIGNAL 也就是 -1 时, 就代表当前线程新建的Node节点的线程处于等待状态在当前节点的前置节点(prev) 的线程释放了同步状态或被取消,将会通知当前节点,使当前节点的线程得以运行

到这里我们整个的 acquire 方法就解析完了,接下来分享 release,有获取才有释放,会在release讲完后为大家分享一下 acquire 到 release的整个流程。

release(释放)

release 字面一次就是释放,释放通过 acquire 获取的独占模式,可以让 AQS 后续节点所对应的线程可以得到执行权,下面就看一下 release 方法

public final boolean release(int arg) {
    // 步骤 1 2 3 
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
复制代码

release 方法中首先会调用 tryRelease 方法,这里 tryRelease 方法将会有子类实现,先以 RenntrantLook 为例,这里就不展示代码了,就简单描述一下逻辑

  1. 首先会先用 state 减去 argstate 代表重入次数。
  2. 如果 步骤1 结果是0,则将 独占模式同步器的拥有者 改为null并返回true。
  3. 如果 步骤1 结果不是0, 则重新设置 state,返回false,表示还不可以释放。

接下来判断 AQShead 节点不是null并且 waitStatus 状态不等于0,代表 释放成功 ,然后进入 unparkSuccessor 方法,进行对下一个Node节点所对应的线程进行唤醒操作。

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    // 如果head的waitStatus<0 则将head的waitStatus改为0
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // 如果 head 节点的 next 节点 == null 或者 节点 的状态 大于0 也就是1 也就是 下一个节点所对应的线程被中断了
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 将会循环整个同步队列,从tail节点开始 往前循环,直到只找到 waitStatus <= 0 的Node节点
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    // 如果节点不是 null 则唤醒该节点的线程
    if (s != null)
        LockSupport.unpark(s.thread);
}
复制代码

接下来分析一下在 unparkSuccessor 方法中都做了什么

  1. 首先如果 head 的 waitStatus <0, 则将 headwaitStatus 改为0。
  2. 如果 head 节点的 next 节点等于null 或者 waitStatus 状态 大于0也就是1, 表示 head 节点所对应的 next 节点所对应的线程已经被中断了,将会循环整个同步队列,从 tail 节点开始往前循环,直到找到 最前面的一个 waitStatus <= 0 的Node节点
  3. 如果 步骤2 条件不满足 则代表 head 的 next 节点不是null 或 waitStatus状态不等于1,调用 unpark 方法唤醒线程。

至此 release 方法就解析完成了,很简单,核心功能仅仅是如果符合规则,则调用 unpark 方法唤醒 AQS 队列中下一个节点所对应的线程。下面就分析一下 acquire 和 releae 整个流程。

总结 acquire 和 release

分析一次 acquirerelease 的整体流程

AbstractQueuedSynchronizer源码解析

接下来分析 acquireInterruptibly 方法, acquireInterruptibly 方法和 acquire 其实是一样的,只不过多判断了一下是否被中断。

acquireInterruptibly

acquireInterruptibly 方法就是 可中断的获取可执行权 ,具体流程和 acquire 相似。

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
复制代码

acquireInterruptibly 方法中,首先会通过 Thread.interrupted() 方法判断线程是否被中断,如已经被中断,则抛出 InterruptedException , 反之则调用 tryAcquire 方法,判断是否 获取到执行权 ,如果未获取到则调用 doAcquireInterruptibly 方法进行创建 AQS新的Node节点 ,并将 新建的Node节点AQShead 节点进行关联。 到这里可能就会想到,这不是和 acquire 方法是一样的嘛,没错,就是一样。看一下源码

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    // 新建 Node 节点 并将节点 和 AQS 队列简历关联
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
复制代码

看到源码是不是很熟悉,这不就是上边我们分析过的 acquire 方法嘛,唯一和 acquire 方法不同的就是,如果线程在被唤醒以后,也就是 head 节点的线程调用了 release 释放了可执行权,并且通过 LockSupport.park 方法唤醒了 head 的 next节点所属的线程时, headnext 节点所属的线程已经被中断了就会抛出 InterruptedException 异常。

这里就不进行 addWaiter 方法 和 parkAndCheckInterrupt 方法的源码展示了,如果还不明白就看一下上边 acquire 方法的源码分析。

tryAcquireNanos

tryAcquireNanos 方法的含义就是 可超时的获取执行权 ,如果设置的 超时时间 到了,还未获取到可执行权,则直接返回 false 。这里的超时时间单位是 纳秒 ns1秒(s)=1000000000纳秒(ns)

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
复制代码

看到 tryAcquireNanos 方法会想到什么? 看到方法上的 throws InterruptedException 就一下想到了上面刚刚刚说的 acquireInterruptibly 方法, 支持可中断的获取执行权 。首先这里会先调用 tryAcquire 方法获取执行权,如果可以获取到执行权则直接返回,反之则调用 doAcquireNanos(arg, nanosTimeout) 方法进行 新建 Node 节点 并和 AQShead 节点进行关联, 并且会将节点加入到 AQS的队列中然后将节点所属的线程放入等待队列中

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
  // 判断超时时间 是否小于等于 0 如果这 则直接返回 false  
  if (nanosTimeout <= 0L)
        return false;
    // 使用当前时间的 纳秒 + 超时时间的纳秒 =  未来超时的超时时间,用来做parkNanos, 
    // 就相当于 Object.wait(long timeoutMillis) 方法
    final long deadline = System.nanoTime() + nanosTimeout;
  
    //通过构造方法 新建 Node节点, 根据入参mode指定了 Node的模式,共享或独占
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            // 判断 新建的Node节点是否等于head节点 && 可以获取到'可执行权'
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                // 设置 head 节点为 当前线程的新建的Node节点,也就是线程被唤醒后并获取到了可执行权,则将head
                // 节点设置为当前线程创建的Node节点,可以保证head节点永远都可以和后续节点有关联关系
                setHead(node);
                p.next = null; // help GC
                return true;
            }
            // 判断计算过的 deadline 时间 - 当前时间 是否小于0 是则 超时时间已过,返回false
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            // 判断Node节点的线程是否符合被 wait ,在这里用的是 park  并且 纳秒必须大于 1000 
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            // 判断线程是否被中断 如果中断则抛出 InterruptedException 异常
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
复制代码

如果在 acquirerelease 分析中理解其中原理是不是觉得这里很简单,这里也不列举已经分析过的方法了,直接说出不同点

  1. 增加超时时间,在这里使用了 LockSupport.parkNanos(this, nanosTimeout) 方法,也就相当于 Object.wait(long timeoutMillis) 方法,等待的线程的状态会在超时时间失效从 wait 变为 run
  2. 线程被唤醒以后,也就是过了超时时间则会判断计算过的 deadline时间 - 当前时间 是否小于0, 若果是则代表超时时间已过,直接返回false,反之则继续执行。
  3. 支持中断 ,如果已中断则会抛出 InterruptedException 异常。

是不是很简单,读者要把重点放到 acquirerelease 上,其他的就很容易了。上面的内容均是获取的独占模式,下面来讲解一下 共享模式。

acquireShared

public final void acquireShared(int arg) {
    // 通过调用 tryAcquireShared 方法获取可执行权,如果未获取到则调用 doAcquireShared
    // 方法进行 新建 Node节点 并和 AQS的 head 节点建立关系,
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
复制代码

通过 acquireShared 方法可以看到和 acquire 并没有什么区别,获取可执行权的代码需要 自定义同步器 实现,在共享模式分析中就不对 ReentrantReadWriteLock 源码进行分析了 ,会在后面对 ReentrantLockReentrantReadWriteLock 进行源码分析,接下来看一下 doAcquireShared ,看它是不是和 acquireQueued 方法也是一样的逻辑呢?

private void doAcquireShared(int arg) {
  	// 获取通过 addWaiter 创建的Node方法
    final Node node = addWaiter(Node.SHARED);
    boolean interrupted = false;
    try {
        for (;;) {
            // 获取新建节点的前置节点
            final Node p = node.predecessor();
         	  // 判断 新建的Node节点是否等于head节点
            if (p == head) {
                // 如果上边的 p==head 需要在此判断是否可以获取到'可执行权'
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                  	// 如果获取到了可执行权
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            // 判断Node节点的线程是否符合被 wait ,在这里用的是 park  
            if (shouldParkAfterFailedAcquire(p, node))
                // 将线程 wait 并且线程被唤醒后 判断线程是否被中断
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    } finally {
        if (interrupted)
            selfInterrupt();
    }
}
复制代码

doAcquireShared 方法中,我们看到,首先依然是调用 addWaiter 方法进行新建Node,这里就不多说,可以看一下上边的方法详解, doAcquireShared 也是核心方法,在其中会对线程进行 LockSupport.park 进行控制,其实现方式是 循环 ,下面就具体分析一下

  1. 首先会获取 当前线程 所创建的 Node 节点中的 前置节点(prev)
  2. 判断 前置节点(prev) 是否 等于 AQShead 节点,如果条件成立则看 步骤3 ,反之看 步骤4 , 如果满足条件,也就代表着 head 节点 所对应的线程 已经执行完成并且做了释放**(release方法)**操作。
  3. 如果 步骤2 条件成立,则再次判断 当前线程是否可以获取到可执行权 ,如果可以则设置 AQShead 节点为当前线程的 新建的Node节点 , 反之则看 步骤3
  4. 如果 步骤2步骤3 条件不成立,则判断 Node 节点所对应的线程的状态是否符合改为 wait 状态,也就是是否可以加入到等待队列中。这个逻辑在 shouldParkAfterFailedAcquire 方法中,可以看一下上边的方法详解。

acquireSharedInterruptibly

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
复制代码
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
  	// 获取通过 addWaiter 创建的Node方法
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            // 获取新建节点的前置节点
            final Node p = node.predecessor();
            // 判断 新建的Node节点是否等于head节点
            if (p == head) {
                // 如果获取到了可执行权
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            // 判断Node节点的线程是否符合被 wait && 将线程 wait 并且线程被唤醒后判断线程是否被中断
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
复制代码

可以看到 acquireSharedInterruptiblyacquireShared 方法并没有什么太大区别,唯一的区别就是在调用 parkAndCheckInterrupt 线程状态被 wait ,等到当前节点 prev 节点的所属线程调用了 release 方法后,唤醒当前节点所属线程时,如果当前线程被中断了会抛出 InterruptedException 异常。

tryAcquireSharedNanos

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}
复制代码
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 如果超时时间 小于等于0 则直接 返回失败
    if (nanosTimeout <= 0L)
        return false;
    // 使用当前时间的 纳秒 + 超时时间的纳秒 =  未来超时的超时时间,用来做parkNanos, 
    // 就相当于 Object.wait(long timeoutMillis) 方法
    final long deadline = System.nanoTime() + nanosTimeout;
  	
    //通过构造方法 新建 Node节点, 根据入参mode指定了 Node的模式,共享或独占。这里是共享
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            // 判断 新建的Node节点是否等于head节点
            final Node p = node.predecessor();
            if (p == head) {
                // 是否可以获得可执行权
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return true;
                }
            }
          
            // 判断计算过的 deadline 时间 - 当前时间 是否小于或等于0 是则超时时间已过,返回false
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            // 判断Node节点的线程是否符合被 wait ,在这里用的是 park  并且 纳秒必须大于 1000 
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
          	// // 判断线程是否被中断 如果中断则抛出 InterruptedException 异常
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
复制代码

有没有发现, doAcquireSharedNanos 方法和 doAcquireNanos 方法很相似呢,如果在 acquireShared 分析中理解其原理是不是觉得这里很简单,这里也不列举已经分析过的方法了,直接说出不同点

  1. 增加超时时间,在这里使用了 LockSupport.parkNanos(this, nanosTimeout) 方法,也就相当于 Object.wait(long timeoutMillis) 方法,等待的线程的状态会在超时时间失效从 wait 变为 run
  2. 线程被唤醒以后,也就是过了超时时间则会判断计算过的 deadline时间 - 当前时间 是否小于0, 若果是则代表超时时间已过,直接返回false,反之则继续执行。
  3. 支持中断 ,如果已中断则会抛出 InterruptedException 异常。

是不是很简单,读者要把重点放到 acquireacquireShared 上,其他的就很容易了。

releaseShared

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
复制代码
private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    // 如果更新失败则循环
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 唤醒 head 节点的 next 节点所属的线程
                    unparkSuccessor(h);
                }
                // 如果更新失败则循环
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 如果 head 改变了则再次循环
            if (h == head)                   // loop if head changed
                break;
        }
    }
复制代码

tryReleaseShared 方法和 release 方法稍微有一点区别,下面我们就具体分析一下

  1. 首先去尝试释放资源通过 tryReleaseShared 方法,如果释放成功了,就代表有资源空闲出来,那么就看 步骤2
  2. 调用 doReleaseShared 去唤醒后续结点, 在 doReleaseShared 方法中采用了 loop ,每一次循环的过程都是首先获得 head 节点,如果 head 结点不为空且不等于 tail 结点,那么先获得该节点的状态,如果是SIGNAL的状态,则代表它需要有后继结点去唤醒,首先将其的状态变为0(初始状态),然后通过 unparkSuccessor 方法唤醒后续节点所属的线程,如果结点状态一开始就是0,那么就给他转换成 PROPAGATE 状态,保证在后续获取资源的时候,还能够向后面传播。

至此我们已经分析完了 AbstractQueuedSynchronizer 的源码,是不是很简单呢?最主要的还是要理解AQS的整体流程,说白了AQS是依赖两大利器,也就是 VarHandle 和 LockSupport。

博客地址:lantaoblog.site

AbstractQueuedSynchronizer源码解析
原文  https://juejin.im/post/5d9ef782f265da5b9764c0f3
正文到此结束
Loading...