Condition在ReentrantLock中,实际上是创建AQS的ConditionObject对象,主要的成员变量有Node类型的firstWaiter和lastWaiter,作为头节点和尾节点,是单向链表。当调用await时,加入队列,signal时,加入到AQS的阻塞队列。
把节点移到Condition队列后挂起
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter();//添加一个节点成为尾节点 int savedState = fullyRelease(node);//释放所有持有的锁 int interruptMode = 0; while (!isOnSyncQueue(node)) {//要么中断,要么进入阻塞队列,退出while循环 LockSupport.park(this);//挂起 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//中断过,就跳出循环 break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//是否被中断。acquireQueued之前讲过 interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled如果尾节点不为空 unlinkCancelledWaiters();//将不是CONDITION状态的移除出去 if (interruptMode != 0) reportInterruptAfterWait(interruptMode);//重新中断 }
addConditionWaiter,如果尾节点不在队列里,先移除已取消的节点,添加一个节点成为尾节点
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out.如果尾节点不为空,但是状态不是CONDITION,说明已取消,不想在Condition的队列里,就移除 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters();//将不是CONDITION状态的移除出去 t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION);//创建状态是CONDITION的节点 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node;//加入到尾节点 return node; }
fullyRelease方法
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed)//如果释放失败,状态就变成CANCELLED,而不是CONDITION,所以会在addConditionWaiter方法中被移除 node.waitStatus = Node.CANCELLED; } }
isOnSyncQueue方法
final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null)//如果状态是CONDITION或者前置节点为空,说明还在Condition队列里 return false; if (node.next != null) // If has successor, it must be on queue如果有后续节点了,肯定是在阻塞队列里 return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ return findNodeFromTail(node);//从阻塞队列的尾节点遍历,如果找到当前node,说明在阻塞队列里返回true }
checkInterruptWhileWaiting方法,唤醒前已经中断,返回THROW_IE,唤醒后中断,返回REINTERRUPT,没有中断,返回0
private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } final boolean transferAfterCancelledWait(Node node) { //如果cas操作成功,且预期值是CONDITION状态,说明在唤醒前就中断了,并加入阻塞队列 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } while (!isOnSyncQueue(node))//一直到阻塞队列里,在唤醒后才中断 Thread.yield(); return false; }
唤醒Condition队列的节点
public final void signal() { if (!isHeldExclusively())//非独占抛异常 throw new IllegalMonitorStateException(); Node first = firstWaiter;//获取头结点 if (first != null) doSignal(first);//唤醒头节点 } //如果头结点已经取消,就继续往下个节点寻找 private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null)//把下一个节点移到头结点 lastWaiter = null; first.nextWaiter = null;//help gc } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { /* * 如果cas操作没成功,说明已经取消了,就继续下一个节点 */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * 没有取消,就加入阻塞队列,这个方法在aqs讲过了 * */ Node p = enq(node); int ws = p.waitStatus;//前置节点的状态 //大于0,说明前置节点已取消,就轮到当前节点,可以唤醒 //小于等于哦,把前置节点的状态设置为-1,如果失败了,唤醒 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread);//唤醒 return true; }