AQS
全称 AbstractQueuedSynchronizer
是jdk中一个非常重要的方法,这是一个jdk的同步器的实现,JUC中的很多类例如 ReentrantLock
等的实现都依赖于AQS。
AQS的同步实现方式依赖于CAS,那么CAS究竟是什么呢?
CAS
全称 Compare And Swap
,比较然后交换。JAVA中CAS的实现位于Unsafe类下。CAS本质上属于乐观锁,它的实现原理如下:当你想要修改位于某个内存地址 R
的值的时候,会带入两个值,一个是在地址上的旧值 A
,一个是想要修改的新值 B
。比较内存地址上的值与 A
,如果相同则将 B
的值更新入内存地址 R
中。
CAS
有优点也有缺点,但是在本文中不详细阐述了,大家可以自行了解。在这里只是介绍下CAS是什么,为我们理解AQS的实现做好准备。
这个是AQS内部维护的FIFO链表的示意图,我们可以看出每个节点会维护一个prev和一个next指针来维护双向链表。除此之外addWaiter额外维护了一个单向链表用于Condition的操作。每个Node节点内部会封装一个线程,当线程争抢锁失败后会封装成Node加入到ASQ队列中去。
AQS内部维护了一个双向链表,链表的节点定义如下:
static final class Node { // 共享模式 static final Node SHARED = new Node(); // 独占模式 static final Node EXCLUSIVE = null; // 节点的状态值定义 static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; // 节点状态,取值为以上定义的CANCELLED/SIGNAL/CONDITION/PROPAGATE以及0 volatile int waitStatus; // 先驱节点 volatile Node prev; // 后继节点 volatile Node next; // 节点线程 volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
链表插入的代码如下
private Node addWaiter(Node mode) { // 根据传入的模式(共享或者独占)以及当前线程创建新的节点 Node node = new Node(Thread.currentThread(), mode); // 获取等待队列的尾节点 Node pred = tail; // 如果尾节点不为空,即等待队列不为空,那么新加入的节点就直接加在尾部 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 如果等待队列空了,或者是CAS直接把新结点加在尾部失败,那么调用enq来加入节点 enq(node); return node; } private Node enq(final Node node) { for (;;) { // 首先申明临时变量赋值为尾巴节点 Node t = tail; // 判断尾巴节点是否为空 if (t == null) { // Must initialize // 如果为空,经过CAS,新创建一个节点为头节点,把头节点赋值给尾巴节点 if (compareAndSetHead(new Node())) tail = head; } else { // 如果不为空,把当前节点的先驱节点赋值为尾巴节点 node.prev = t; // CAS操作,将当前节点赋值给尾巴系欸但,将前尾巴节点的后继节点赋值为当前节点,至此,当前节点成为最新的尾巴节点 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
这里有几个需要注意的地方:
enq
内部是一个无限循环,是为了要保证CAS操作一定要成功,如果不成功就反复尝试直到成功为止。 addWaiter
方法中会有一次尝试直接把新节点放到尾部,这是一次尝试提高效率的操作。如果失败,再使用通用的 enq
方法来加入节点。 此外上述插入新节点的代码里就利用到的CAS在内部进行了一次封装,具体的代码如下:
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); }
AQS内部将CAS的代码再次进行了一层封装,使得它可以轻松调用于内部方法。
所谓独占模式,指的是同时只能有一个线程获取到同步器。例如可重入锁 ReentrantLock
就是一个AQS的独占模式的典型实现。
AQS的独占模式有两个核心方法:
获取同步器 acquire
:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
获取同步器的方法比较简单,调用 tryAcquire
来判断是否可以获取同步器,然后调用 acquireQueued
来将新加入的节点放入队列。然后我们来看下这两个方法的具体实现,首先是 tryAcquire
:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
我们可以看到 tryAcquire
并没有在AQS内部实现,而是由AQS的具体实现类根据自己的需求自行实现的。那么再来看 acquireQueued
:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取node的先驱节点 final Node p = node.predecessor(); // 如果是头节点那就去尝试着获取同步器 if (p == head && tryAcquire(arg)) { // 如果获取同步器成功那就重新设置头节点并且返回 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 检查获取同步器失败后的节点是否需要阻塞住(park) if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } // 检查获取同步器失败后的节点是否需要阻塞住 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 如果节点状态为SIGNAL,那就需要把节点park住 return true; if (ws > 0) { // 如果ws大于0,意味着节点状态为CANCEL,那就不断循环向前,把所有的取消节点全部删除掉 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 走到这一步,那就只有几种情况: // 1. 状态为0,那他就是一个新加入的节点 // 2. 状态为PROPAGATE,那他就是一个共享模式的状态 // 无论是以上的那种情况走到这里,都需要尝试将节点状态设置为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
释放同步器 release
:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
释放同步器的方法主要是这样的:首先调用 tryRelease
来看看是否满足释放同步器的条件,如果满足条件,那么需要在释放前先将后继节点唤醒(如果有后继节点,并且后继节点状态不为0)。来看下具体代码:
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } private void unparkSuccessor(Node node) { int ws = node.waitStatus; // 将当前节点的状态设置为0,允许失败 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 获取当前节点的后继节点 Node s = node.next; if (s == null || s.waitStatus > 0) { // 如果后继节点为空或者是状态大于0,即状态为CANCEL, // 则从尾部开始向前遍历,找到状态不为CANCEL的节点,设置为需要唤醒的节点 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
可以看到和获取同步器一样 tryRelease
也是需要AQS实现类自己实现的。在唤醒后继节点时有这么一个问题,为什么需要从尾部开始遍历而不是从前面开始遍历?这里我们可以去看一下插入节点的代码,即 enq
,里面插入节点是在尾部插入的,代码是这样的:
node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; }
在CAS设置了尾节点的值之后,在 t.next
指向node之前,如果是从前开始遍历,遍历到这里就会发现节点为 null
,这个时候就会漏掉部分节点。反之如果从后往前遍历则没有这些问题。
所谓的共享模式,是指多个线程可以共享同一个同步器。
共享模式的两个核心方法:
获取同步器 acquireShared
:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
和独占模式一样 tryAcquireShared
同样需要子类自己实现。
// 不同于其他的方法,共享模式的tryAcquire方法返回的不是一个布尔值, // 而是一个int,根据代码中的注释我们可以得知,这个int值如果是小于0, // 说明获取失败,如果等于0说明获取成功,但是没有剩下的余量,如果大于0,则说明获取成功并且有余量 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
然后我们来看 doAcquireShared
:
private void doAcquireShared(int arg) { // 将node插入fifo队列中 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取node的先驱节点 final Node p = node.predecessor(); if (p == head) { // 如果该节点为头节点,那么尝试获取共享同步器 int r = tryAcquireShared(arg); if (r >= 0) { // 如果获取成功并且留有余量,那么就设置为头节点 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 以下是和独占模式相同的实现 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
我们可以看到同步模式中和独占模式最大的不同是 setHeadAndPropagate
,我们看下具体实现:
private void setHeadAndPropagate(Node node, int propagate) { // 记录下旧的头节点 Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
我们可以看到 setHeadAndPropagate
中依然是调用了 setHead
方法,不同之处在于他会在设置完头节点后会根据条件释放后继节点。造成这点不同的原因就是因为在独占模式中,同时只能有一个线程占有同步器,所以在获取同步器的过程中不会出现需要唤醒其他线程的情况,但是在共享模式中,则可以有多个线程持有同步器。因此判断条件如下:
propagate > 0
: 当还剩有余量的时候 h == null || h.waitStatus < 0
: 当旧的头节点为空或者是状态为 SIGNAL
或者 PROPAGATE
的时候 (h = head) == null || h.waitStatus < 0
: 当新的头节点为空或者是状态为 SIGNAL
或者 PROPAGATE
的时候 在这几种情况下,我们需要尝试着唤醒后面的节点来尝试获取同步器。至于唤醒方法,会在 releaseShared
部分解析。
释放同步器 releaseShared
:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
接下来看一下 acquire
和 release
里面都有调用了的 doReleaseShared
:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; }
其实这个方法不是很容易理解,这里进行下分解。首先我们观察可以注意到这是一个无限自旋的方法,唯一的一个跳出条件就是 if (h == head)
,也就是说,只有当h为头节点的时候才会跳出这个循环。然后我们来看下h的值是什么,我们可以看到h在循环的开始就被赋值为了头节点 Node h = head;
这是怎么回事呢?这是因为在共享模式下不止一个线程可以获取到同步器,因此一个线程进行释放后续节点的操作时,其他节点可能也在进行这步操作,也就是说,在这个过程中头节点可能会进行变动。因此我们需要保证在每个线程内部如果头结点的值和自己预期不同就一直循环下去。
然后我们来看这段代码:
if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); }
这段代码相对比较容易理解,如果一个节点的状态为 SIGNAL
那么将它的值通过CAS,变为0,并且不断的失败重试直到成功为止。然后释放它的后继节点。
比较令人费解的是下面这段代码:
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS
这段代码究竟是干什么的呢?我们来一步一步分析。首先ws什么时候会是0,那只有一种情况,那就是这个节点是新加入的节点,也就是说队列的最后的节点成为了队列的头节点。那么什么时候这个CAS会失败呢?只有当ws不为0的时候,也就是说只有在前一刻判断ws为0,下一刻ws被其他的线程修改导致不为0的时候才会走到这步 continue;
之中。至于为什么会有这一步操作呢?回想一下当ws为0的时候什么操作会改变ws的值。没错就是当有新的节点加入的时候,会调用到的 shouldParkAfterFailedAcquire
,里面这段代码:
if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); }
在这种情况下确实是需要继续进行下一轮循环,然后唤醒后续的节点。确实是有道理,但是似乎优化的太细致了,不知道是不是我的理解不到位。
condition
是jdk中定义的一个条件协作的接口,常用于阻塞队列等情况下。AQS内部有一个对其的实现。
在AQS中定义了一个类 ConditionObject
实现了 condition
接口。
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; private transient Node firstWaiter; private transient Node lastWaiter; public ConditionObject() { } private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } }
在类中定义了两个Node,一个是 condition
队列的头节点,一个是尾节点。还有一个比较重要的内部方法也放到这里讲: addConditionWaiter
。这个方法和之前的队列中的 addWaiter
有点像,但是区别在于他插入并不是依赖Node中的 prev
和 next
,而是 nextWaiter
,并且在代码中我们可以发现和之前的双向队列不同, condition
的队列是一个单向队列。
condition
中的主要方法有两个:
await
:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 将节点添加到Condition的队列里面 Node node = addConditionWaiter(); // 将节点持有的同步器释放掉 int savedState = fullyRelease(node); int interruptMode = 0; // 判断该节点是否已经在同步器的队列之中, // 如果在队列之中,那么就阻塞节点,等待signal或者signalAll来唤醒 // 当然如果在循环中发现interruptMode不为0,也跳出循环 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 首先获取同步器,如果获取成功,并且中断的模式非THROW_IE,则将interruptMode设置为REINTERRUPT if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 清除取消的节点 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 中断处理 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
上面的代码加了一些注释,但是可能还是有点不清晰,所以逐步来进行讲解。首先这个 interruptMode
是什么东西呢?我们来看代码中的定义:
private static final int REINTERRUPT = 1; private static final int THROW_IE = -1;
THROW_IE
表示该中断需要抛出异常, REINTERRUPT
则不同。那么再来看代码查询节点是否在队列中出现过是怎么实现的呢:
final boolean isOnSyncQueue(Node node) { // 如果这个节点状态是CONDITION或者他先驱节点为空,则说明他不在队列内 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 如果一个节点有后继节点,则说明他在队列内 if (node.next != null) return true; return findNodeFromTail(node); } // 从尾部开始循环查找节点是否在队列内 private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
然后我们来看下 (interruptMode = checkInterruptWhileWaiting(node)) != 0
这个条件在什么情况下成立,我们看下 checkInterruptWhileWaiting
的实现:
private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
这个方法实现很明了,如果一个线程被中断了,那么就根据 transferAfterCancelledWait
方法的结果来判断中断的类型,否则返回0。那么循环跳出的条件就很明了了,要么是节点已经在同步器队列内了,要么是线程被中断了(当然前提是有signal方法唤醒了阻塞的线程)
signal
:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
这里涉及到了两个方法:一个是 isHeldExclusively
,这个方法是由子类实现的,判断当前是否是在独占资源,另一个是 doSignal
也就是 signal
实现的核心方法,代码如下:
// 节点传递直到找到非取消节点或者null private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { // CAS修改node的值,如果修改失败返回false if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // node的值成功修改成为0,将节点放入同步队列内 Node p = enq(node); int ws = p.waitStatus; // 如果节点已经取消了或者是将状态修改为SINGNAL失败,则唤醒这个节点的线程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
当然 Condition
内部除了这两个核心方法之外还有诸如 signalAll
, awaitNanos
等方法,实现大致相同,大家可以自行学习一下。
java锁的基础实现靠的是AQS,AQS的基础使用的是CAS。AQS内部的实现依赖于FIFO双向队列,Condition的实现依靠的是一个单向链表。在AQS内部使用了大量的自旋操作,因而会对性能有一定的挑战,因此设计者在内部进行了大量的优化。在本文中未能将这些优化尽数到来,大家可以自己找一份源码细细品味。