通过AQS独占锁,我们对AQS的数据结构有了基本的了解。它本质上就是一个优化过的CLH队列,因为CLF队列只有一个前驱指针,而AQS除了前驱指针,还有一个后驱指针。先简单总结一个AQS的特性
Node
state
,被 volatile
关键字修饰 state==0
表可以获取锁, state>1
代表锁重入 state>0
代表可以获取锁,同步器初始化的时候,会给sate设置一个初始化,这个值代表同时允许多少个线程获取锁 tryAcquireShared
返回值的特点是: 小于0
代表获取锁失败; 等于0
代表本次获取锁成功,但随后的获取将返回失败,也就是此刻这是共享模式下的最后一把锁,除非接下来有人释放锁,否则你获取不了; 大于0
代表本次获取锁成,并且接下来也可以获取锁 nextWaiter
属性,表示该节点是独占模式还是共享模式,独占为 EXCLUSIVE
,独占为 SHARED
waitStatus
属性, 这个字段的取值有以下可能: SIGNAL(阻塞)
CANCELLED(取消排队)
CONDITION(条件等待)
PROPAGATE(共享模式下用到)
0(如果没有给它设置状态,默认为0)
help GC
;当获取锁失败时,会向CLH队列尾部添加一个节点,同时通过自旋将其前驱节点的 waitStatus
属性设置为 SIGNAL
,然后通过 LockSupport
将该节点对应的线程阻塞 LockSupport
将该节点对应的线程解除阻塞;如果为null,则通过 前驱指针
反向遍历找到该节点,然后通过 LockSupport
将该节点对应的线程解除阻塞。为什么要通过前驱指针遍历呢?因为AQS的后驱指针在极限情况下是不可靠的,但很多时候可以通过后置指针达到优化的效果:添加节点的时候,当CAS成功但在设置后置指针之前,此时后置指针为null;给前驱节点设置 SIGNAL
状态的时候,会保证其前驱节点是一个有效的节点(非取消状态),如果为取消状态,则找其前驱的前驱 setHeadAndPropagate
方法,如果此时同步器中还有可用的锁,则会调用 doReleaseShared
方法唤醒下一个节点,这就是传播 doReleaseShared
方法,该方法会唤醒head节点的下一个节点,而唤醒的节点在通过自旋获得锁后,会调用 setHeadAndPropagate
方法,如果此时同步器中还有可用的锁,则会继续调用 doReleaseShared
方法唤醒下一个节点
下面以 CountDownLatch
讲解AQS共享锁
CountDownLatch countDownLatch = new CountDownLatch(3); new Thread(() -> { sleep(TimeUnit.MILLISECONDS, 80); System.out.println(Thread.currentThread().getName() + " Finished"); countDownLatch.countDown(); }).start(); new Thread(() -> { sleep(TimeUnit.MILLISECONDS, 50); System.out.println(Thread.currentThread().getName() + " Finished"); countDownLatch.countDown(); }).start(); new Thread(() -> { sleep(TimeUnit.MILLISECONDS, 60); System.out.println(Thread.currentThread().getName() + " Finished"); countDownLatch.countDown(); }).start(); countDownLatch.await(); System.out.println("All Finished"); } --------------------------------------------结果--------------------------------------------------- Thread-1 Finished Thread-2 Finished Thread-0 Finished All Finished 复制代码
CountDownLatch
的用法:比如将一个任务分成3个小任务,然后在主线程上等待所有任务完成,这时可以使用 CountDownLatch
。在构造函数中传入3,在AQS共享模式下 state == 3
代表同时可以有3个线程获取锁,在 CountDownLatch
代表有三个线程调用 CountDownLatch#countDown
方法后,调用 CountDownLatch.await()
的线程将不再阻塞。在上面例子中,每个小任务完成时调用 CountDownLatch#countDown
方法,然后在主线程上调用 CountDownLatch#countDown
,这样就达到我们想要的效果了。
CountDownLatch countDownLatch = new CountDownLatch(3); public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } Sync(int count) { setState(count); } 复制代码
CountDownLatch
中有一个内部类 Sync
,它继承了 AbstractQueuedSynchronizer
抽象类,它是实现同步器的关键 CountDownLatch
中,会根据我们传入的count值,调用 AbstractQueuedSynchronizer#setState
方法,即最终AQS中 state == count
// CountDownLatch#await public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // AbstractQueuedSynchronizer#acquireSharedInterruptibly public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // Sync#tryAcquireShared protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } 复制代码
和之前的独占模式一样,还是模板模式,也就是说,想实现一个基于共享锁的同步器,只需要重写 tryAcquireShared
和 tryReleaseShared
方法。
acquireSharedInterruptibly
方法名可以知道,这是一个可以响应中断的方法,如果线程发生中断,则抛出 InterruptedException
tryAcquireShared
方法的主要作用就是当 state==0时返回1,否则返回-1
,我们知道 CountDownLatch
构造函数执行完成之后,AQS中state的值为3,那state的值什么情况下会变为0呢?其实不难猜出应该是在调用 CountDownLatch#countDown
方法时会改变state的值,这一块内容我们接下来再去验证。也就是说,在没有其他处理的情况下,此时 tryAcquireShared
方法会返回 -1
acquireSharedInterruptibly
方法中的if条件成立,所以接下来会执行 doAcquireSharedInterruptibly
方法 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 创建共享节点,注意这里的 Node.SHARED,然后将其添加到队列尾部 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { // 自旋获取锁 for (;;) { // 获取前驱节点 final Node p = node.predecessor(); // 如果前驱节点时head,则直接获取锁 if (p == head) { // 尝试获取锁 int r = tryAcquireShared(arg); if (r >= 0) { // 尝试获取锁成功,需要重设设置头节点,这里面还有传播操作,等下重点关注 setHeadAndPropagate(node, r); // 将之前的头节点从队列中删除 p.next = null; // help GC failed = false; return; } } // 设置前驱节点的状态为 SIGNAL 并且阻塞当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } 复制代码
SIGNAL setHeadAndPropagate
那么,这个 setHeadAndPropagate
方法是干嘛用的呢?有必要认真看看
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below // 设置当前节点为head节点,和独占模式下一样 setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) // 唤醒下一个节点, doReleaseShared(); } } 复制代码
咋一看觉得这段代码是比较难理解的,除了最开始的 setHead
和独占模式一样,表示设置当前节点为head节点以外,其他的代码看起来有点迷糊。其实这时候我们得从AQS共享模式的一个特点去理解它。
tryAcquireShared
返回值特点: 小于0
代表获取锁失败; 等于0
代表本次获取锁成功,但随后的获取将返回失败,也就是此刻这是共享模式下的最后一把锁,除非接下来有人释放锁,否则你获取不了; 大于0
代表本次获取锁成,并且接下来也可以获取锁 propagate
即 tryAcquireShared
方法的返回值,如果 propagate>0
,则说明共享模式下还有锁可以获取,这时候如果队列中有排队的节点,应该通知它们,这就是传播。那怎么通知呢?调用 doReleaseShared
方法 head==null || head.waitStatus<0
又是对应什么场景呢?接下来再说
有关于 CountDownLatch#await
方法,到这里我们可以放一放,当然还存在一些疑问,先记下来
head==null || head.waitStatus<0 (h = head) == null || h.waitStatus < 0) node.next == null || node.next.isShared()
其实前面已经猜测过, CountDownLatch#countDown
方法应该会改变state的值,同样还是模板模式
// CountDownLatch#countDown public void countDown() { sync.releaseShared(1); } // AbstractQueuedSynchronizer#releaseShared public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } // Sync#tryReleaseShared protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } 复制代码
tryReleaseShared
表示尝试获取锁,如果获取成功,此时需要通知 head节点的后驱节点
,并对该后驱节点解除阻塞。 Sync#tryReleaseShared
方法中向判断state是否等于0,如果是则直接返回false,然后设置 state = state-1
,然后再返回state是否等于。为什么有这么一段逻辑呢?这其实是和同步器的特性相关,对我在 CountDownLatch
的构造函数中传入3时,表示我们在3个线程上调用 CountDownLatch#countDown
方法后,调用 CountDownLatch#await
的线程将解除阻塞。就是3个,再多几个调用也没有什么效果,所以这里直接返回false CountDownLatch#countDown
方法后, AbstractQueuedSynchronizer#releaseShared
中的if条件将返回true,此时将调用 AbstractQueuedSynchronizer#doReleaseShared
方法,表示对head的后驱节点解除阻塞,其实还涉及到一些传播的逻辑 private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 如果 head.waitStatus == Node.SIGNAL,说明它的后驱节点正被阻塞,在添加节点的时候会改变前驱节点的状态 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 如果 head.waitStatus == Node.SIGNAL,则唤醒head.next节点,和独占模式一样 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } 复制代码
这一块内容和 setHeadAndPropagate
方法一起看会更好一些,我把代码也贴上
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } 复制代码
下面开始模拟执行 doReleaseShared
方法的极限情况
(1). 假设队列中有两个节点 A->B->C->D
A:head D:tail
(2). threa1
代表执行 doReleaseShared
的线程, thread2
代表节点B线程, thread3
代表节点C线程, thread4
代表节点D线程,最开始执行 doReleaseShared
方法的时候, thread2
thread3
thread4
是被阻塞的
thread1
第一次进入循环, h != null && h != tail
成立,然后开始执行 unparkSuccessor
方法唤醒 thread2
, unparkSuccessor
方法执行完成之后即代表 thread2
被唤醒,此时 thread1
和 thread2
同时在运行,这一时刻,head的状态为0
thread1
继续往下执行,虽然此时 head.waitStatus ==0
,但是变量 ws
的值是在 compareAndSetWaitStatus(h, Node.SIGNAL, 0)
方法执行之前赋值,所以此时 ws==SIGNAL
,所以此时 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
条件不成立,然后判断 h == head
是否成立
2.1 如果此时 thread2
还没有获取锁成功,即自己还没有成功升级为head,则 h == head
成立,此时 thread1
直接退出,接下来只需要 thread2
通过自旋获取锁成功就可以了,已经没有 thread1
什么事了,那有人会问,如果同步器里还可以获取锁,现在只唤醒一个节点B,线程 threa1
就直接退出了,后面的C、D节点怎么唤醒呢,这个交给 setHeadAndPropagate
2.2 如果此时 thread2
获取锁成功,即自己已经成功升级为head,则 h == head
不成立,此时 thread1
进入下一轮循环;同时 thread2
会执行 setHeadAndPropagate
方法,如果在同步器有锁得情况下, thread2
还会执行 doReleaseShared
,所以此时有可能两个线程同时执行 doReleaseShared
thread1
和 thread2
同时执行 doReleaseShared
方法,因为此时新head是节点B,状态为 SIGNAL
, compareAndSetWaitStatus(h, Node.SIGNAL, 0)
只有一个线程能执行成功
3.1 如果 thread1
执行成功,则会唤醒下一个节点,即节点C,如果节点C在升级head成功之后判断同步器中还有锁,节点C所在线程 thread3
会继续唤醒下一个节点,所以此时可能有3个线程在同时执行 doReleaseShared
方法
3.2 在 thread1
执行成功的时候, thread2
可能会在新一轮循环退出:在执行 Node h = head
和 if (h == head)
这两行代码时head未发生变化;也可能不退出:即head发生变化,此时有可能 thread1
thread2
thread3
同时在竞争执行 compareAndSetWaitStatus(h, Node.SIGNAL, 0)
方法,具体谁能成功,谁也说不准
在同步器有锁的情况下,如果 A B C D 4个节点都被唤醒了,说明此时队列中只剩下一个head,即节点D。当然,此时可能 A B C D 节点对应的线程可能都在执行 doReleaseShared
方法,但是没有关系,因为新一轮的循环条件 if (h != null && h != tail) && h == head
会导致它们退出
但考虑这种一种情况,加入有3个线程都正常退出了,然后在线程 thread4
执行 if (h != null && h != tail)
之前,队列中添加了一个新的节点,即节点E,这时候 thread4
会重新进入循环,这时候head(即D节点)的状态有两种情况
5.1 节点E添加到尾部成功,并且已经修改了其前驱节点(节点D)的状态为 SIGNAL
,也就是此时 head(即D节点)的状态 == SIGNAL
, 这时候 thread4
会唤醒节点E,接下来的流程和上面一样,就不分析了
5.2 节点E添加到尾部成功,但还没来得急修改其前驱节点(节点D)的状态为 SIGNAL
,也就是此时 head(即D节点)的状态 == 0
,这时候 thread4
会进入到 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
判断
ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
什么情况下会不成立呢?
6.1 在多个线程执行 doReleaseShared
的时候,加入3个线程执行 doReleaseShared
,第一个线程成功执行了 unparkSuccessor
方法,那剩下两个线程有可能在新一轮循环中并发执行 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
,这种情况我们占不考虑
6.2 即在执行 ws == 0
和 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
这两行代码中间,head节点的状态被改掉了。而我们知道,在添加节点的时候会改变前驱节点的状态为SIGNAL,所以在节点E对应线程自旋设置head状态,可能导致这里不成立
虽然只有几行代码,但在不了解作者意图的情况下,真的好难看懂,做一个总结
共享锁
的含义, 共享锁
代表在同一时刻可以有多个线程获取锁,具体有几个线程由用户自己决定;而 独占锁
代表同一个时刻只能由一个线程获取锁 setHeadAndPropagate
方法,如果此时同步器中还有可用的锁,则会调用 doReleaseShared
方法唤醒下一个节点,这就是传播 doReleaseShared
方法,该方法会唤醒head节点的下一个节点,而唤醒的节点在通过自旋获得锁后,会调用 setHeadAndPropagate
方法,如果此时同步器中还有可用的锁,则会继续调用 doReleaseShared
方法唤醒下一个节点