队列同步器 (AQS), 是用来构建锁或其他同步组件的基础框架,它通过使用 int 变量表示同步状态,通过内置的 FIFO 的队列完成资源获取的排队工作。(摘自《Java并发编程的艺术》)
我们知道获取同步状态有独占和共享两种模式,本文先针对独占模式进行分析。
private transient volatile Node head; 复制代码
head 同步队列头节点
private transient volatile Node tail; 复制代码
tail 同步队列尾节点
private volatile int state; 复制代码
state 同步状态值
volatile int waitStatus; 复制代码
waitStatus 节点的等待状态,可取值如下 :
volatile Node prev; 复制代码
prev 指向当前节点的前置节点
volatile Node next; 复制代码
next 指向当前节点的后置节点
volatile Thread thread; 复制代码
thread 节点对应的线程也是指当前获取锁失败的线程
Node nextWaiter; 复制代码
独占模式下获取同步状态, 既是当前只允许一个线程获取到同步状态
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 复制代码
从 acquire 方法中我们可以大概猜测下,获取锁的过程如下:
下面具体看下各个阶段如何实现:
private Node addWaiter(Node mode) { // 绑定当前线程 创建 Node 节点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 判断同步队列尾节点是否为空 if (pred != null) { // node 的前置节点指向队列尾部 node.prev = pred; // 将同步队列的 tail 移动指向 node if (compareAndSetTail(pred, node)) { // 将原同步队列的尾部后置节点指向 node pred.next = node; return node; } } // tail 为空说明同步队列还未初始化 // 此时调用 enq 完成队列的初始化及 node 入队 enq(node); return node; } 复制代码
private Node enq(final Node node) { // 轮询的方式执行 // 成功入队后退出 for (;;) { Node t = tail; if (t == null) { // Must initialize // 创建 Node, 并将 head 指向该节点 // 同时将 tail 指向该节点 // 完成队列的初始化 if (compareAndSetHead(new Node())) tail = head; } else { // node 的前置节点指向队列尾部 node.prev = t; // 将同步队列的 tail 移动指向 node if (compareAndSetTail(t, node)) { // 将原同步队列的尾部后置节点指向 node t.next = node; return t; } } } } 复制代码
从代码中可以看出通过 CAS 操作保证节点入队的有序安全,其入队过程中如下图所示:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // for (;;) { // 获取当前节点的前置节点 final Node p = node.predecessor(); // 判断前置节点是否为 head 头节点 // 若前置节点为 head 节点,则再次尝试获取同步状态 if (p == head && tryAcquire(arg)) { // 若获取同步状态成功 // 则将队列的 head 移动指向当前节点 setHead(node); // 将原头部节点的 next 指向为空,便于对象回收 p.next = null; // help GC failed = false; // 退出轮询过程 return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 复制代码
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ // 若前置节点状态为 -1 ,则说明后置节点 node 可以安全挂起了 return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { // ws > 0 说明前置节点状态为 CANCELLED , 也就是说前置节点为无效节点 // 此时从前置节点开始向队列头节点方向寻找有效的前置节点 // 此操作也即是将 CANCELLED 节点从队列中移除 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 若前置节点状态为初始状态 则将其状态设为 -1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 复制代码
private final boolean parkAndCheckInterrupt() { // 将当前线程挂起 LockSupport.park(this); // 被唤醒后检查当前线程是否被挂起 return Thread.interrupted(); } 复制代码
从 acquireQueued 的实现可以看出,节点在入队后会采用轮询的方式(自旋)重复执行以下过程:
如下图所示:
接下来我们看看同步状态释放的实现。
释放同步状态
public final boolean release(int arg) { // 尝试释放同步状态 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 唤醒后置节点 unparkSuccessor(h); return true; } return false; } 复制代码
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) // 将 head 节点状态改为 0 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 获取后置节点 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒后置节点上所阻塞的线程 LockSupport.unpark(s.thread); } 复制代码
从上述代码,我们可以明白释放同步状态的过程如下:
如下图所示(红色曲线表示节点自旋过程) :
独占模式下获取同步状态,不同于 acquire 方法,该方法对中断操作敏感; 也就是说当前线程在获取同步状态的过程中,若被中断则会抛出中断异常
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) // 检查线程是否被中断 // 中断则抛出中断异常由调用方处理 throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } 复制代码
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 不同于 acquire 的操作,此处在唤醒后检查是否中断,若被中断直接抛出中断异常 throw new InterruptedException(); } } finally { if (failed) // 抛出中断异常后最终执行 cancelAcquire cancelAcquire(node); } } 复制代码
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. // 若当前节点为 tail 节点,则将 tail 移动指向 node 的前置节点 if (node == tail && compareAndSetTail(node, pred)) { // 同时将node 前置节点的 next 指向 null compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { // 当前节点位于队列中部 Node next = node.next; if (next != null && next.waitStatus <= 0) // 将前置节点的 next 指向 node 的后置节点 compareAndSetNext(pred, predNext, next); } else { // 若 node 的前置节点为 head 节点则唤醒 node 节点的后置节点 unparkSuccessor(node); } node.next = node; // help GC } } 复制代码
从 acquireInterruptibly 的实现可以看出,若线程在获取同步状态的过程中出现中断操作,则会将当前线程对应的同步队列等待节点从队列中移除并唤醒可获取同步状态的线程。
独占模式超时获取同步状态,该操作与acquireInterruptibly一样对中断操作敏感,不同在于超过等待时间若未获取到同步状态将会返回
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } 复制代码
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // 计算等待到期时间 final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) // 超时时间到期直接返回 return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) // 按指定时间挂起s LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } 复制代码
同步队列中的节点在自旋获取同步状态的过程中,会将前置节点的状态由 0 初始状态改为 -1 (SIGNAL), 若是中断敏感的操作则会将状态由 0 改为 1 (CANCELLED)
同步队列中的节点在释放同步状态的过程中会将同步队列的 head 节点的状态改为 0, 也即是由 -1(SIGNAL) 变为 0;