作者:张文俊,转载请注明出处。
任何一个java对象都天然继承于Object类,在线程间实现通信往往会应用到Object的几个方法,比如wait(),wait(long timeout),wait(long timeout, int nanos)与notify(),notifyAll()几个方法实现等待/通知机制,同样的, 在java Lock体系下依然会有同样的方法实现等待/通知机制。
wait(),notify(),与await()和signal的区别:
前者是配合内置锁synchronized来实现多个线程间的通知的,后者一般结合Lock来实现前者的功能,不同的是synchronized同时 只能与一个共享变量的notify或wait方法实现同步 ,而AQS的一个锁可以对应多个条件变量。
(AQS阻塞队列)中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回
2、signalAll():能够唤醒所有等待在condition上的线程
要想能够深入的掌握condition还是应该知道它的实现原理,现在我们一起来看看condiiton的源码。创建一个condition对象是通过 lock.newCondition()
,而这个方法实际上是会new出一个 ConditionObject 对象,该类是AQS的一个内部类,可以访问到AQS的变量和方法。condition内部维护了一个 等待队列 ,所有调condition.await方法的线程会加入到等待队列中,并且线程状态转换为等待状态。另外注意到ConditionObject中有两个成员变量:
private transient Node firstWaiter; private transient Node lastWaiter;
这样我们就可以看出来ConditionObject通过持有等待队列的头尾指针来管理等待队列。主要注意的是Node类复用了在AQS中的Node类,进一步说明, 等待队列是一个单向队列 ,而AQS同步队列是一个双端队列。
同时还有一点需要注意的是:我们可以多次调用lock.newCondition()方法创建多个condition对象,也就是一个lock可以持有多个等待队列。而在之前利用Object的方式实际上是指在 对象Object对象监视器上只能拥有一个同步队列和一个等待队列,而并发包中的Lock拥有一个同步队列和多个等待队列 。示意图如下:
ConditionObject是AQS的内部类,因此每个ConditionObject能够访问到AQS提供的方法,相当于每个Condition都拥有所属同步器的引用
当调用condition.await()方法后会使得当前获取lock的线程进入到等待队列,如果该线程能够从await()方法返回的话一定是该线程获取了与condition相关联的lock。接下来,我们就从几个核心方法的代码说起,首先从await()方法开始。await()方法源码为:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 1. 将当前线程包装成Node,尾插入到等待队列中 Node node = addConditionWaiter(); // 2. 释放当前线程所占用的lock,在释放的过程中会唤醒同步队列中的下一个节点 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { // 3. 当前线程进入到等待状态 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 4. 自旋等待获取到同步状态(即获取到lock) if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 5. 处理被中断的情况 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
当前线程调用condition.await()方法后,会使得当前线程释放lock然后加入到等待队列中,直至被signal/signalAll后会使得当前线程从等待队列中移至到同步队列中去,直到获得了lock后才会从await方法返回,或者在等待时被中断会做中断处理。那么关于这个实现过程我们会有这样几个问题:
而这段代码的逻辑就是告诉我们这三个问题的答案。具体 请看注释 ,在第1步中调用addConditionWaiter将当前线程添加到等待队列中,该方法源码为:
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //将当前线程包装成Node Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else //尾插入 t.nextWaiter = node; //更新lastWaiter lastWaiter = node; return node; }
将当前节点包装成Node,如果等待队列的firstWaiter为null的话(等待队列为空队列),则将firstWaiter指向当前的Node,否则,更新lastWaiter(尾节点)即可。就是 通过尾插入的方式将当前线程封装的Node插入到等待队列中即可 ,同时可以看出等待队列是一个 不带头结点的链式队列 ,之前我们学习AQS时知道同步队列 是一个带头结点的链式队列 ,这是两者的一个区别。将当前节点插入到等待对列之后,会使当前线程释放lock,由fullyRelease方法实现,fullyRelease源码为:
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { //成功释放同步状态 failed = false; return savedState; } else { //不成功释放同步状态抛出异常 throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
调用AQS的模板方法release方法释放AQS的同步状态并且唤醒在同步队列中头结点的后继节点引用的线程,如果释放成功则正常返回,若失败的话就抛出异常。到目前为止,这两段代码已经解决了前面的两个问题的答案了,还剩下第三个问题,怎样从await方法退出?现在回过头再来看await方法有这样一段逻辑:
while (!isOnSyncQueue(node)) { // 3. 当前线程进入到等待状态 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
很显然,当线程第一次调用condition.await()方法时,会进入到这个while()循环中,然后通过LockSupport.park(this)方法使得当前线程进入等待状态,那么要想退出这个await方法第一个前提条件自然而然的是要先退出这个while循环,出口就只剩下两个地方: 1. 逻辑走到break退出while循环;2. while循环中的逻辑判断为false 。
再看代码出现第1种情况的条件是当前等待的线程被中断后代码会走到break退出,第二种情况是当前节点被移动到了同步队列中(即另外线程调用的condition的signal或者signalAll方法),while中逻辑判断为false后结束while循环。总结下,就是 当前线程被中断或者调用condition.signal/condition.signalAll方法当前节点移动到了同步队列后 ,这是当前线程退出await方法的前提条件。当退出while循环后就会调用 acquireQueued(node, savedState)
该方法的作用是在 自旋过程中线程不断尝试获取同步状态,直至成功(线程获取到lock) 。这样也说明了 退出await方法必须是已经获得了condition引用(关联)的lock 。
到目前为止,开头的三个问题我们通过阅读源码的方式已经完全找到了答案,也对await方法的理解加深。await方法示意图如下图:
如图,调用condition.await方法的线程必须是已经获得了lock,也就是当前线程是同步队列中的头结点。调用该方法后会使得当前线程所封装的Node尾插入到等待队列中。
2、不响应中断的支持,condition.awaitUninterruptibly()方法
调用condition的signal或者signalAll方法可以将等待队列中等待时间最长的节点移动到同步队列中,使得该节点能够有机会获得lock。按照等待队列是先进先出(FIFO)的,所以等待队列的头节点必然会是等待时间最长的节点,也就是每次调用condition的signal方法是将头节点移动到同步队列中。我们来通过看源码的方式来看这样的猜想是不是对的,signal方法源码为:
public final void signal() { //1. 先检测当前线程是否已经获取lock if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //2. 获取等待队列中第一个节点,之后的操作都是针对这个节点 Node first = firstWaiter; if (first != null) doSignal(first); }
signal方法首先会检测当前线程是否已经获取lock,如果没有获取lock会直接抛出异常,如果获取的话再得到等待队列的头指针引用的节点,之后的操作的doSignal方法也是基于该节点。下面我们来看看doSignal方法做了些什么事情,doSignal方法源码为:
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; //1. 将头结点从等待队列中移除 first.nextWaiter = null; //2. while中transferForSignal方法对头结点做真正的处理 } while (!transferForSignal(first) && (first = firstWaiter) != null); }
具体逻辑请看注释,真正对头节点做处理的逻辑在 transferForSignal 放,该方法源码为:
final boolean transferForSignal(Node node) { //1. 更新状态为0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //2.将该节点移入到同步队列中去 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
关键逻辑请看注释,这段代码主要做了两件事情1.将头结点的状态更改为CONDITION;2.调用enq方法,将该节点尾插入到同步队列中,关于enq方法请看AQS的底层实现这篇文章。现在我们可以得出结论: 调用condition的signal的前提条件是当前线程已经获取了lock,该方法会使得等待队列中的头节点即等待时间最长的那个节点移入到同步队列,而移入到同步队列后才有机会使得等待线程被唤醒,即从await方法中的LockSupport.park(this)方法中返回,从而才有机会使得调用await方法的线程成功退出 。signal执行示意图如下图:
sigllAll与sigal方法的区别体现在doSignalAll方法上,前面我们已经知道d oSignal方法只会对等待队列的头节点进行操作, ,而doSignalAll的源码为:
private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
该方法只不过时间等待队列中的每一个节点都移入到同步队列中,即“通知”当前调用condition.await()方法的每一个线程
如图,线程awaitThread先通过lock.lock()方法获取锁成功后调用了condition.await方法进入等待队列,而另一个线程signalThread通过lock.lock()方法获取锁成功后调用了condition.signal或者signalAll方法,使得线程awaitThread能够有机会移入到同步队列中,当其他线程释放lock后使得线程awaitThread能够有机会获取lock,从而使得线程awaitThread能够从await方法中退出执行后续操作。如果awaitThread获取lock失败会直接进入到同步队列。
公众号简介:作者是蚂蚁金服的一线开发,分享自己的成长和思考之路。内容涉及数据、研发、算法。