说起 JUC,我们常常会想起其中的线程池(ExecutorService)。然而,我们今天来看看另一个核心模块 AQS。
AQS 是 AbstractQueuedSynchronizer 的简称,在 JUC 中作为各种同步器的基石。举个例子,常见的 ReentrantLock 就是由它实现的。
我们知道,java 有一个关键字 synchronized 来给一段代码加锁,可是这是 JVM 层面的事情。那么问题来了,如何在 java 代码层面来实现模拟一个锁?
即实现这样一个接口:
package java.util.concurrent.locks; public interface Lock { void lock(); void unlock(); } 复制代码
一个简单的想法是:让所有线程去竞争一个变量owner,确保只有一个线程成功,并设置自己为owner,其他线程陷入死循环等待。这便是所谓的 自旋锁 。
一个简单的代码实现:
import java.util.concurrent.atomic.AtomicReference; public class SpinLock { private AtomicReference<Thread> owner = new AtomicReference<Thread>(); public void lock() { Thread currentThread = Thread.currentThread(); // 如果锁未被占用,则设置当前线程为锁的拥有者 while (!owner.compareAndSet(null, currentThread)) { } } public void unlock() { Thread currentThread = Thread.currentThread(); // 只有锁的拥有者才能释放锁 owner.compareAndSet(currentThread, null); } } 复制代码
扯这个自旋锁,主要是为了引出 AQS 背后的算法 CLH锁
。
关于 CLH锁
更多细节可以参考篇文章:
自旋锁、排队自旋锁、MCS锁、CLH锁
CLH锁的思想,简单的说就是:一群人去ATM取钱,头一个人拿到锁,在里面用银行卡取钱,其余的人在后面 排队等待 ;前一个人取完钱出来, 唤醒 下一个人进去取钱。
关键部分翻译成代码就是:
AQS 使用节点为 Node 的双向链表作为 同步队列 。拿到锁的线程可以继续执行代码,没拿到的线程就进入这个队列排队。
public abstract class AbstractQueuedSynchronizer ... { // 队列头 private transient volatile Node head; // 队列尾 private transient volatile Node tail; static final class Node { /** 共享模式,可用于实现 CountDownLatch */ static final Node SHARED = new Node(); /** 独占模式,可用于实现 ReentrantLock */ static final Node EXCLUSIVE = null; /** 取消 */ static final int CANCELLED = 1; /** 意味着它的后继节点的线程在排队,等待被唤醒 */ static final int SIGNAL = -1; /** 等待在条件上(与Condition相关,暂不解释) */ static final int CONDITION = -2; /** * 与共享模式相关,暂不解释 */ static final int PROPAGATE = -3; // 可取值:CANCELLED, 0, SIGNAL, CONDITION, PROPAGATE volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; } } 复制代码
这个队列大体上长这样:图片来源
条件队列是为了支持 Lock.newCondition() 这个功能,暂时不care,先跳过。
AQS 支持独占锁(Exclusive)和共享锁(Share)两种模式:
这边我们只看独占模式,它对外提供一套 api:
简单看一眼怎么用的 (ReentrantLock 的例子):
public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer {...} public void lock() { sync.acquire(1); } public void unlock() { sync.release(1); } } 复制代码
可以看到,AQS 封装了排队、阻塞、唤醒之类的操作,使得实现一个锁变的如此简洁。
获取资源
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } 复制代码
这个函数很短,其中 tryAcquire(int) 为模板方法,留给子类实现。类似 Activity.onCreate()。
根据 tryAcquire(arg) 的结果,分两种情况:
这一步把当前线程(Thread.currentThread())作为一个Node节点,加入同步队列的尾部,并标记为独占模式。
当然,加入队列这个动作,要保证 线程安全 。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 1处尝试更快地 enq(), 成功的话直接 return。失败的话, 在2处退化为完整版的 enq(),相对更慢些 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { // 1 pred.next = node; return node; } } enq(node); // 2 return node; } private Node enq(final Node node) { // 神奇的死循环 + CAS for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } 复制代码
可以看到,这边有一个 死循环 + CAS 的神奇操作,这是非阻塞算法的经典操作,可自行查阅相关资料。简单的说,非阻塞算法就是在多线程的情况下,不加锁同时保证某个变量(本例中为双向链表)的线程安全,而且通常比 synchronized 的效率要高。
这个函数主要做两件事:
结合这张图来看,每次出队完需要确保 head 始终指向占用资源的线程:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 这又是一个死循环 + CAS,这次CAS比较隐蔽,在 shouldParkAfterFailedAcquire()里边 for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 排在队首的后面,看看能不能获得锁,成功的话自己变成队首 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 这里失败会做一些回滚操作,不分析 if (failed) cancelAcquire(node); } } 复制代码
这边的 interrupted 主要是保证这样一个功能。线程在排队的时候不响应中断,直到出来以后,如果等待的过程中被中断过,作为弥补,立即相应中断(即调用selfInterrupt())。
查看prev的waitStatus,看是不是需要阻塞。可以预见的是,经过几次死循环,全部都会变成SIGNAL状态。之后全部陷入阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 查看前驱节点的状态 if (ws == Node.SIGNAL) // SIGNAL: 可以安全的阻塞 return true; if (ws > 0) { // CANCEL: 取消排队的节点,直接从队列中清除。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 0 or PROPAGATE: 需要变成 SIGNAL,但不能立即阻塞,需要重走外层的死循环二次确认。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } 复制代码
值得一提的是,阻塞和唤醒没有使用常说的 wait()/notify(),而是使用了 LockSupport.park()/unpark()。这应该是出于效率上的考虑。
释放资源
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * 唤醒next节点对应的线程,通常就是老二(直接后继)。 * 如果是null,或者是cancel状态(出现异常如线程遇到空指针挂掉了), * 那么跳过cancel节点,找到后继节点。 */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 唤醒 node.next if (s != null) LockSupport.unpark(s.thread); } 复制代码
释放的逻辑比较简单。注意一点,对于 next 节点 unpark(),相当于在把 next 节点从 acquireQueued() 中的死循环中解放出来。
回到 ATM 的例子,相当于,他取完钱,轮到后一个人取钱了。这样逻辑全部都串起来了。