转载

浅析AQS(AbstractQueueSynchronizer)

概念

AQS是一个实现同步锁和基于先进先出队列相关的同步器的框架,它的目的是为大多数依赖单个原子变量来表示锁状态的同步器提供提供基础框架,也就是说这个框架已经设计出一套完备安全的同步机制,当你需要实现你的自定义规则的同步器的话,只需要继承并重写它的一些方法(比如tryAcquire、tryRelease、tryShareAcquire、tryShareRelease、isHeldExclusively等)来实现你的同步策略,那么就可以在代码中通过调用acquire,release方法来实现自定义的同步策略。

我认为它设计的目的应该是为了让开发者不需要考虑太多的线程调度、唤醒阻塞等相关实现,而是着重考虑具体的同步策略:什么条件能获取到锁、什么条件下会进行阻塞状态、是共享锁还是排他锁、允许多少线程同时进行、是否公平等。

浅析AQS(AbstractQueueSynchronizer)

具体实现

它的实现主要是基于CAS和原子变量,通过CAS更新原子变量来实现加锁和释放锁的操作。接下来说说它的一些比较重要的实现。我们从4个获取锁和释放锁的方法和包装线程的Node内部类出发,来理清AQS的具体实现逻辑。

这里我们主要讲解一下不可中断(这里的不可中断主要得是在unsafe.park过程不可中断)的同步机制的实现方式,带超时和可中断的其实大同小异,可以自行通过阅读源码进行理解。

我们知道AQS中的同步锁可以分为排他锁和共享锁,对于排他锁,它是通过acquire和release这两个获取锁和释放锁的方法来实现互斥同步;而对于共享锁来说,它是通过aquireShared和releaseShared这两个获取锁和释放锁的方法来实现互斥同步。

ps:下面的代码可能会比较长,在源码的基础上加了自己的一些理解,请耐心阅读

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    
    // -------------------------- 『排他锁』 --------------------------

        // 获取排他锁
    public final void acquire(int arg) {
            // 通过tryAquire来尝试获取排他锁,如果获取失败,则进行等待队列。
            // 注意:tryAquire是放在子类中去实现的。
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    // 这里我们顺便解释一下排他锁的进入等待队列的实现的。
    
    // 定义为final,说明不可重写,这一块的逻辑由AQS框架来帮你实现
        // 这里的node已经完成了入队操作(不理解的可以看addWaiter这个方法的实现),接下来就是自旋获取锁的过程
        final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 自旋操作,这里可能有人会有疑问为什么使用自旋而不是一个简单的if判断,其实是由于在非公平的情况,队列头的node通过unsafe.unpark进行唤醒后,可能会被新进来的线程抢占了锁,所以这里需要通过自旋来进行重试
            for (;;) {
                    // 获取前一个节点
                final Node p = node.predecessor();
                // 判前一个节点为队列头结点,且能否通过tryAcuire也成功获取到锁
                if (p == head && tryAcquire(arg)) {
                        // 注意这里已经通过tryAquire获取到锁了,那么这里的逻辑其实就是处理一些额外的事情(比如协助gc,以及检查当前线程是否中断)后直接返回。
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 上边的条件如果不符合,将会执行下面的shouldParkAfterFailedAcquire方法通过unsafe.park来进行阻塞等待。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
        
        // 等待队列为一个链表队列,这一步的目的是将获取锁的线程封装为一个node节点。
        private Node addWaiter(Node mode) {
                // 将当前获取锁的线程和锁的类型(注意上下文,这里为『排他锁』类型)封装为一个node
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
                // 如果队列尾部结点不为null,将当前节点的前指针指向尾结点
            node.prev = pred;
            // 通过CAS来重新设置新的尾部结点,返回成功说明入队成功,直接返回true;如果返回false,说明当前有其他结点更新了尾部结点,那么将会执行下边的enq方法入队
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
    
    private Node enq(final Node node) {
            // 自旋,直到成功入队才会退出。
        for (;;) {
            Node t = tail;
            if (t == null) {
                    // 尾部为null,说明当前队列内的所有等待锁的线程都成功获取到锁,这时候需要重新初始化队列,设置队列头和尾
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                    // 标准的CAS入队操作
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
        
        
        // 定义为final,说明不可重写,这一块的逻辑由AQS框架来帮你实现
        // 这里的node已经完成了入队操作,接下来就是自旋获取锁的过程
        final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 自旋操作,这里可能有人会有疑问为什么使用自旋而不是一个简单的if判断,其实是由于在非公平的情况,队列头的node通过unsafe.unpark进行唤醒后,可能会被新进来的线程抢占了锁,所以这里需要通过自旋来进行重试
            for (;;) {
                    // 获取前一个节点
                final Node p = node.predecessor();
                // 判前一个节点为队列头结点,且能否通过tryAcuire也成功获取到锁
                if (p == head && tryAcquire(arg)) {
                        // 注意这里已经通过tryAquire获取到锁了,那么这里的逻辑其实就是处理一些额外的事情(比如协助gc,以及检查当前线程是否中断)后直接返回。
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 上边的条件如果不符合,将会执行下面的shouldParkAfterFailedAcquire方法通过unsafe.park来进行阻塞等待。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
      // 释放排他锁
    public final boolean release(int arg) {
            // 尝试释放排他锁,如果失败直接返回false
            // 注意:tryRelease同样是放在子类去实现的。
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    // 联系上下文可知这个node为等待队列的头结点,唤醒存在的下一个成功获取锁的线程。
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
                
                // 注意头结点为已经获取到锁的node,这里应该是头结点释放后,需要将头结点的状态设置为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
            
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
           // 由于锁的状态可能被改变或者节点可能为null,因此需要从尾部结点向前遍历找到最前一个没有被取消的节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    
       // -------------------------- 『共享锁』 --------------------------
    
    // 获取共享锁
    public final void acquireShared(int arg) {
            // 尝试获取共享锁,如果返回值大于0,表示已经成功获取到锁,否则通过doAcquireShared进行排队
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
    private void doAcquireShared(int arg) {
            // 入队
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 自旋
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                        // 这里tryAcquireShared的返回值如果大于等于0,说明成功获取到锁,返回值r为当前共享锁的剩余可进入数。
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                            // 重置队列头和可进入数后,直接返回
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 如果上边获取锁失败,则通过park进行等待
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    // 释放共享锁
    public final boolean releaseShared(int arg) {
            // 尝试释放锁,如果失败直接返回false,成功的话将会doReleaseShared释放共享锁
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    private void doReleaseShared() {
            // 自旋
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                        // 对于已经唤醒的头结点,重置状态
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                       // 唤醒下一个成功获得到锁的结点
                    unparkSuccessor(h);
                }
                // 预防新结点的加入,保证它能够正常获取到锁
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    
    
    
    
    // -------------------------- 『Node数据结构』 --------------------------
    
    // 内部类,通过Node类对线程进行包装
    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
    
    ......

实现案例

ReentrantLock中的同步锁

对于不同的锁实现同步策略,java.util.concurrent包中已经提供有很多具体的实现案例,比如比较经典的ReentrantLock中的Sync类,它继承于AbstractQueuedSynchronized类,是一个典型的排他锁。当然,对于阻塞线程能否顺序的获取锁,它又有两种实现FairSync和NonFairSync,即公平锁和非公平锁,我们具体看一下它的代码实现。

// 抽象同步锁
abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        // 获取锁操作.
        // 交由子类来实现,主要原因是获取锁操作会影响到能否顺序获取到锁,因此定义为抽象函数,由子类来进行具体的逻辑控制。
        abstract void lock();

        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                    // 这里同样是无视等待队列,进行尝试插队
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

                // 对于公平和非公平锁,它们的释放锁操作都是相同的,即state-1
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        ......
    }
    
    // 非公平锁
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        
        final void lock() {
                        // 非公平性,主要体现在这行代码,在非公平锁的实现下,不会判断当前等待队伍是否有线程在等待锁,而是直接尝试通过CAS来获取锁,这里可能无视等待队列的线程直接抢夺锁,因此它是非公平的。
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                       // 抢夺失败,通过acquire重新尝试获取锁
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    
    
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                    // 公平性,在尝试获取锁之前会先判断队列是否有线程在等待,如果有,则当前线程会进行等待队列等待调度。
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

CountDownLatch的同步锁

CountDownLatch的同步锁结合CountDownLatch的countDown和await这两个方法来看可能会更容易理解,请读者自行翻阅源码。

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
                // 当且仅当state为0时,才返回正数
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // 将共享锁的锁数进行减1
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }    }
        }
    }
原文  https://segmentfault.com/a/1190000022196341
正文到此结束
Loading...