上文介绍了AQS的一些基础知识,包括CLH锁的原理和AQS的一些数据结构,这篇文章中我们来分析一下AQS的方法。AQS是一个抽象类,定义了几个模板方法交给子类去实现,分别是:
protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)
本文不分析获取共享锁的情况。
先来看一下比较简单的一个子类实现 ReentrantLock
中的内部类 FairSync
,下面就直接分析一下 ReentrantLock
的实现, ReentrantLock
有一个字段 private final Sync sync;
,而 Sync
正是继承自AQS。来看一下 ReentrantLock
的构造函数:
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
其中 NonfairSync
和 FairSync
均继承自 Sync
类,区别只是获取锁的方式是否公平:
//NonfairSync final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } //FairSync final void lock() { acquire(1); }
可以看出,非公平锁在获取锁的时候会先尝试一下是否能直接获取锁,如果可以,就不必进行排队( acquire
方法),而公平锁会直接进入等待队列排队。
看一下AQS中 acquire
方法的实现:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
acquire
方法大体的逻辑就是先调用 tryAcquire
尝试获取锁,如果失败的话就新建一个 Node
对象并添加到AQS的等待队列中去。
private Node addWaiter(Node mode) { //使用当前线程新建一个Node对象,并且设置锁为排它锁 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //直接使用compareAndSetTail把当前Node添加到等待队列的队尾,由于多线程的问题,可能导致设置失败,此时转到enq方法去执行 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //添加Node失败,调用enq方法无限循环地添加 enq(node); return node; }
addWaiter
方法新建一个Node对象,并且设置成 EXCLUSIVE
模式,也就是排它锁。在该方法中,先使用原子操作尝试把新建节点添加到等待队列的队尾,如果添加成功则直接返回,否则调用 enq()
方法添加。
private Node enq(final Node node) { for (;;) { Node t = tail; //如果等待队列还没有建立,那么先初始化,并且设置头、尾节点都是当前节点 if (t == null) { // Must initialize //如果是等待队列的初始化,则创建一个哑节点,需要注意的是此if语句并没有返回,而是在enq的下轮循环中将node节点添加到这个哑节点后 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; //依然是调用原子操作来把当前节点添加到队尾,不过由于代码处于无限循环当中,总会有添加成功的时候 if (compareAndSetTail(t, node)) { //可以看到把原尾节点的next字段设置成node和把node设置成尾节点合并在一起并不是个原子操作 t.next = node; return t; } } } }
在此要说明的是,AQS的等待队列是使用lazy模式来初始化的,也就是说,如果一直没有线程来竞争锁,那么等待队列一直都不会建立。而等待队列的初始化是在 enq()
方法中进行的。
由于在 addWaiter
的过程中可能其他线程已经释放了锁,所以在 acquireQueued
方法中把新建Node阻塞之前可以再尝试一下获取锁,如果仍然失败,再阻塞该线程:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //如果前驱节点为头节点,而且尝试获取锁成功了,那么把当前节点设置成头节点 //这也只是一条快速路径 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
AQS等待队列中的头节点代表当前持有锁的线程,所以当一个节点的前驱节点是头节点的时候,可以尝试一下获取锁(因为可能锁持有线程已经释放了锁),如果获取锁成功,那么把该节点设置成头节点并返回。如果当前节点的前驱节点并不是头节点,那么不给它尝试获取锁的机会,因为AQS等待队列是FIFO的,此时没有轮到它获取锁。
如果该节点尝试获取锁失败,那么调用 shouldParkAfterFailedAcquire
方法判断是否需要阻塞当前节点:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //如果前驱节点的状态是SIGNAL,说明前驱节点已经已经做好了后继节点阻塞的准备,可以直接阻塞当前节点 if (ws == Node.SIGNAL) return true; //只有CANCELLED的值大于0 //如果前驱节点的状态是CANCELLED,那么需要继续向前遍历找到一个未取消的节点做为当前节点的前驱节点 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //此时前驱节点的状态值为0或者PROPAGATE(用于共享锁,本文暂不考虑这种状态) //将前驱节点的值设置成SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
如果 shouldParkAfterFailedAcquire
方法返回false,那么在 acquireQueued
方法中进行下一轮循环(因为 shouldParkAfterFailedAcquire
方法可能修改了当前节点的前驱节点或者前驱节点的状态),此时当前节点的前驱节点可能变成头节点或者前驱节点的状态变成 SIGNAL
。如果 shouldParkAfterFailedAcquire
方法返回true,那么就调用 parkAndCheckInterrupt
方法阻塞当前节点代表的线程:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
至此,获取锁的 acquire
方法已经分析完毕。再来看看释放锁的 release
方法:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; //如果某个节点有后继节点 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
release
方法调用 tryRelease
(模板方法,交给子类去实现)来释放相应数量的信号量,如果方法调用成功,那么就唤醒等待队列头节点的后继节点。
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; //即使某个节点的next字段为空也不能判定该节点没有后继节点,因为创建一个节点并且把它的前驱节点的next字段指向它这并不是个原子操作,所以在多线程的环境中可能出现node节点的next字段为空但是实际上有后继节点的情况,此时需要从队列的tail节点向前遍历来寻找node的后继节点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
假设没有中断的情况下,在 unparkSuccessor
方法中 unpark
的线程会继续在方法 acquireQueued
方法的运行,并且其 parkAndCheckInterrupt
方法会返回false,然后就会再次尝试获取锁,此次获取锁就会成功了,然后 acquire
方法就会返回,代表该线程已经可以执行临界区代码了。