AQS,即 AbstractQueuedSynchronizer类,是java并发工具类的底层实现基础,例如ReentrantLock、ReentrantReadWriteLock等都是基于AQS实现的,它将未获取到锁的线程封装在一个节点里面,不同的节点通过连接形成了一个 CHL 队列。
CHL 队列:它是一个非阻塞的 FIFO 队列,也就是说在并发条件下往此队列做插入或移除操作不会阻塞,它通过自旋锁和 CAS 保证节点插入和移除的原子性,实现无锁快速插入。
本文从我们平时使用ReentrantLock时用到的API开始,逐步剖析源码,进而理解AQS原理。
public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(); lock.lock(); try { System.out.println("do work"); } finally { lock.unlock(); } } 复制代码
这段代码就是我们平时使用ReentrantLock最常见的流程,创建锁实例 -> 加锁 -> 解锁,下面我们来逐步查看它们的实现。
public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; /** Synchronizer providing all implementation mechanics */ private final Sync sync; /** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */ abstract static class Sync extends AbstractQueuedSynchronizer { //省略 } /** * Sync object for non-fair locks */ static final class NonfairSync extends Sync{ //省略 } /** * Sync object for fair locks */ static final class FairSync extends Sync{ //省略 } /** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); } //省略 } 复制代码
这里可以看出ReentrantLock有个内部类Sync继承了AbstractQueuedSynchronizer,并且有两个子类NonfairSync和FairSync,从名字可以看出一个是非公平锁,一个是公平锁。ReentrantLock的无参构造函数中对sync属性进行实例化,默认使用的是一个非公平锁的实现。公平锁和非公平锁的实现区别我们后续再说,接下来依照默认的NonfairSync来看锁的实现逻辑。
public class ReentrantLock{ public void lock() { sync.lock(); } } 复制代码
调用Reentrant.lock()方法,发现只有一行,调用的是sync.lock(),可以看出sync才是Reentrant的具体实现逻辑所在, 需要再看看NonfairSync.lock()里的实现。后面也会有很多方法是在内部类、子类和父类之间调用,所以为了方便理解,每一块都会把涉及到的相关代码放到一起来看。
public class ReentrantLock implements Lock, java.io.Serializable { static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } } public abstract class AbstractQueuedSynchronizer{ private volatile int state; protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } } 复制代码
state为锁占有状态值,含义如下:
state=0:表示没有线程获取锁;
state>1:表示锁已被线程获取,对于可重入锁,每重入一次 state+1,每释放一次 state-1.
lock方法首先通过CAS将state设置为1,如果设置成功则表示成功获取到锁,设置当前线程为锁占有线程,如果这一步失败,则会走到acquire(1)方法,这个方法是AQS类中的方法:
public abstract class AbstractQueuedSynchronizer { public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } } 复制代码
首先调用tryAcquire尝试获取锁,这个方法是AQS中的抽象方法,由NonfairSync实现,NonfairSync中又去调用父类Sync.nonfairTryAcquire()方法:
abstract static class Sync extends AbstractQueuedSynchronizer { final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //state为0,则尝试抢占锁 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //查看当前锁占有线程是不是当前线程 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } 复制代码
nonfairTryAcquire方法首先还是获取state状态,如果为0先尝试设置为1进行抢占,成功则返回true,否则查看占有线程是不是当前线程,如果是,将 state+1并更新,返回true,如果都不是,则说明获取锁失败,返回false。这时会走到 AbstractQueuedSynchronizer.acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
方法,这个方法的主要作用就是将当前线程挂起,然后生成节点放到同步队列中等待唤醒,我们来看源码实现:
public abstract class AbstractQueuedSynchronizer{ private transient volatile Node head; private transient volatile Node tail; static final class Node { /** 共享锁的标志 */ static final Node SHARED = new Node(); /** 独占锁的标志 */ static final Node EXCLUSIVE = null; /** 表示线程状态已取消 */ static final int CANCELLED = 1; /** 表示后续线程正在等待释放 */ static final int SIGNAL = -1; /** 表示线程正在等待condition条件 */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; } private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } } 复制代码
这里首先介绍下AQS内部类Node,既然叫AQS,自然就有Queue,Node就是队列中的元素,从类定义可以看出这是一个双向链表,属性介绍如下:
属性 | 说明 |
---|---|
waitStatus | 等待状态 |
prev | 上一节点 |
next | 下一节点 |
thread | 当前节点存储的线程 |
nextWaiter | Node既可以做同步队列节点使用,也可以做等待队列节点使用,前者中的取值为SHARED或EXCLUSIVE,用来标识共享还是独占模式,后者的取值是后继等待节点 |
其中waitStatus有5种取值:
CANCELLED:值为1,取消状态,不再等待
SIGNAL:值为-1,表示后继节点处于等待状态,等待被唤醒
CONDITION:值为-2,当前节点处于等待队列,节点线程等待在Condition上,当其他线程对condition执行signall方法时,等待队列转移到同步队列,加入到对同步状态的获取
PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态
0:以上都不是(源码翻译过来就是这个。。可以理解为初始化状态)
可以看到只有CANCELLED一个状态是大于0的,后续判断状态时会用到这一点。
Node介绍完毕,回到addWaiter方法,这个方法就是将当前线程生成Node节点,并将其添加到队尾,这里有点小技巧,可以看到compareAndSetTail只调用了一次,并没有自旋,难道不怕设置失败吗?原因就在于把自旋放到了enq方法里,可以看到enq方法中,当队列不为空时就会自旋设置tail为当前Node,这块代码其实和addWaite差不多,至于为什么要把这块重复代码放到两个方法里,我暂时也没看明白。。囧
另外可以看到第一次进到enq的for循环中时队列为空,这时会进行初始化,将头结点设置为空节点,因此同步队列中的头节点其实是起到一个头指针的作用。
addWaiter返回了当前Node,然后被acquireQueued调用,首先会判断它的前一节点是不是头结点,是的话说明当前线程是第一个等待线程,会尝试获取锁,
如果获取成功: 则把当前节点设置为头节点,并把原来的头节点设置为null,方便垃圾回收,直接返回 如果获取失败,则调用shouldParkAfterFailedAcquirefang方法判断当前线程是否需要挂起进行等待,如果需要,则调用parkAndCheckInterrupt执行挂起操作,可以看到这里通过&&完成了等同于if else的判断,也算是个小技巧把,这两个方法的执行逻辑都写在下面代码注释中了
public abstract class AbstractQueuedSynchronizer{ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //前置节点状态为SIGNAL,表明当前线程需要挂起等待,返回true if (ws == Node.SIGNAL) return true; //如果前置节点>0,说明是CANCELLED状态,则通过循环将当前节点的前置节点指向到前面第一个状态不为CANCELLED的节点 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //将前置节点状态置为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { //通过LockSupport类挂起当前线程,后续也需要通过LockSupport.unpark来唤起线程 LockSupport.park(this); return Thread.interrupted(); } } 复制代码
可以看到,走到这里获取锁失败的线程就会进入到CHL队列中,当前线程会被挂起,那什么时候会被唤醒呢,我们接下来看看释放锁的过程。
public class ReentrantLock{ public void unlock() { sync.release(1); } } 复制代码
同样,ReentrantLock释放锁时也是调用的sync.release方法,这个方法在父类AbstractQueuedSynchronizer中
public abstract class AbstractQueuedSynchronizer{ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } } public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } } } 复制代码
可以看到,release方法先执行tryRelease方法,释放成功后则会获取头结点,唤醒头结点的后继节点的线程,唤醒后该线程会继续执行acquireQueued方法中的循环,如果获取锁成功,则将自己置为头结点。
tryRelease方法是个抽象方法,在ReentrantLock.Sync的实现里可以看到,因为是可重入锁,所以每次释放会将state-1,直到state=0时这个锁才是真正释放成功返回true,进行后续唤醒下一个线程操作。
至此,ReentrantLock加锁和释放锁的整个过程就分析完毕,可以看到,其实ReentrantLock中并没有太多实现,只是重写了AQS的tryAcquire和tryRelease两个方法,其他CHL队列的管理、线程的唤醒都是AQS中实现的,所以我们说AQS是其实现的基础,同样ReentrantReadWriteLock、CountdownLatch等也都是都是基于AQS实现的,后续会再抽时间对这些类的源码进行分析。
本文开头还有说到ReentrantLock中还有NonfairSync和FairSync两个实现,它们的区别主要在于tryAcquire的实现上
public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } } public abstract class AbstractQueuedSynchronizer{ //判断是否已有节点正在等待,如果head==tail,说明队列为空,返回false,如果队列不为空,且head后继节点不是当前线程节点,说明已有其他线程正在等待,返回true public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } } 复制代码
NonfairSync.lock和NonfairSync.tryAcquire方法中,新的线程进来后都会直接尝试获取锁,因此可能会出现这种情况:某个线程A终于被唤醒后,这时一个新的线程刚进来,并成功获取到锁,导致线程A执行tryAcquire方法失败,又只能继续等待,即线程饥饿的情况。
FairSync.tryAcquire方法则不同,如果锁没有被获取,首先会通过hasQueuedPredecessors方法判断是否已经有其他线程在等待(hasQueuedPredecessors也是在AQS中实现的,AQS真是好人啊。。),如果有就返回false,进入后续enq等操作,加入到CHL队列中进行等待。
可以看到,非公平锁可以减少CPU唤醒线程的开销,因此整体的吞吐效率会高点。
以上就是本文全部内容,可能有的地方理解有误,欢迎各位指出~
参考: blog.csdn.net/qq_28605513…