转载

AbstractQueuedSynchronizer 原理分析 - 独占/共享模式

1.简介

AbstractQueuedSynchronizer (抽象队列同步器,以下简称 AQS)出现在 JDK 1.5 中,由大师 Doug Lea 所创作。AQS 是很多同步器的基础框架,比如 ReentrantLock、CountDownLatch 和 Semaphore 等都是基于 AQS 实现的。除此之外,我们还可以基于 AQS,定制出我们所需要的同步器。

AQS 的使用方式通常都是通过内部类继承 AQS 实现同步功能,通过继承 AQS,可以简化同步器的实现。如前面所说,AQS 是很多同步器实现的基础框架。弄懂 AQS 对理解 Java 并发包里的组件大有裨益,这也是我学习 AQS 并写出这篇文章的缘由。另外,需要说明的是,AQS 本身并不是很好理解,细节很多。在看的过程中药有一定的耐心,做好看多遍的准备。好了,其他的就不多说了,开始进入正题吧。

2.原理概述

在 AQS 内部,通过维护一个 FIFO 队列 来管理多线程的排队工作。在公平竞争的情况下,无法获取同步状态的线程将会被封装成一个节点,置于队列尾部。入队的线程将会通过自旋的方式获取同步状态,若在有限次的尝试后,仍未获取成功,线程则会被阻塞住。大致示意图如下:

AbstractQueuedSynchronizer 原理分析 - 独占/共享模式

当头结点释放同步状态后,且后继节点对应的线程被阻塞,此时头结点

线程将会去唤醒后继节点线程。后继节点线程恢复运行并获取同步状态后,会将旧的头结点从队列中移除,并将自己设为头结点。大致示意图如下:

AbstractQueuedSynchronizer 原理分析 - 独占/共享模式

3.重要方法介绍

本节将介绍三组重要的方法,通过使用这三组方法即可实现一个同步组件。

第一组方法是用于访问/设置同步状态的,如下:

方法 说明
int getState() 获取同步状态
void setState() 设置同步状态
boolean compareAndSetState(int expect, int update) 通过 CAS 设置同步状态

第二组方需要由同步组件覆写。如下:

方法 说明
boolean tryAcquire(int arg) 独占式获取同步状态
boolean tryRelease(int arg) 独占式释放同步状态
int tryAcquireShared(int arg) 共享式获取同步状态
boolean tryReleaseShared(int arg) 共享式私房同步状态
boolean isHeldExclusively() 检测当前线程是否获取独占锁

第三组方法是一组模板方法,同步组件可直接调用。如下:

方法 说明
void acquire(int arg) 独占式获取同步状态,该方法将会调用 tryAcquire 尝试获取同步状态。获取成功则返回,获取失败,线程进入同步队列等待。
void acquireInterruptibly(int arg) 响应中断版的 acquire
boolean tryAcquireNanos(int arg,long nanos) 超时+响应中断版的 acquire
void acquireShared(int arg) 共享式获取同步状态,同一时刻可能会有多个线程获得同步状态。比如读写锁的读锁就是就是调用这个方法获取同步状态的。
void acquireSharedInterruptibly(int arg) 响应中断版的 acquireShared
boolean tryAcquireSharedNanos(int arg,long nanos) 超时+响应中断版的 acquireShared
boolean release(int arg) 独占式释放同步状态
boolean releaseShared(int arg) 共享式释放同步状态

上面列举了一堆方法,看似繁杂。但稍微理一下,就会发现上面诸多方法无非就两大类:一类是独占式获取和释放共享状态,另一类是共享式获取和释放同步状态。至于这两类方法的实现细节,我会在接下来的章节中讲到,继续往下看吧。

4.源码分析

4.1 节点结构

在并发的情况下,AQS 会将未获取同步状态的线程将会封装成节点,并将其放入同步队列尾部。同步队列中的节点除了要保存线程,还要保存等待状态。不管是独占式还是共享式,在获取状态失败时都会用到节点类。所以这里我们要先看一下节点类的实现,为后面的源码分析进行简单铺垫。源码如下:

static final class Node {

    /** 共享类型节点,标记节点在共享模式下等待 */
    static final Node SHARED = new Node();
    
    /** 独占类型节点,标记节点在独占模式下等待 */
    static final Node EXCLUSIVE = null;

    /** 等待状态 - 取消 */
    static final int CANCELLED =  1;
    
    /** 
     * 等待状态 - 通知。某个节点是处于该状态,当该节点释放同步状态后,
     * 会通知后继节点线程,使之可以恢复运行 
     */
    static final int SIGNAL    = -1;
    
    /** 等待状态 - 条件等待。表明节点等待在 Condition 上 */
    static final int CONDITION = -2;
    
    /**
     * 等待状态 - 传播。表示无条件向后传播唤醒动作,详细分析请看第五章
     */
    static final int PROPAGATE = -3;

    /**
     * 等待状态,取值如下:
     *   SIGNAL,
     *   CANCELLED,
     *   CONDITION,
     *   PROPAGATE,
     *   0
     * 
     * 初始情况下,waitStatus = 0
     */
    volatile int waitStatus;

    /**
     * 前驱节点
     */
    volatile Node prev;

    /**
     * 后继节点
     */
    volatile Node next;

    /**
     * 对应的线程
     */
    volatile Thread thread;

    /**
     * 下一个等待节点,用在 ConditionObject 中
     */
    Node nextWaiter;

    /**
     * 判断节点是否是共享节点
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * 获取前驱节点
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    /** addWaiter 方法会调用该构造方法 */
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }

    /** Condition 中会用到此构造方法 */
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

4.2 独占模式分析

4.2.1 获取同步状态

独占式获取同步状态时通过 acquire 进行的,下面来分析一下该方法的源码。如下:

/**
 * 该方法将会调用子类复写的 tryAcquire 方法获取同步状态,
 * - 获取成功:直接返回
 * - 获取失败:将线程封装在节点中,并将节点置于同步队列尾部,
 *     通过自旋尝试获取同步状态。如果在有限次内仍无法获取同步状态,
 *     该线程将会被 LockSupport.park 方法阻塞住,直到被前驱节点唤醒
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

/** 向同步队列尾部添加一个节点 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 尝试以快速方式将节点添加到队列尾部
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    
    // 快速插入节点失败,调用 enq 方法,不停的尝试插入节点
    enq(node);
    return node;
}

/**
 * 通过 CAS + 自旋的方式插入节点到队尾
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // 设置头结点,初始情况下,头结点是一个空节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            /*
             * 将节点插入队列尾部。这里是先将新节点的前驱设为尾节点,之后在尝试将新节点设为尾节
             * 点,最后再将原尾节点的后继节点指向新的尾节点。除了这种方式,我们还先设置尾节点,
             * 之后再设置前驱和后继,即:
             * 
             *    if (compareAndSetTail(t, node)) {
             *        node.prev = t;
             *        t.next = node;
             *    }
             *    
             * 但但如果是这样做,会导致一个问题,即短时内,队列结构会遭到破坏。考虑这种情况,
             * 某个线程在调用 compareAndSetTail(t, node)成功后,该线程被 CPU 切换了。此时
             * 设置前驱和后继的代码还没带的及执行,但尾节点指针却设置成功,导致队列结构短时内会
             * 出现如下情况:
             *
             *      +------+  prev +-----+       +-----+
             * head |      | <---- |     |       |     |  tail
             *      |      | ----> |     |       |     |
             *      +------+ next  +-----+       +-----+
             *
             * tail 节点完全脱离了队列,这样导致一些队列遍历代码出错。如果先设置
             * 前驱,在设置尾节点。及时线程被切换,队列结构短时可能如下:
             *
             *      +------+  prev +-----+ prev  +-----+
             * head |      | <---- |     | <---- |     |  tail
             *      |      | ----> |     |       |     |
             *      +------+ next  +-----+       +-----+
             *      
             * 这样并不会影响从后向前遍历,不会导致遍历逻辑出错。
             * 
             * 参考:
             *    https://www.cnblogs.com/micrari/p/6937995.html
             */
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

/**
 * 同步队列中的线程在此方法中以循环尝试获取同步状态,在有限次的尝试后,
 * 若仍未获取锁,线程将会被阻塞,直至被前驱节点的线程唤醒。
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 循环获取同步状态
        for (;;) {
            final Node p = node.predecessor();
            /*
             * 前驱节点如果是头结点,表明前驱节点已经获取了同步状态。前驱节点释放同步状态后,
             * 在不出异常的情况下, tryAcquire(arg) 应返回 true。此时节点就成功获取了同
             * 步状态,并将自己设为头节点,原头节点出队。
             */ 
            if (p == head && tryAcquire(arg)) {
                // 成功获取同步状态,设置自己为头节点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            
            /*
             * 如果获取同步状态失败,则根据条件判断是否应该阻塞自己。
             * 如果不阻塞,CPU 就会处于忙等状态,这样会浪费 CPU 资源
             */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        /*
         * 如果在获取同步状态中出现异常,failed = true,cancelAcquire 方法会被执行。
         * tryAcquire 需同步组件开发者覆写,难免不了会出现异常。
         */
        if (failed)
            cancelAcquire(node);
    }
}

/** 设置头节点 */
private void setHead(Node node) {
    // 仅有一个线程可以成功获取同步状态,所以这里不需要进行同步控制
    head = node;
    node.thread = null;
    node.prev = null;
}

/**
 * 该方法主要用途是,当线程在获取同步状态失败时,根据前驱节点的等待状态,决定后续的动作。比如前驱
 * 节点等待状态为 SIGNAL,表明当前节点线程应该被阻塞住了。不能老是尝试,避免 CPU 忙等。
 *    —————————————————————————————————————————————————————————————————
 *    | 前驱节点等待状态 |                   相应动作                     |
 *    —————————————————————————————————————————————————————————————————
 *    | SIGNAL         | 阻塞                                          |
 *    | CANCELLED      | 向前遍历, 移除前面所有为该状态的节点               |
 *    | waitStatus < 0 | 将前驱节点状态设为 SIGNAL, 并再次尝试获取同步状态   |
 *    —————————————————————————————————————————————————————————————————
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    /* 
     * 前驱节点等待状态为 SIGNAL,表示当前线程应该被阻塞。
     * 线程阻塞后,会在前驱节点释放同步状态后被前驱节点线程唤醒
     */
    if (ws == Node.SIGNAL)
        return true;
        
    /*
     * 前驱节点等待状态为 CANCELLED,则以前驱节点为起点向前遍历,
     * 移除其他等待状态为 CANCELLED 的节点。
     */ 
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * 等待状态为 0 或 PROPAGATE,设置前驱节点等待状态为 SIGNAL,
         * 并再次尝试获取同步状态。
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    // 调用 LockSupport.park 阻塞自己
    LockSupport.park(this);
    return Thread.interrupted();
}

/**
 * 取消获取同步状态
 */
private void cancelAcquire(Node node) {
    if (node == null)
        return;

    node.thread = null;

    // 前驱节点等待状态为 CANCELLED,则向前遍历并移除其他为该状态的节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // 记录 pred 的后继节点,后面会用到
    Node predNext = pred.next;

    // 将当前节点等待状态设为 CANCELLED
    node.waitStatus = Node.CANCELLED;

    /*
     * 如果当前节点是尾节点,则通过 CAS 设置前驱节点 prev 为尾节点。设置成功后,再利用 CAS 将 
     * prev 的 next 引用置空,断开与后继节点的联系,完成清理工作。
     */ 
    if (node == tail && compareAndSetTail(node, pred)) {
        /* 
         * 执行到这里,表明 pred 节点被成功设为了尾节点,这里通过 CAS 将 pred 节点的后继节点
         * 设为 null。注意这里的 CAS 即使失败了,也没关系。失败了,表明 pred 的后继节点更新
         * 了。pred 此时已经是尾节点了,若后继节点被更新,则是有新节点入队了。这种情况下,CAS 
         * 会失败,但失败不会影响同步队列的结构。
         */
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        // 根据条件判断是唤醒后继节点,还是将前驱节点和后继节点连接到一起
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                /*
                 * 这里使用 CAS 设置 pred 的 next,表明多个线程同时在取消,这里存在竞争。
                 * 不过此处没针对 compareAndSetNext 方法失败后做一些处理,表明即使失败了也
                 * 没关系。实际上,多个线程同时设置 pred 的 next 引用时,只要有一个能设置成
                 * 功即可。
                 */
                compareAndSetNext(pred, predNext, next);
        } else {
            /*
             * 唤醒后继节点对应的线程。这里简单讲一下为什么要唤醒后继线程,考虑下面一种情况:
             *        head          node1         node2         tail
             *        ws=0          ws=1          ws=-1         ws=0
             *      +------+  prev +-----+  prev +-----+  prev +-----+
             *      |      | <---- |     | <---- |     | <---- |     |  
             *      |      | ----> |     | ----> |     | ----> |     |
             *      +------+  next +-----+  next +-----+  next +-----+
             *      
             * 头结点初始状态为 0,node1、node2 和 tail 节点依次入队。node1 自旋过程中调用 
             * tryAcquire 出现异常,进入 cancelAcquire。head 节点此时等待状态仍然是 0,它
             * 会认为后继节点还在运行中,所它在释放同步状态后,不会去唤醒后继等待状态为非取消的
             * 节点 node2。如果 node1 再不唤醒 node2 的线程,该线程面临无法被唤醒的情况。此
             * 时,整个同步队列就回全部阻塞住。
             */
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    /*
     * 通过 CAS 将等待状态设为 0,让后继节点线程多一次
     * 尝试获取同步状态的机会
     */
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
       /*
        * 这里如果 s == null 处理,是不是表明 node 是尾节点?答案是不一定。原因之前在分析 
        * enq 方法时说过。这里再啰嗦一遍,新节点入队时,队列瞬时结构可能如下:
        *                      node1         node2
        *      +------+  prev +-----+ prev  +-----+
        * head |      | <---- |     | <---- |     |  tail
        *      |      | ----> |     |       |     |
        *      +------+ next  +-----+       +-----+
        * 
        * node2 节点为新入队节点,此时 tail 已经指向了它,但 node1 后继引用还未设置。
        * 这里 node1 就是 node 参数,s = node1.next = null,但此时 node1 并不是尾
        * 节点。所以这里不能从前向后遍历同步队列,应该从后向前。
        */
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 唤醒 node 的后继节点线程
        LockSupport.unpark(s.thread);
}

到这里,独占式获取同步状态的分析就讲完了。如果仅分析获取同步状态的大致流程,那么这个流程并不难。但若深入到细节之中,还是需要思考思考。这里对独占式获取同步状态的大致流程做个总结,如下:

  1. 调用 tryAcquire 方法尝试获取同步状态
  2. 获取成功,直接返回
  3. 获取失败,将线程封装到节点中,并将节点入队
  4. 入队节点在 acquireQueued 方法中自旋获取同步状态
  5. 若节点的前驱节点是头节点,则再次调用 tryAcquire 尝试获取同步状态
  6. 获取成功,当前节点将自己设为头节点并返回
  7. 获取失败,可能再次尝试,也可能会被阻塞。这里简单认为会被阻塞。

上面的步骤对应下面的流程图:

AbstractQueuedSynchronizer 原理分析 - 独占/共享模式

上面流程图参考自《Java并发编程》第128页图 5-5,这里进行了重新绘制,并做了一定的修改。

4.2.2 释放同步状态

相对于获取同步状态,释放同步状态的过程则要简单的多,这里简单罗列一下步骤:

  1. 调用 tryRelease(arg) 尝试释放同步状态
  2. 根据条件判断是否应该唤醒后继线程

就两个步骤,下面看一下源码分析。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        /*
         * 这里简单列举条件分支的可能性,如下:
         * 1. head = null
         *     head 还未初始化。初始情况下,head = null,当第一个节点入队后,head 会被初始
         *     为一个虚拟(dummy)节点。这里,如果还没节点入队就调用 release 释放同步状态,
         *     就会出现 h = null 的情况。
         *     
         * 2. head != null && waitStatus = 0
         *     表明后继节点对应的线程仍在运行中,不需要唤醒
         * 
         * 3. head != null && waitStatus < 0
         *     后继节点对应的线程可能被阻塞了,需要唤醒 
         */
        if (h != null && h.waitStatus != 0)
            // 唤醒后继节点,上面分析过了,这里不再赘述
            unparkSuccessor(h);
        return true;
    }
    return false;
}

4.3 共享模式分析

与独占模式不同,共享模式下,同一时刻会有多个线程获取共享同步状态。共享模式是实现读写锁中的读锁、CountDownLatch 和 Semaphore 等同步组件的基础,搞懂了,再去理解一些共享同步组件就不难了。

4.3.1 获取同步状态

共享类型的节点获取共享同步状态后,如果后继节点也是共享类型节点,当前节点则会唤醒后继节点。这样,多个节点线程即可同时获取共享同步状态。

public final void acquireShared(int arg) {
    // 尝试获取共享同步状态,tryAcquireShared 返回的是整型
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 这里和前面一样,也是通过有限次自旋的方式获取同步状态
        for (;;) {
            final Node p = node.predecessor();
            /*
             * 前驱是头结点,其类型可能是 EXCLUSIVE,也可能是 SHARED.
             * 如果是 EXCLUSIVE,线程无法获取共享同步状态。
             * 如果是 SHARED,线程则可获取共享同步状态。
             * 能不能获取共享同步状态要看 tryAcquireShared 具体的实现。比如多个线程竞争读写
             * 锁的中的读锁时,均能成功获取读锁。但多个线程同时竞争信号量时,可能就会有一部分线
             * 程因无法竞争到信号量资源而阻塞。
             */ 
            if (p == head) {
                // 尝试获取共享同步状态
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 设置头结点,如果后继节点是共享类型,唤醒后继节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
   
/**
 * 这个方法做了两件事情:
 * 1. 设置自身为头结点
 * 2. 根据条件判断是否要唤醒后继节点
 */ 
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    // 设置头结点
    setHead(node);
    
    /*
     * 这个条件分支由 propagate > 0 和 h.waitStatus < 0 两部分组成。
     * h.waitStatus < 0 时,waitStatus = SIGNAL 或 PROPAGATE。这里仅依赖
     * 条件 propagate > 0 判断是否唤醒后继节点是不充分的,至于原因请参考第五章
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        /*
         * 节点 s 如果是共享类型节点,则应该唤醒该节点
         * 至于 s == null 的情况前面分析过,这里不在赘述。
         */ 
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

/**
 * 该方法用于在 acquires/releases 存在竞争的情况下,确保唤醒动作向后传播。
 */ 
private void doReleaseShared() {
    /*
     * 下面的循环在 head 节点存在后继节点的情况下,做了两件事情:
     * 1. 如果 head 节点等待状态为 SIGNAL,则将 head 节点状态设为 0,并唤醒后继节点
     * 2. 如果 head 节点等待状态为 0,则将 head 节点状态设为 PROPAGATE,保证唤醒能够正
     *    常传播下去。关于 PROPAGATE 状态的细节分析,后面会讲到。
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            /* 
             * ws = 0 的情况下,这里要尝试将状态从 0 设为 PROPAGATE,保证唤醒向后
             * 传播。setHeadAndPropagate 在读到 h.waitStatus < 0 时,可以继续唤醒
             * 后面的节点。
             */
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

到这里,共享模式下获取同步状态的逻辑就分析完了,不过我这里只做了简单分析。相对于独占式获取同步状态,共享式的情况更为复杂。独占模式下,只有一个节点线程可以成功获取同步状态,也只有获取已同步状态节点线程才可以释放同步状态。但在共享模式下,多个共享节点线程可以同时获得同步状态,在一些线程获取同步状态的同时,可能还会有另外一些线程正在释放同步状态。所以,共享模式更为复杂。这里我的脑力跟不上了,没法面面俱到的分析,见谅。

最后说一下共享模式下获取同步状态的大致流程,如下:

  1. 获取共享同步状态
  2. 若获取失败,则生成节点,并入队
  3. 如果前驱为头结点,再次尝试获取共享同步状态
  4. 获取成功则将自己设为头结点,如果后继节点是共享类型的,则唤醒
  5. 若失败,将节点状态设为 SIGNAL,再次尝试。若再次失败,线程进入等待状态

4.3.2 释放共享状态

释放共享状态主要逻辑在 doReleaseShared 中,doReleaseShared 上节已经分析过,这里就不赘述了。共享节点线程在获取同步状态和释放同步状态时都会调用 doReleaseShared,所以 doReleaseShared 是多线程竞争集中的地方。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

5.PROPAGATE 状态存在的意义

AQS 的节点有几种不同的状态,这个在 4.1 节介绍过。在这几个状态中,PROPAGATE 的用途可能是最不好理解的。网上包括一些书籍关于该状态的叙述基本都是一句带过,也就是 PROPAGATE 字面意义,即向后传播唤醒动作。至于怎么传播,鲜有资料说明过。不过,好在最终我还是找到了一篇详细叙述了 PROPAGATE 状态的文章。在博客园上,博友 活在夢裡 在他的文章 AbstractQueuedSynchronizer源码解读 对 PROPAGATE,以及其他的一些细节进行了说明,很有深度。在钦佩之余,不由得感叹作者思考的很深入。在征得他的同意后,我将在本节中引用他文章中对 PROPAGATE 状态说明的部分,并进行一定的补充说明。这里感谢作者 活在夢裡 的精彩分享,若不参考他的文章,我的这篇文章内容会比较空洞。好了,其他的不多说了,继续往下分析。

在本节中,将会说明两个个问题,如下:

  1. PROPAGATE 状态用在哪里,以及怎样向后传播唤醒动作的?
  2. 引入 PROPAGATE 状态是为了解决什么问题?

这两个问题将会在下面两节中分别进行说明。

5.1 利用 PROPAGATE 传播唤醒动作

PROPAGATE 状态是用来传播唤醒动作的,那么它是在哪里进行传播的呢?答案是在 setHeadAndPropagate 方法中,这里再来看看 setHeadAndPropagate 方法的实现:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);
    
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

大家注意看 setHeadAndPropagate 方法中那个长长的判断语句,其中有一个条件是 h.waitStatus < 0 ,当 h.waitStatus = SIGNAL(-1) 或 PROPAGATE(-3) 是,这个条件就会成立。那么 PROPAGATE 状态是在何时被设置的呢?答案是在 doReleaseShared 方法中,如下:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {...}
            
            // 如果 ws = 0,则将 h 状态设为 PROPAGATE
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        ...
    }
}

再回到 setHeadAndPropagate 的实现,该方法既然引入了 h.waitStatus < 0 这个条件,就意味着仅靠条件 propagate > 0 判断是否唤醒后继节点线程的机制是不充分的。至于为啥不充分,请继续往看下看。

5.2 引入 PROPAGATE 所解决的问题

PROPAGATE 的引入是为了解决一个 BUG – JDK-6801020 ,复现这个 BUG 的代码如下:

import java.util.concurrent.Semaphore;

public class TestSemaphore {

   private static Semaphore sem = new Semaphore(0);

   private static class Thread1 extends Thread {
       @Override
       public void run() {
           sem.acquireUninterruptibly();
       }
   }

   private static class Thread2 extends Thread {
       @Override
       public void run() {
           sem.release();
       }
   }

   public static void main(String[] args) throws InterruptedException {
       for (int i = 0; i < 10000000; i++) {
           Thread t1 = new Thread1();
           Thread t2 = new Thread1();
           Thread t3 = new Thread2();
           Thread t4 = new Thread2();
           t1.start();
           t2.start();
           t3.start();
           t4.start();
           t1.join();
           t2.join();
           t3.join();
           t4.join();
           System.out.println(i);
       }
   }
}

根据 BUG 的描述消息可知 JDK 6u11,6u17 两个版本受到影响。那么,接下来再来看看引起这个 BUG 的代码 – JDK 6u17 中 setHeadAndPropagate 和 releaseShared 两个方法源码,如下:

private void setHeadAndPropagate(Node node, int propagate) {
    setHead(node);
    if (propagate > 0 && node.waitStatus != 0) {
        /*
         * Don't bother fully figuring out successor.  If it
         * looks null, call unparkSuccessor anyway to be safe.
         */
        Node s = node.next;
        if (s == null || s.isShared())
            unparkSuccessor(node);
    }
}

// 和 release 方法的源码基本一样
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

下面来简单说明 TestSemaphore 这个类的逻辑。这个类持有一个数值为 0 的信号量对象,并创建了4个线程,线程 t1 和 t2 用于获取信号量,t3 和 t4 则是调用 release() 方法释放信号量。在一般情况下,TestSemaphore 这个类的代码都可以正常执行。但当有极端情况出现时,可能会导致同步队列挂掉。这里演绎一下这个极端情况,考虑某次循环时,队列结构如下:

AbstractQueuedSynchronizer 原理分析 - 独占/共享模式

  1. 时刻1:线程 t3 调用 unparkSuccessor 方法,head 节点状态由 SIGNAL(-1) 变为 0 ,并唤醒线程 t1。此时信号量数值为1。
  2. 时刻2:线程 t1 恢复运行,t1 调用 Semaphore.NonfairSync 的 tryAcquireShared,返回 0 。然后线程 t1 被切换,暂停运行。
  3. 时刻3:线程 t4 调用 releaseShared 方法,因 head 的状态为 0 ,所以 t4 不会调用 unparkSuccessor 方法。
  4. 时刻4:线程 t1 恢复运行,t1 成功获取信号量,调用 setHeadAndPropagate。但因为 propagate = 0,线程 t1 无法调用 unparkSuccessor 唤醒线程 t2,t2 面临无线程唤醒的情况。因为 t2 无法退出等待状态,所以 t2.join 会阻塞主线程,导致程序挂住。

下面再来看一下修复 BUG 后的代码,根据 BUG 详情页显示,该 BUG 在 JDK 1.7 中被修复。这里找一个 JDK 7 较早版本(JDK 7u10)的代码看一下,如下:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

在按照上面的代码演绎一下逻辑,如下:

  1. 时刻1:线程 t3 调用 unparkSuccessor 方法,head 节点状态由 SIGNAL(-1) 变为 0 ,并唤醒线程t1。此时信号量数值为1。
  2. 时刻2:线程 t1 恢复运行,t1 调用 Semaphore.NonfairSync 的 tryAcquireShared,返回 0 。然后线程 t1 被切换,暂停运行。
  3. 时刻3:线程 t4 调用 releaseShared 方法,检测到 h.waitStatus = 0 ,t4 将头节点等待状态由 0 设为 PROPAGATE(-3)
  4. 时刻4:线程 t1 恢复运行,t1 成功获取信号量,调用 setHeadAndPropagate。因 propagate = 0, propagate > 0 条件不满足。而 h.waitStatus = PROPAGATE(-3) ,所以条件 h.waitStatus < 0 成立。进而,线程 t1 可以唤醒线程 t2,完成唤醒动作的传播。

到这里关于状态 PROPAGATE 的内容就讲完了。最后,简单总结一下本章开头提的两个问题。

问题一:PROPAGATE 状态用在哪里,以及怎样向后传播唤醒动作的?

答:PROPAGATE 状态用在 setHeadAndPropagate。当头节点状态被设为 PROPAGATE 后,后继节点成为新的头结点后。若 propagate > 0 条件不成立,则根据条件 h.waitStatus < 0 成立与否,来决定是否唤醒后继节点,即向后传播唤醒动作。

问题二:引入 PROPAGATE 状态是为了解决什么问题?

答:引入 PROPAGATE 状态是为了解决 并发释放信号量 所导致部分请求信号量的线程无法被唤醒的问题。

声明:

本章内容是在博友 活在夢裡 的文章 AbstractQueuedSynchronizer源码解读 基础上,进行了一定的补充说明。本章所参考的观点已经过原作者同意,为避免抄袭嫌疑,特此声明。

6.总结

到这里,本文就差不多结束了。如果大家从头看到尾,到这里就可以放松一下了。写到这里,我也可以放松一下了。这篇文章总共花费了我12天的空闲时间,确实不容易。本来我只打算讲一下基本原理,但知道后来看到本文多次推荐的那篇文章。那篇文章给我的第一感觉是,作者很厉害。第二感觉是,我也要写出一篇较为深入的 AQS 分析文章。虽然写出来也不能怎么样,水平也不会因此提高多少,也不会去造个类似的轮子。但是写完后,确实感觉很有成就感。本文的最后,来说一下如何学习 AQS 原理。AQS 的大致原理不是很难理解,所以一开始不建议纠结细节,应该先弄懂它的大致原理。在此基础上,再去分析一些细节,分析细节时,要从多线程的角度去考虑。比如,有点地方 CAS 失败后要重试,有的不用重试。总体来说 AQS 的大致原理容易理解,细节部分比较复杂。很多细节要在脑子里演绎一遍,好好思考才能想通,有点烧脑。另外因为文章篇幅的问题,关于 AQS ConditionObject 部分的分析将会放在下一篇文章中进行。

最后,再向 AQS 的作者 Doug Lea 致以崇高的敬意。仅尽力弄懂 AQS 的原理都很难了,可想而知,实现 AQS 的难度有多大。

限于本人的能力,加之深入分析 AQS 本身就比较有难度。所以文中难免会有错误出现,如果不慎翻车,请见谅。也欢迎在评论区指明这些错误,感谢。

参考

  • AbstractQueuedSynchronizer源码解读 - 活在夢裡
  • 《Java并发编程的艺术》 - 方腾飞 / 魏鹏 / 程晓明
原文  http://www.coolblog.xyz/2018/05/01/AbstractQueuedSynchronizer-原理分析-独占-共享模式/
正文到此结束
Loading...