本系列是基于经验设计原型,然后不断优化最终达到和AQS(AbstractQueuedSynchronizer)类似的设计。最终结果不一定和AQS完全一致,基于个人理解会有修改,可以作为理解AQS的不完全参考。
接上篇。本篇主要介绍Condition即条件变量的实现,ReentrantLock中最后一块需要实现的内容。
在实现条件变量之前,考虑一下条件变量的一些特性。
注意第一条特性,条件变量依赖的是独占锁,所以类似读锁这种共享锁是不支持条件变量的,ReentrantReadWriteLock中ReadLock#newCondition的实现是直接抛错。
按照第二条特性,可以得到如下的执行过程
注意ThreadA中被加粗的unlock和lock,调用条件变量的await方法时,虽然没有显式调用unlock和lock,但是内部其实是做了类似的事情的。
从第三条来看,条件变量应该是对应了一个专门的队列的,那些调用了await方法的线程会进入条件变量的队列中等待被signal。这个专门的队列和锁自身的lock/unlock等待队列不同,语义上唤醒是某个线程的signal/signalAll,而不是lock/unlock方法。如果有点难理解的话,可以考虑在上图中增加一个不调用条件变量任何方法的ThreadC,C的lock/unlock不会唤醒A,因为A等待的是B的唤醒,所以条件变量的队列和锁的lock/unlock队列是分离的,类似下图。
那么条件变量的等待队列需要设计得和锁的等待队列一样么?答案是不用。原因是假如你在await的unlock语义执行之前加入队列的话,因为条件队列依赖的是独占锁,只有一个线程可以操作condition queue。调用signal方法时,锁还没有被释放,同样只有一个线程可以操作condition queue。所以condition queue可以使用普通的同步队列设计。
基于以上初步分析之后得到的条件变量原型
import javax.annotation.Nonnull; import java.util.Date; import java.util.LinkedList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.LockSupport; public class LockWithCondition1 implements Lock { @Override public void lock() { } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { return false; } @Override public boolean tryLock(long time, @Nonnull TimeUnit unit) throws InterruptedException { return false; } @Override public void unlock() { } @Override @Nonnull public Condition newCondition() { return new ConditionImpl(); } @SuppressWarnings("Duplicates") private class ConditionImpl implements Condition { final LinkedList<WaitingThread> waitingThreads = new LinkedList<>(); @Override public void await() throws InterruptedException { waitingThreads.addLast(new WaitingThread(Thread.currentThread())); unlock(); LockSupport.park(this); lockInterruptibly(); } @Override public void awaitUninterruptibly() { waitingThreads.addLast(new WaitingThread(Thread.currentThread())); unlock(); LockSupport.park(this); lock(); } @Override public long awaitNanos(long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) { return 0L; } long startAt = System.nanoTime(); waitingThreads.addLast(new WaitingThread(Thread.currentThread())); unlock(); LockSupport.parkNanos(this, nanosTimeout); lock(); return System.nanoTime() - startAt; } @Override public boolean await(long time, @Nonnull TimeUnit unit) throws InterruptedException { WaitingThread t = new WaitingThread(Thread.currentThread()); waitingThreads.addLast(t); unlock(); LockSupport.parkNanos(this, unit.toNanos(time)); boolean signaled = (t.status.get() == WaitingThread.STATUS_SIGNALED || !t.abort()); lock(); return signaled; } @Override public boolean awaitUntil(@Nonnull Date deadline) throws InterruptedException { return await(deadline.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public void signal() { WaitingThread t = waitingThreads.removeFirst(); if (t != null) { t.wakeUp(); } } @Override public void signalAll() { for (WaitingThread t : waitingThreads) { t.wakeUp(); } waitingThreads.clear(); } } private static class WaitingThread { static final int STATUS_NORMAL = 0; static final int STATUS_SIGNALED = 1; static final int STATUS_ABORTED = -1; final AtomicReference<Thread> thread; final AtomicInteger status = new AtomicInteger(STATUS_NORMAL); WaitingThread(@Nonnull Thread thread) { this.thread = new AtomicReference<>(thread); } void wakeUp() { if (status.compareAndSet(STATUS_NORMAL, STATUS_SIGNALED)) { LockSupport.unpark(thread.get()); } } boolean abort() { thread.set(null); return status.compareAndSet(STATUS_NORMAL, STATUS_ABORTED); } } }
这里为了只关注条件变量省去了lock的所有代码。可以看到await方法中,执行了如下操作
对应地,signal执行了如下操作
提问,这里是否会发生signal无法唤醒await线程的情况?
考虑到enqueue和dequeue都是在持有独占锁的前提下操作的,实际可能有两种情况:
先dequeue意味着signal先执行(因为此时持有独占锁),以及队列为空,unpark不会唤醒任何线程。signal先执行等价于之后await调用不会发生,因为进入await之前的条件是满足的。执行结果正确。
先enqueue意味signal还无法执行(因为此时await的线程持有锁)。unlock之后,signal线程获取锁,发现await线程的节点,尝试unpark。这里有可能是unpark/park,也有可能是park/unpark执行序列。不管哪种,await线程都会被唤醒,尝试lock,进入lock的等待队列。signal线程释放锁之后,await线程获取锁,执行await之后的代码。执行结果正确。
可以看到,条件变量的分析比独占锁本身要容易很多,一方面这是条件变量本身的特性所致,另一方面合理利用条件变量的特性(特性一)简化了设计。
顺便说一句,设计上多个条件变量的队列互不影响,如果你看到某个锁的条件变量要求只能有一个,或者互相有影响的话,多少说明设计是有问题的。
刚才关于“先enqueue”的分析中,你是否注意到await线程被唤醒之后,由于signal线程还没有释放锁,所以await线程暂时无法获取锁,导致了一次多余的unpark/park。是否可以直接把await线程的节点从conditon queue转移到lock的等待队列?当然这样做的前提是条件变量了解lock的内部实现,由于条件变量依赖锁,所以这不是问题。
其次LockWithCondition1严格上来说在一些边界条件上不满足条件变量的语义。比如说await方法在抛出InterruptedException时必须持有锁(await返回时必须持有锁),现在的实现中lockInterruptibly在抛出InterruptedException时不持有锁。这是条件变量实现中最容易忽视的问题。
还有一些条件变量实现的细节问题,比如await的什么时候可以抛出InterruptedException。这块可以参考Condition接口本身的注释。
以下是修改版的条件变量实现。由于代码比较多,这里只展示了ConditionImpl的实现,其他代码可以参考
https://github.com/xnnyygn/concurrent-learning/tree/master/src/main/java/in/xnnyygn/concurrent/mylock
import javax.annotation.Nonnull; import java.util.Date; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.LockSupport; @SuppressWarnings("Duplicates") class ConditionImpl implements Condition { private final MyLock lock; private final ConditionQueue conditionQueue = new ConditionQueue(); ConditionImpl(MyLock lock) { this.lock = lock; } @Override public void await() throws InterruptedException { Node node = Node.createConditionForCurrent(); conditionQueue.enqueue(node); lock.unlock(); boolean interrupted = false; while (nodeNotLockEnqueued(node)) { LockSupport.park(this); /* * 1. interrupted, unknown * 2. predecessor is aborted, lock enqueued * 3. signaled by predecessor, lock enqueued */ if (Thread.interrupted()) { interrupted = lockEnqueueByAwaitingThread(node); break; } } if (lock.acquireUninterruptibly(node) && !interrupted) { Thread.currentThread().interrupt(); } if (interrupted) { conditionQueue.removeNonConditionNodes(); throw new InterruptedException(); } } @Override public void awaitUninterruptibly() { Node node = Node.createConditionForCurrent(); conditionQueue.enqueue(node); lock.unlock(); boolean interrupted = false; while (nodeNotLockEnqueued(node)) { LockSupport.park(this); if (Thread.interrupted()) { interrupted = true; } } if (lock.acquireUninterruptibly(node) || interrupted) { Thread.currentThread().interrupt(); } } @Override public long awaitNanos(long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) { return 0L; } Node node = Node.createConditionForCurrent(); conditionQueue.enqueue(node); lock.unlock(); long deadline = System.nanoTime() + nanosTimeout; long nanos = nanosTimeout; boolean interrupted = false; while (nodeNotLockEnqueued(node)) { LockSupport.parkNanos(this, nanos); nanos = deadline - System.nanoTime(); if (Thread.interrupted()) { interrupted = lockEnqueueByAwaitingThread(node); break; } if (nanos <= 0L) { lockEnqueueByAwaitingThread(node); break; } } if (lock.acquireUninterruptibly(node) && !interrupted) { Thread.currentThread().interrupt(); } if (interrupted || nanos <= 0L) { conditionQueue.removeNonConditionNodes(); } if (interrupted) { throw new InterruptedException(); } return nanos; } @Override public boolean await(long time, TimeUnit unit) throws InterruptedException { return awaitNanos(unit.toNanos(time)) > 0; } @Override public boolean awaitUntil(@Nonnull Date deadline) throws InterruptedException { return awaitNanos(TimeUnit.MILLISECONDS.toNanos(deadline.getTime() - System.currentTimeMillis())) > 0; } @Override public void signal() { Node node; do { node = conditionQueue.dequeue(); if (node == null) { return; } } while (!lockEnqueueBySignalingThread(node)); } @Override public void signalAll() { Node node; while ((node = conditionQueue.dequeue()) != null) { lockEnqueueBySignalingThread(node); } } private boolean nodeNotLockEnqueued(@Nonnull Node node) { if (node.status.get() == Node.STATUS_CONDITION || node.predecessor.get() == null) { return true; } if (node.successor.get() != null) { return false; } // status == NORMAL return !lock.queue.contains(node); } private boolean lockEnqueueByAwaitingThread(@Nonnull Node node) { if (node.status.compareAndSet(Node.STATUS_CONDITION, Node.STATUS_NORMAL)) { lock.queue.enqueue(node); return true; } // enqueuing by signal thread while (!lock.queue.contains(node)) { Thread.yield(); } return false; } private boolean lockEnqueueBySignalingThread(@Nonnull Node node) { if (!node.status.compareAndSet(Node.STATUS_CONDITION, Node.STATUS_NORMAL)) { return false; } Node predecessor = lock.queue.enqueue(node); if (predecessor.isAborted() || !predecessor.status.compareAndSet(Node.STATUS_NORMAL, Node.STATUS_SIGNAL)) { // predecessor is aborted LockSupport.unpark(node.thread.get()); } return true; } }
在分析修改版的await之前,解释一下这里的Node。MyLock使用的Node和AQS一样,可同时用于condition queue和lock queue。这样做只是为了复用Node,你也可以选择和LockWithCondition1一样,分开使用condition queu的节点和lock queue的节点。condition queue中主要使用Node的thread,status和next,lock queue使用除next以外所有字段。为了区分condition queue和lock queue的节点,condition queue的节点一开始的状态是STATUS_CONDITION,相对的,lock queue中节点一开始为STATUS_NORMAL。前篇提到过Node中status的迁移图,不包括STATUS_CONDITION。事实上加入lock queue时节点的status不可能为STATUS_CONDITION。STATUS_CONDITION只是condition queue中的状态,这点请注意。
condition queue中节点的status只有一种迁移
STATUS_CONDITION -> STATUS_NORMAL
可能的情况有
为什么不区分异常情况?原因是异常状态是也需要加入lock queue,此时status必须是STATUS_NORMAL。你不能把一个STATUS_ABORTED的节点加入lock queue,这样你就没办法获取锁了。重申一遍,条件变量的语义中不管是正常返回,还是await超时,中断,返回时必须持有锁。
由于以上原因,status迁移只有一种。考虑到signal/signalAll时signal线程执行状态迁移,await超时和中断时await线程执行状态迁移,status迁移还必须是CAS。
condition queue的具体实现在类ConditionQueue中,这个类现阶段有3个方法
enqueue很好理解,removeNonConditionNodes主要用于在await超时和中断移除await线程的节点的。注意,只是从condition queue中移除,之后会放到lock queue中去。dequeue就是正常情况从condition queue中移除下一个节点,之后放到lock queue中去。
在上述前提下理解await和signal方法。
首先看最简单的signal方法。从condition queue中取出一个节点,如果没有节点就直接返回。有的话尝试状态迁移,失败的话,说明await线程自己修改了状态(状态迁移的情况3和4),signal会尝试下一个节点。如果状态迁移成功,此时按照前篇的要求,设置前置节点status为STATUS_SIGNAL。如果前置节点放弃了,或者CAS过程中失败了(意味着前置节点放弃了),此时signal线程能做的事情就是唤醒await线程,让await线程自己跳过放弃的前置节点。
signal线程是否可以帮await线程的节点跳过放弃的节点而不是unpark await线程?这样可以帮忙省去一次unpark/park。
回答是请分析哪些是signal线程可以做的,哪些是await线程可以做的。
从MyLock是基于CLHLock实现这点来说,设置前置节点的status是后继节点对应的线程,即await线程该做的事情。所以signal线程这里设置前置节点的status其实不是它的职责,这里个人觉得是为了减少一般情况下的unpark/park。如果碰到前置节点是放弃了的节点还考虑帮忙跳过的话,有点舍本逐末了。
signalAll和signal类似,这里不再赘述。
接下来是相对简单的awaitUninterruptibly。为什么不是await?因为await对于中断的处理有点微妙。awaitUninterruptibly不处理中断,准确来说是,碰到中断,只会重新设置中断状态。这样关注点就可以集中到中断处理以外的部分。
awaitUninterruptibly的基本流程和LockWithCondition1有所不同。考虑到park会响应中断,需要把park放在一个循环中,记录中断状态。跳过循环的唯一条件是节点已经被加入lock queue。
判断是否已加入lock queue的最直接的方法(nodeNotLockEnqueued最后)是从tail检查节点是否存在。其次是检查进入lock queue之后才会被使用的字段,比如successor。假如successor被设置,那么节点肯定被加入了lock queue。还有一种提前判断的方法,节点自身的状态。如果节点status是STATUS_CONDITION或者进入lock queue后才会被使用的predecessor字段没有被设置的话,可以断言节点还没有加入lock queue。
awaitUninterruptibly中跳出循环之后,执行acquireUninterruptibly。在AQS中这个方法被叫做acquireQueued。从行为上这个acquireXXX方法在节点被加入lock queue后执行。其次,会记录acquire过程中的中断,但是不会因为中断而停止尝试获取锁。await系列方法其实需要的就是这种获取模式,不会因为中断而放弃获取的方式。
不管是acquireUninterruptibly还是循环过程中被中断,awaitUninterruptibly最终只会重新设置中断标志,除此以外什么都不做。
在理解了acquireUninterruptibly之后,看一下await方法。
相比于await方法,循环有两个出口。一个是由signal线程放到lock queue的条件,另外一个是中断。中断时,await线程会自己尝试把节点加入lock queue。从两个lockEnqueueByXXXThread方法可以看出,存在两个线程同时把节点放入lock queue的可能性。假如signal线程先放进lock queue,那么await在CAS这一步(lockEnqueueByAwaitingThread第一行)会失败,这时必须等待signal线程enqueue操作的完成(通过yield)。假如await线程先放进lock queue,那么signal线程会失败,现有逻辑中signal线程会尝试唤醒下一个节点。
注意这里的await和AQS的await方法有些许不同。AWS的await在await线程先放入lock queue时最后会抛出InterruptedException,但是在signal线程先放入的话,会设置中断状态。这里的await,在await线程先放入时同样会抛出异常,但是在signal线程先放入的话,不会设置中断状态。这里其实有一个边界问题,在中断时恰好被signal时算不算中断?个人觉得可以不算,要算也是以留下证据为目的。
跳出循环之后,同样调用acquireUninterruptibly,按情况设置中断标志。最后在确认是中断了的情况下,从condition queue中移除自己的节点,并抛出异常。注意这里,移除节点时持有独占锁,其次,中断不代表节点仍旧在condition queue里面。一种情况是await线程比同时执行的signal先CAS成功。
考虑到这里的await和AWS的await相比还是有点差异的,如果你对于改造不太放心的话,在理解的前面内容的基础上,相信你能修改成AQS的语义。
最后是几个限时await方法,这里全部引导到同一个限时方法awaitNanos上。主体流程和await很像,有一点不同是剩余时间的处理。个人理解awaitNanos的返回值代表是是否超时,所以不能简单把timeout减去执行时间返回去调用方。具体代码这里不再分析。
总体来说,条件变量比起独占锁本身要简单。因为可以借助条件变量的特性使用同步队列。另一方面,条件变量实现中也有比较细节的问题需要考虑。
至此,ReentrantLock的实现算是真正完成了。接下来是ReentrantReadWriteLock的实现。