AQS可以说是JAVA源码中必读源码之一。同时它也是JAVA大厂面试的高频知识点之一。认识并了解它,JAVA初中升高级工程师必备知识点之一。 AQS是AbstractQueuedSynchronizer的简称,它也是JUC包下众多非原生锁实现的核心。
AQS是基于CLH队列算法改进实现的锁机制。大体逻辑是AQS内部有一个链型队列,队列结点类是AQS的一个内部类Node,形成一个类似如下Sync Queue(记住这个名词)
可以看出,一个Node除了前后结点的索引外,还维护了一个Thread对象,一个int的waitStatus。 Thread对象就是处于竞争队列中的线程对象本身。 waitStatus表示当前竞争结点的状态,这里暂且忽略掉。
处于队首的,即Head所指向的结点,即为获取到锁的结点。释放锁即为出队,后续结点则成为队首,即获取到锁
Tips:这里帮大家理解一个事情,每一个ReentrantLock实例都有且只有一个AQS实例,一个AQS实例维护一个Sync Queue。所以说,当我们的业务代码中的多个线程对同一个ReentrantLock实例进行锁竞争操作时,其实际就是对同一个Sync Queue的队列进行入队、出队操作。
我们在用ReentrantLock时,代码通常如下:
ReentrantLock lock = new ReentrantLock(); Runnable runnable = new Runnable() { public void run() { lock.lock(); Sys.out(Thread.currentThread().name() + "抢到锁"); lock.unlock(); } }; Thread t1 = new Thread(runnable); Thread t2 = new Thread(runnable); t2.start(); t1.start(); 复制代码
可见线程t1,t2竞争lock这个锁。 lock.lock()
抢锁 lock.unlock()
释放锁 来看入口函数 lock
public void lock() { sync.lock(); } 复制代码
sync
是 ReentrantLock
内的一个继承了 AQS
的抽象类
abstract static class Sync extends AbstractQueuedSynchronizer { } 复制代码
抽象类的具体实现是另两个内部类 NonFairSync
FairSync
,分别代表非公平锁、公平锁。
我们系统来看下继承图
ReentrantLock中的sync实例,默认是在构造函数中初始化的,
public ReentrantLock() { sync = new NonfairSync(); } 复制代码
那我们就看默认的 NonFairSync
的实现逻辑。
当我们调用 ReentrantLock.lock()
时,直接调用到的是 NonFairSync.lock()
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } 复制代码
compareAndSetState(0, 1)
通过CAS(CAS是啥,这里不讲,参考乐观锁。具体Google吧)方式,设置AQS的 int state
字段为1。AQS内部就是通过这个 state
是否为0来判断当前锁是否已经被线程获取到。 返回 true
,则说明获取锁成功,设置当前锁的独占线程 setExclusiveOwnerThreaThread.currentThread());
否则, acquire(1)
尝试获d(取锁。这个方法会自旋、阻塞,一直到获取锁成功为止。这里,传进去的参数1,就参考乐观锁的版本字段,同时,这里,它还记录了可重入锁重复获取到锁的次数。只有释放同样次数才能最终释放锁。
具体看AQS的 acquire()
的逻辑
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 复制代码
要注意,这个方法是忽略线程中断的。 先看 tryAcquire(arg)
,这个方法是AQS留给子实现类的口子,具体实现看 NonFairSync
,它的实现里直接调用了 Sync.nonfairTryAcquire()
,
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } 复制代码
可以看到方法内,先是尝试获取 state = 0
时的初始锁,如果失败,判断当前锁是否是被当前线程获取,是的话,将 acquire
时传入的参数累加到 state
字段上。在这里,这个字段就是用来记录重复获取锁的次数。 获取失败则返回 false
回到 acquire
在获取失败,返回 false
后,才会继续调用 &&
右边的 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
方法。 先看 addWaiter
private Node addWaiter(Node mode) { 1: Node node = new Node(Thread.currentThread(), mode); 2: // Try the fast path of enq; backup to full enq on failure 3: Node pred = tail; 4: if (pred != null) { 5: node.prev = pred; 6: if (compareAndSetTail(pred, node)) { 7: pred.next = node; 8: return node; 9: } 10: } 11: enq(node); 12: return node; 13: } 复制代码
利用传进来的 Node.EXCLUSIVE
表示的排斥锁参数以及当前线程实例初始化新 Node
,第 4-10
行代码是在 Sync Queue
队列内有竞争线程时进入。为空时会走到 enq(node)
,这里是在竞争为空时将竞争线程入队的操作。 然后返回当前竞争的线程 node
此时 Sync Queue
如图
我们假设node_1是已经获取到锁的结点,node_2即为我们当前操作的结点。
返回 acquire
接着看 addWaiter
返回的 node
直接作为参数给了 acquireQueued
,这个方法就是主要的 node
竞争锁方法。
final boolean acquireQueued(final Node node, int arg) { // 获取锁成功失败标记 boolean failed = true; try { // 当前竞争线程的中断标记 boolean interrupted = false; // 自旋竞争锁,竞争不到锁的话,线程又没有中断 // 则一直在这儿循环 for (;;) { // 获取当前线程的前驱结点 final Node p = node.predecessor(); // 如果前驱结点是头结点,则尝试去tryAcquire // 这个逻辑我们之前看过,当前线程未获取到锁的情况下 // 在AQS的state字段不为0时,则返回false if (p == head && tryAcquire(arg)) { // 进入到这里,说明要不就是当前线程在重复获取 // 要不就是前边的结点释放锁,state 归0,这里获取到 setHead(node); p.next = null; // help GC // 标识竞争锁成功 failed = false; // 这个方法不响应线程中断,但是会返回线程在竞争锁过程中 // 中断标记返回 return interrupted; } // 若获取锁失败,则到这里。这里的逻辑主要在If判断 // 的两个方法中,用来将当前线程挂起的,具体逻辑 // 看下面 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 复制代码
park就是停下的意思。所以这个方法从名字上也比较好理解,就是 挂起线程并且检查线程的中断状态。这里要注意, LockSupport.part(this)
方法是会在线程中断时自动唤醒的
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } 复制代码
这个方法传入了当前竞争结点及其前驱结点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前驱结点的等待状态。这里只需要记住,我们这里考虑的是 // 非共享锁、非公平锁的AQS。所以,只需要确保当前竞争 // 结点的前驱结点状态为SIGNAL就好。剩下的状态, // 与我们此时研究的情况而言没有用 int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 如果前驱结点的status为0,则将其改为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 复制代码
从 shouldParkAfterFailedAcquire
可以看出来,在确保前驱结点status为 SIGNAL
时,就可以放心的去 unsafe.park()
了。之所以要为 SIGNAL
,是因为这个状态含义为:当前结点OVER时要唤醒后继结点。
所以不难推出,我们的结点现在就 park
在那了。等他前驱结点释放锁,或者自己interrupt来唤醒,但因为这个方法是无视中断的,所以即使interrupt了,只是设置了一个标记位,但仍然在循环中。
这里假设前驱结点获取锁后释放,则当前结点在 parkAndCheckInterrupt()
方法中被唤醒,而后再次循环 for(;;)
,这次会在第一个 if
中就进入,当前结点获取到锁,然后重置 Head
指向的结点等,返回当前线程的中断标记。
返回 acquire
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 复制代码
if
中的 selfInterrupt()
方法只是去重新设置当前线程的中断标记位。这是因为获取线程中断状态的方法,在返回状态字段的同时,也会重置字段,所以需要标记后重新设置相应的值。
下面我们看下AQS释放锁的接口方法
public void unlock() { sync.release(1); } 复制代码
追进去
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 复制代码
tryRelease
是在 NonFairLock
中的实现的,如果是释放成功,则在 Head
存在并且状态不为0(其实可以理解为值为 SIGNAL
时)去唤醒 Head
的后继结点。
下面看下 NonFairLock
的 tryRelease
方法的实现
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } 复制代码
可以看出,这个 tryRelease
其实就是去判断下是不是当前线程拥有锁,是的话,判断下当前的释放锁是否完全释放,因为锁可以重复获取,完全释放的话,就设置 state
为0,代表AQS的锁已经被释放了。