AbstractQueuedSynchronizer
是大师 Doug Lea 编写的一个并发编程类,位于 java.util.concurrent.locks,是 CountdownLatch、Semaphore、ReentrantLock、ReentrantReadWriteLock、ThreadPoolExecutor 中重要的组成部分,他们中关于 “锁” 的部分与 AQS
息息相关。
借用一下源码中的说法, AbstractQueuedSynchronizer
基于一个 FIFO 队列
提供了一套阻塞锁和同步相关的实现。该类被设计成为很多同步容器 synchronizers
的底层实现,它使用了一个原子int private volatile int state;
来表示当前状态。当在 AQS
被 acquired
(获取资源) 或被 release
(释放资源)时,需要依据这个 state
来进行判断。所以子类需要定义方法来修改这个状态,该状态的含义由我们自由定制。(翻译的不好...)
我们来看一个最简单的例子,我们有一个类 Sync
继承了 AbstractQueuedSynchronizer
,并重写了其 tryAcquire
和 tryRelease
方法。实现非常简单,我们通过调用父类的 compareAndSetState()
以及 setState()
来完成, 简单来说(不是特别准确) ,就是 tryAcquire
返回 true
,代表获取锁成功,否则就会阻塞。而 tryRelease
则负责锁的释放。
在例子中:将 state
设置为 100
代表当前状态为无锁, 1
则代表已经有某个线程获取了该锁。当然这个 state
表达的含义是怎么样的,完全是我们定义的,实际上锁定或者无锁是 100
还是 200
还是 -100
,都没有什么关系。
/** * Created by Anur IjuoKaruKas on 2019/5/7 */ public class Mutex extends AbstractQueuedSynchronizer { public static class Sync extends AbstractQueuedSynchronizer { public Sync() { setState(100); // set the initial state, being unlocked. } @Override protected boolean tryAcquire(int ignore) { boolean result = compareAndSetState(100, 1); print("尝试获取锁" + (result ? "成功" : "失败")); return result; } @Override protected boolean tryRelease(int ignore) { setState(100); return true; } } private final Sync sync = new Sync(); public void lock() { sync.acquire(0); } public void unLock() { sync.release(0); } public static void main(String[] args) throws InterruptedException { Mutex mutex = new Mutex(); mutex.lock(); Thread thread = new Thread(() -> { print("调用 mutex.lock() 之前"); mutex.lock(); print("调用 mutex.lock() 之后"); }); thread.start(); print("main 线程 Sleep 之前"); Thread.sleep(5000); print("main 线程 Sleep 之后"); mutex.unLock(); } public static void print(String print) { System.out.println(String.format("时间 - %s/t/t%s/t/t%s", new Date(), Thread.currentThread(), print)); } } ========================================= 输出 时间 - Fri May 24 15:44:19 CST 2019 Thread[main,5,main] 尝试获取锁成功 时间 - Fri May 24 15:44:19 CST 2019 Thread[main,5,main] main 线程 Sleep 之前 时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之前 时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败 时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败 时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败 时间 - Fri May 24 15:44:24 CST 2019 Thread[main,5,main] main 线程 Sleep 之后 时间 - Fri May 24 15:44:24 CST 2019 Thread[Thread-0,5,main] 尝试获取锁成功 时间 - Fri May 24 15:44:24 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之后
我们可以看到,代码符合我们的预期:在 main 函数所在线程调用 mutex.unLock();
释放锁之前,子线程是一直阻塞的, 调用 mutex.lock() 之后
的日志输出发生在 main 线程 Sleep 之后
之后。
通过重写 tryAcquire
、 tryRelease
方法,以及调用 acquire
和 release
方法,我们很容易就实现了一个锁,当然这个锁有一堆问题... 我们只是通过这个小例子,来建立对 AQS
一个简单的了解。
看到这里,有些细心的小伙伴可能会想了,既然锁是由 tryAcquire
控制的,那和 state
又有什么关系呢? 我们完全可以定义一个自定义变量,比如 sign
, false
代表无锁, true
代表锁定,好像也可以实现这段逻辑啊?这个时候就需要引出我们神奇的 compareAndSet
, CAS
操作了。
前面说到,我们暂时认为 : tryAcquire
返回 true
,代表获取到锁,反之只要 tryAcquire
返回 flase
,线程就会被阻塞 (不准确,后面会细说)。实际上这里有一个 隐含条件,我们必须做到:
tryAcquire
成功,且在某个线程 tryAcquire
成功之后,并在其 release
释放锁之前,任何线程进行 tryAcquire
都将返回 false
。 下面这个例子我们简单使用一个自定义变量 sign
来实现 tryAcquire
,看看会发生什么:
private boolean sign; @Override protected boolean tryAcquire(int ignore) { boolean result = false; if (!sign) { sign = true; result = true; } print("尝试获取锁" + (result ? "成功" : "失败")); return result; } @Override protected boolean tryRelease(int ignore) { sign = false; return true; } ========================================= 输出 时间 - Fri May 24 18:03:12 CST 2019 Thread[main,5,main] 尝试获取锁成功 时间 - Fri May 24 18:03:12 CST 2019 Thread[main,5,main] main 线程 Sleep 之前 时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之前 时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败 时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败 时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败 时间 - Fri May 24 18:03:17 CST 2019 Thread[main,5,main] main 线程 Sleep 之后 时间 - Fri May 24 18:03:17 CST 2019 Thread[Thread-0,5,main] 尝试获取锁成功 时间 - Fri May 24 18:03:17 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之后
看起来好像没问题,在这个 demo 中也得到了和第一个 DEMO 一样的预期的结果。然而事情并没有那么简单,新写的这个 tryAcquire
实现是一个 "CompareThenSet"
操作,在并发的情况下,会出现不可预期的情况
sign
为 false
sign
为 false
sign
修改为 true
,问题就来了。 我们改一下 Main 方法,我们使用 100 个线程并发执行 mutex.lock();
获取锁成功则会输出语句 print("获取锁成功");
,执行,发现,竟然有两个线程同时获取到了锁。 有两个线程同时将 sign
修改为了 true
。
public static void main(String[] args) throws InterruptedException { Mutex mutex = new Mutex(); List<Thread> threads = new ArrayList<>(); for (int i = 0; i < 10000; i++) { threads.add(new Thread(() -> { mutex.lock(); print("获取锁成功"); })); } ExecutorService executorService = Executors.newFixedThreadPool(100); threads.forEach(executorService::submit); Thread.sleep(1000); }
如果我们使用 AQS
帮我们写好的 compareAndSetState
则没有这个问题。
在 Java9
之前,底层实现是调用 unsafe
包的 compareAndSwapInt
来实现的:
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
而在 Java9
之后,则是使用 VarHandle
来实现 VarHandle
是 unSafe
的一个替代方案,本文不多赘述,后面会有文章讲到这个 ~ 。
// VarHandle mechanics private static final VarHandle STATE; --------------------------------------------- protected final boolean compareAndSetState(int expect, int update) { return STATE.compareAndSet(this, expect, update); }
这里简单的说一下 CAS
,即 CompareAndSwap
。 CAS
可原子性地比较并替换一个值,乐观锁中一个典型的实现便是使用 CAS
来完成的。对并发编程有所了解的小伙伴应该都知道 CAS
,一般情况下, Compare(比较)
和 Swap(交换)
至少是两个原子操作(实际上是更多个原子操作,主要看编译成多少条机器码)。 而 CAS
则保证了 Compare
和 Swap
为一个原子操作。
上文说到, 我们暂时认为 : tryAcquire
返回 true
,代表获取到锁,反之只要 tryAcquire
返回 flase
,线程就会被阻塞。
AQS
当然没有这么简单,但我们可以先看看加锁时调用的 acquire
方法:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
我们发现, tryAcquire
只是第一重判断,如果 tryAcquire
失败,紧接着还有另一个 核心逻辑 acquireQueued
。在简介里,我们说, AQS
除了使用一个 原子state
来作为状态判断以外,还有一个 FIFO 队列
,此队列就和 acquireQueued
方法息息相关。另外, AQS
所控制的资源访问,还可以是共享的,或者独占的( addWaiter
参数 Node.EXCLUSIVE
)。
以下的分析我们以一个简单的 独占式非公平 AQS
实现: java.util.concurrent.locks.ReentrantLock.NonfairSync
来深入解析。独占式很好理解,大部分的锁实现都只允许一个线程在同一时间获取到锁定的资源。
先看看 NonfairSync
的 tryAcuire
是怎么实现及优化的。首先, NonfairSync
中将 state == 0
定义为无锁状态。
state == 0
),再调用 CAS
。这实际上对性能是一个很好的优化,假设当前取 state
不为 0
,实际上 CompareAndSetState
成功的概率也很小,这也可以避免同一时间内,过多的线程去并发修改 state
这个状态。 CAS
操作,会发生什么?毫无疑问是 CAS
失败,这会间接导致死锁。这里我们可以看到,重入以后,有一个 int nextc = c + acquires;
操作,这是方便我们记录到底套了几层锁用的,如果没有这个机制,我们将 无法精确的控制加锁和解锁的层级 ,难免会出现一些意料之外的情况。简单来说: lock
几次,就要 unLock
几次。当然我们也可以做到 aquire
多次,一次性 release
掉,或者反过来,取决于怎么我们实现 tryAquire
和 tryRlease
方法。 CAS
操作,返回 true
即可。 @ReservedStackAccess final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 避免过多的线程竞争 CAS 操作 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current);// 如果 CAS 操作成功,则将当前线程保存起来,重入和解锁时用于判断。 return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; // 重入优化,每次加锁相当于 `state++` if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; // 偏向优化 } return false; } @ReservedStackAccess protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 每次解锁相当于 `state--` 直到 state == 0 ,代表可释放锁了 free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
①、addWaiter阶段
如果 tryAquire
失败,就会进入 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
, addWaiter
方法创建了一个新的 Node
实例, Node
实例中主要保存了当前线程信息,并将 nextWaiter
赋值为 Node.EXCLUSIVE
, 这个 nextWaiter
后面再谈,它主要用于线程调度、以及独占模式、共享模式的区分,我们可以先不管它。
操作比较简单,原理是将 node
塞入双向链表尾端,也就是前面提到的 FIFO队列
。就是利用 CAS
操作将新创建的、带有本线程信息的 node
设置为双向链表新的 tail
,并且修改两者的 ‘指针’ prev
和 next
。
/** Constructor used by addWaiter. */ Node(Node nextWaiter) { this.nextWaiter = nextWaiter; THREAD.set(this, Thread.currentThread()); }
Node node = new Node(mode); for (;;) { Node oldTail = tail; if (oldTail != null) { node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { initializeSyncQueue(); // 初始化双向链表,就是创建一个新的空 node,并且头尾都是此 node。 // 这个 node 除了拿来标记链表从哪里开始,没有什么别的意义。 } }
②、acquireQueued阶段(自旋)
入队成功后,进入 acquireQueued
方法, 抛开线程被 interrupt
的情况 , acquireQueued
的代码其实也很简单,我们不看 interrupt
相关逻辑,其实逻辑还是很简单的。这是一个 无限循环(或者说自旋) ,只要没有 tryAcquire
成功,就会一直循环下去,逻辑如下:
FIFO
队列头,则进行一次 tryAquire
,如果成功,则跳出循环。 parkAndCheckInterrupt
便是阻塞直到被唤醒(或者被 interrupt
,暂时先不考虑这个情况)。 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
tryAcquire()
和 parkAndCheckInterrupt()
都很好理解,前者就是去尝试一下获取锁定资源,看能否成功。后者则是阻塞直到被唤醒。
③、阻塞阶段
我们先说说 shouldParkAfterFailedAcquire
,这个判断是一个挺有意思的设计,后续文章会细说,它和线程调度、取消获取锁等相关。因为在获取锁定资源和释放锁定资源的过程中,实际上我们只需要用到两个状态,一个是初始状态 pred.waitStatus == 0
,另一个是 pred.waitStatus == SIGNAL == -1
。
代码中我们可以很容易看出,在 CAS
将 prev
节点的 waitStatus
设置为 SIGNAL : -1
之前,都将返回 false
,如果设置成功,下一次自旋进入该方法就是 true
了,也就是说,会进入 parkAndCheckInterrupt()
方法,阻塞直到被唤醒。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
自旋阶段图解:
阻塞阶段图解:
①、唤醒 FIFO 的下一个节点
阻塞直到唤醒这个逻辑在锁定资源、释放资源 这两个阶段来看十分简单,最后我们来看看 release
做了什么, release
除了调用了我们自己实现的 tryRelease
之外,其实关键的就是这个 unparkSuccessor
。
tryRelease
上面也说过了,就是改改原子 state
,这里不多赘述。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
代码中可以看出,当 FIFO
队列不为空且头结点的 waitStatu
被修改过,就会进入 unparkSuccessor
, unparkSuccessor
传入了当前 FIFO
的队列头,逻辑如下:
waitStatus
为负(可能为 SIGNAL
、 CONDITION
或者 PROPAGATE
),我们这里简单先看成只有 SIGNAL
状态,则 CAS
将其设置为 0
。其他几个状态我们后面会说到。 !(s == null || s.waitStatus > 0)
,也就是说 node.next
的 waitStatus <= 0
,则简单的直接将其唤醒: LockSupport.unpark(s.thread);
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
②、被唤醒后
被唤醒的线程当然不是直接获得了锁,它还是会继续 acquireQueue
进行自旋,逻辑还是和之前一样,避免小伙伴往上翻代码,这里贴了一份如果 prev
是头结点,如果 tryAcquire
成功,我们看到其实很简单,只是将自己设为头部即可。
if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; }
这篇文章只是简单的说说 AQS
的正向获取资源,释放资源流程,后续会继续解析 wait
、 notify
、 condition
等基于 AQS
的线程调度解析 ~~ 以及各个锁是如何实现 AQS
的 ~~
文章皆是基于源码一步步分析,没有参考过多资料,如有错误, 请指出!!
另外欢迎来 Q 群讨论技术相关(目前基本没人) [左二维码] ~
如果觉得写得好还可以关注一波订阅号哟 ~ 博客和订阅号同步更新 [右二维码] ~
Brief introduction to AbstractQueuedSynchronizer by Using a Simple Mutex Example
另外小伙伴可以思考一下:
ThreadB
被唤醒后,继续自旋时,另一个线程 ThreadC
tryAcquire
成功了会发生什么。 CAS
操作,都有 ABA
问题,如果说修改 waitStatus
发生了 ABA
问题,会发生什么?