上一篇我们解读了LockSupport的源码,其中有提到JUC的一块重要基石是AbstractQueuedSynchronizer类,简称AQS,那么这一篇就正式学习这个类。由于我也是以学代练,肯定有很多地方理解的不够到位,欢迎大家留言讨论哈!还是友情提示,本文的分析的JDK版本是8。
为什么要在开篇就介绍AQS的工作原理呢?因为先对一些知识点有个大概了解,可以帮我们在看源码时更容易理解一些,做到有的放矢,事半功倍。
这里我总结了三个比较关键的点,需要我们知道的。
AQS的实现思路还是很清晰的,使用一个state来维护竞争状态,使用CAS来安全的更新state,获取锁失败的线程进入等待队列unpark,锁被释放后,从队列中唤醒一个线程来继续尝试获取锁。
AQS支持独占和共享二种模式,独占模式相对容易理解一些,光说不练假把式,我们先利用AQS实现一个独占锁SmartLock来加深理解。
public class SmartLock { private class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int arg) { if (getExclusiveOwnerThread() == Thread.currentThread()) return true; if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } else { return false; } } @Override protected boolean tryRelease(int arg) { setState(0); setExclusiveOwnerThread(null); return true; } } private Sync mSync; public SmartLock() { mSync = new Sync(); } public void lock() { mSync.acquire(1); } public void unLock() { mSync.release(1); } } 复制代码
我们新建一个内部类Sync继承AQS,重写它的tryAcquire和tryRelease方法,可以理解为它们分别对应独占模式下的尝试获取锁和尝试释放锁,返回true表示成功,false表示失败。
这里我们可以停下来想一下,既然AQS内部有一个state可以利用,那我们可以这样设定游戏规则,state=1时表示锁被占用,state=0表示锁没有被某个线程持有。
protected boolean tryAcquire(int arg) { // 先判断当前持有锁的线程是不是本线程,如果是,直接返回true,所以我们这个锁是支持可冲入的 if (getExclusiveOwnerThread() == Thread.currentThread()) return true; // CAS的方式更新state,只有当state=0时会成功更新为1 if (compareAndSetState(0, 1)) { // 当前线程已经获取了锁,设置为Owner thread setExclusiveOwnerThread(Thread.currentThread()); return true; } else { // 返回true,当前线程会被加入等待队列中 return false; } } protected boolean tryRelease(int arg) { // 状态更新为0, setState(0); // Owner thread设置为null setExclusiveOwnerThread(null); return true; } 复制代码
我们在SmartLock类中定义二个方法lock和unLock,分别调用acquire和release即可,这里的参数没有用到,传1即可。
public void lock() { mSync.acquire(1); } public void unLock() { mSync.release(1); } 复制代码
我们用SmartLock来实现一个线程安全的累加器,逻辑很简单就是提供一个increase方法,对counter进行++操作,我们知道++操作不是原子的,所以我们用SmartLock来保证原子性。
public class SmartAdder { private volatile int counter; private SmartLock lock; public SmartAdder() { lock = new SmartLock(); } public void increase() { lock.lock(); try { counter++; } finally { lock.unLock(); } } public int getCounter() { return this.counter; } } 复制代码
我们写一段测试的case来验证一下,新建了一个固定有20个核心线程的线程池,然后提交了40个累加任务,每个任务循环100000次,这样得到的正确结果应该是4000000。
public static void main(String[] args) { int threadCount = 20; int addCount = 100000; SmartAdder smartAdder = new SmartAdder(); CountDownLatch countDownLatch = new CountDownLatch(threadCount); ExecutorService executorService = Executors.newFixedThreadPool(threadCount); for (int i = 0; i < threadCount * 2; i++) { executorService.submit(() -> { for (int j = 0; j < addCount; j++) { smartAdder.increase(); } countDownLatch.countDown(); }); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("count:" + smartAdder.getCounter()); executorService.shutdown(); } // 打印结果 count:4000000 复制代码
打印就结果验证了我们的SmartLock确实能够正常工作,这样一个简单的互斥锁就完成了,其实也并不复杂嘛!其中CountDonwLatch也是JUC提供的一个并发同步类,关于它的用法后面会详解,这里大家只需要知道await可以让当前线程等待线程池中的任务执行完成即可。
有了前面的铺垫,我们现在先看下AQS中独占模式的acquire和release二个方法的具体实现。
先看acquire方法
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } 复制代码
可以看到acquire是一个final方法,我们没法重写它,但是有预留一个tryAcquire方法让我们重写,我们在上面的SmartLock类中也是重写了tryAcquire该方法,如果tryAcquire返回false,会调用acquireQueued方法,它的参数是addWaiter(Node.EXCLUSIVE)的结果,我们先来具体跟进看一下addWaiter的实现。
private Node addWaiter(Node mode) { // 新建一个Node Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 如果队列的尾标兵tail不为空,将新加入的node插入到队尾,并更新tail if (pred != null) { node.prev = pred; // 如果CAS设置tail成功,直接返回 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 如果tail为空,或者CAS设置tail失败 enq(node); return node; } 复制代码
这里的思路就是将新建的node插入到队尾,但是由于到考虑到线程安全的问题,采用了CAS更新,如果更新失败,调用enq方法,继续跟进看一下实现。
private Node enq(final Node node) { for (;;) { Node t = tail; // 如果检查到队列没有初始化,先执行初始化,注意head对头是标兵 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; // 在循环中执行CAS插入尾部操作 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } 复制代码
所以看下来,addWaiter逻辑也很清晰,就是要将当前线程,封装为node插入到队列尾部。再看下acquireQueued的实现
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 返回node前驱 final Node p = node.predecessor(); // 如果前驱是head,说明当前队列中该线程排在第一顺位,再次调用tryAcquire // 因为后面调用的parkAndCheckInterrupt会让线程等待,当锁被release时,线程会被unpark // 所以重新tryAcquire来获取锁,如果获取成功,会将当前node设为头结点,相当于将当前 // node从队列中删除了,因为头结点只是一个标兵, if (p == head && tryAcquire(arg)) { setHead(node); // 这里之所以可以直接将.next置为null,而没有考虑node的next,因为是刚加入的node // 它在队尾,而又是head的next,说明队列中就它一个,直接将head.next = null就可以了 p.next = null; // help GC failed = false; return interrupted; } // 先对head设置waitStatus标示位,然后park线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } 复制代码
shouldParkAfterFailedAcquire这个方法很有意思,它是将head的waitStatus设为SINGLE,用来标识有任务需要被唤醒,在后面unpark的时候会检查该标识位。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取node的pre的waitStatus, int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 如果已经是Node.SIGNAL,可以安全的park,直接返回 return true; if (ws > 0) { // 说明pred被取消了,并发时会出现这种情况,因为我们没有加锁 // 继续向前查找waitStatus <= 0的node, do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 将pred的waitStatus设为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 复制代码
再看下parkAndCheckInterrupt这个方法的实现
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } 复制代码
比较简单,直接调用了LockSupport.park,所以AQS中让线程等待的方式就是park,这也就是为什么我们前一篇文章要分析LockSupport源码的原因。
那么线程park等待了,那当然就要有唤醒,我们看下AQS中release的实现。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 复制代码
同样的AQS中release是一个final方法,不能被重写,我们可以重写tryRelease方法。当head不为空,切waitStatus不为0时,调用unparkSuccessor方法,跟进去看下实现
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) // 先将waitStatus设为0 compareAndSetWaitStatus(node, ws, 0); // 一般需要被唤醒的是node.next,但是如果next的node被取消了,或者waitStatus>0,这时候这里的 // 策略是从尾部开始重新选择一个node来unpark Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // unpark唤醒线程 LockSupport.unpark(s.thread); } 复制代码
release的实现相对简单一些,前面介绍tryAcquire失败后,会将当前线程插入到等待队列中时,然后将head的waitStatus置为SINGAL,那么在release时,会先检查这个标识,然后unpark,这里有个小细节,如果head.next被取消了或者waitStatus>0,会从队列的尾部开始往前查找到第一个符合条件的node来unpark。
这里有个细节大家要注意,release只是将队列中第一个满足条件等待的线程唤醒,所以接下来的逻辑还是在acquireQueued方法中,继续尝试调用tryAcquire,如果成功,则会被出队列(当前节点设为头结点),线程继续执行,否则继续等待。
介绍完了独占模式,再来看下共享模式。与独占模式类似,AQS也对共享模式提供了模板方法。分别是acquireShared和releaseShared,它们也都是final的,我们能够重写的方法是tryAcquireShared和tryReleaseShared。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } 复制代码
acquireShared先调用了tryAcquireShared方法,如果返回值小于0, doAcquireShared同样构建SHARED类型的Node加入等待队列。这里要提一下,tryAcquireShared方法使我们需要重写的,注意它的返回值是int类型的,而上面我们分析独占模式tryAcquire的返回值是boolean,因为在共享模式下这个返回值需要有三种状态,所以需要是int类型。
好,我们继续看下doAcquireShared的实现
// 添加当前节点到队列,与独占模式类似,不再赘述 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取node的前驱 final Node p = node.predecessor(); if (p == head) { // 再次尝试获取共享锁 int r = tryAcquireShared(arg); // 如果获取成功 if (r >= 0) { // 检查队列中后续的节点,是否需要被唤醒 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // park让线程等待,与独占模式类似,不再赘述 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); // propagate > 0, 或者 当前头结点或者当前节点node的waitStatus < 0时,调用doReleaseShared if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } 复制代码
大部分的逻辑跟独占模式类似,但是多了一个检查后续节点是否需要被唤醒的逻辑。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } 复制代码
可以看到doReleaseShared是在一个循环中,如果在调用中,head发生了变化,继续循环,否则挑出循环,而在都独占模式下,没有这样的并发问题,所以独占模式下不需要循环,另外干活的就是unparkSuccessor方法,它来唤醒等待的线程,上面在分析独占模式时已经分析过了,这里不再赘述。
文章先是概述了一下AQS的基本实现原理,然后利用AQS实现了一个简单的互斥锁,最后详细分析了AQS中独占和共享二种模式的关键方法的实现。以上。