[ Java并发] AQS抽象队列同步器源码解析--独占锁获取过程
上一篇已经讲解了AQS独占锁的获取过程,接下来就是对AQS独占锁的释放过程进行详细的分析说明,废话不多说,直接进入正文...
首先进行说明下,能够正常执行到release方法这里来的线程都是获取到锁的,从下面代码可以看出释放锁步骤只有两个重要的方法:tryRelease与unparkSuccessor,tryRelease尝试释放锁,unparkSuccessor唤醒后继节点所封装的线程。
public final boolean release(int arg) { // 尝试释放锁 if (tryRelease(arg)) { Node h = head; // 如果头节点不为空,并且waitStatus不为0则唤醒后继节点 if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 无论是否执行唤醒后继节点,总会返回true return true; } // 释放失败 return false; }
接下来就开始分析tryRelease与unparkSuccessor这两个主要的方法。
tryRelease方法在AQS是默认不实现具体逻辑的,如下:
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
因此,我们就拿ReentrantLock的tryRelease的具体实现加以说明,
protected final boolean tryRelease(int releases) { // 释放后的锁的计数(可重入锁) int c = getState() - releases; // 当前释放锁的必须为锁持有的线程 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 当锁计数为0时说明已锁已完全释放,将AQS中占有线程设为空 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
private void unparkSuccessor(Node node) {// 唤醒后继节点 int ws = node.waitStatus; // waitStatus,直接将waitStatus设为0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; // waitStatus > 0 ,说明该节点已被取消,从后往前遍历找到未被取消距离该节点最近的节点并唤醒 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒线程 LockSupport.unpark(s.thread); }
unparkSuccessor的方法执行逻辑:
1.如果头节点waitStatus < 0,就直接将waitStatus 设为0
2.从后往前遍历,找出waitStatus <=0 的节点,并且是离头节点最近的节点,也就是头节点的后继节点
3.找到待唤醒的后继节点后唤醒该节点对应的线程。
以上就是本节要讲的主要内容了,下次再会.....
以为AQS独占锁的释放过程就此结束了吗?没那么简单。
重新往前看下锁释放的代码,不知道有没有发现问题?
1.tryRelease方法为什么不用CAS进行减少锁计数
2.unparkSuccessor方法中为什么只判断头节点waitStatus<0时,将waitStatus设为0,那么waitStatus>0的情况怎么不判断
3.unparkSuccessor中if (s == null || s.waitStatus > 0) {... },为什么需要判断waitStatus >0
4.为什么需要从后往前遍历找到离头节点最近的并且waitStatus<=0的后继节点进行线程唤醒,不可以从前往后遍历吗?
接下来我们逐一的对以上上个问题进行解释。
这个问题其实是最简单的一个问题,就是前面也提到的,能够执行到release方法这里来的线程都是已经获取到锁的线程,并且独占锁也只能是一个线程,因此不需要进行CAS进行比较后才赋值。
不知道大家还记不记得上一篇分析的内容,重新回顾一下shouldParkAfterFailedAcquire方法,当前驱节点的waitStatus>0时,我们会遍历剔除掉waitStatus>0的节点,因此,当前头节点waitStatus一定不会大于0
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 如果ws == Node.SIGNAL,则说明当前线程已经准备好被唤醒,因此现在可以被阻塞,之后等待被唤醒 if (ws == Node.SIGNAL) return true; if (ws > 0) { // 如果ws > 0,说明当前节点已经被取消,因此循环剔除ws>0的前驱节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //如果ws<=0,则将标志位设置为Node.SIGNAL,当还不可被阻塞,需要的等待下次执行shouldParkAfterFailedAcquire判断是否需要阻塞 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
在执行过程中,头节点的第一个后继节点的waitStatus >0 时就是节点是被取消的,有可能时因为获取锁超时被取消,因此我们需要跳过该节点的,所以需要重新找下个需要被唤醒的节点,而如果头节点的第一个后继节点的waitStatus<=0直接唤醒。
这个问题我们需要重新回顾上一篇的一个方法addWaiter 跟 enq方法
private Node addWaiter(Node mode) {// 首先尝试快速添加到队尾,失败再正常执行添加到队尾 Node node = new Node(Thread.currentThread(), mode); // 快速方式尝试直接添加到队尾 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 如果快速添加到队尾失败则执行enq(node)添加到队尾 enq(node); return node; }
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { // 添加到队列尾部 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
从这两个方法可以取出添加节点到同步节点队尾的关键部分进行分析
node.prev = pred; // 步骤1 if (compareAndSetTail(pred, node)) { pred.next = node; // 步骤2 return node; }
从上面代码可以看出在执行插入节点的过程中,总是先执行node.prev = pred,然后再执行pred.next = node,因此关于问题4的答案就可以解释了:
如果我们从头往后遍历的话,再并发的环境下如果添加新节点的话可能node.prev = pre已经执行了,但pred.next=node 还未执行,但此时也已经开始执行了unparkSuccessor方法,所以会导致新添加的节点可能没被遍历到,但如果是从后往前遍历的话就不会有该问题。
以上就是AQS独占锁的释放过程,如果有什么问题,欢迎各位不吝指正。