使用过 ReentrantLock
的盆友应该也知道 Condition
的存在。先讲解下它存在的意义:就是仿照实现 Object
类的 wait
signal
signallAll
等函数功能的。
这里引申一个面试常问到的问题: wait
会释放锁, sleep
不会。
Condition
的通常使用场景是这样的: 生产者消费者模型,假设生产者只有在生产队列为空时才进行生产,则代码类似如下: Condition emptyCondition = ReentrantLock.newCondition(); Runnable consumer = new Runnable() { public void run() { if(queue.isEmpty()) { emptyCondition.signal(); // emptyObj.notify(); } else { consumer.consume(); } } } Runnable provider = new Runnable() { public void run() { emptyCondition.wait(); // emptyObj.wait(); providerInstance.produce(); } } 复制代码
所以我们可以知道 Condition
设计的意义了。下面我们来讲解下其实现原理。
还记得在 AQS:JAVA经典之锁实现算法(一) 提到的锁实现的 Sync Queue
吗? Condition
的实现是类似的原理: 每个 AQS
里有x(视你 newCondition
几次)个 Condition Queue
,它的结点类也是 AQS
内部类 Node
。 Node
里有一个 nextWaiter
,指向下一个在同一 Condition Queue
里的 Node
。 结构如下图:
condition.wait
一定是在成功 lock
的线程里调用才有效,不然不符合逻辑,同时也会抛出 IlleagleMornitorException
。 Sync Queue
的队首,当调用 condition.wait
时,该线程会释放锁(即将 AQS
的 state
置为0),同时唤醒后继结点,后继结点在 acquire
的循环里会成功获取锁,然后将自己所在结点置为队首,然后开始自己线程自己的业务代码。 这个过程看下图:
condition
的 signal
后,在 Condition Queue
中的 Node
会从 Condition Queue
中出队,进入 Sync Queue
队列,开始它的锁竞争的过程。 过程看下图: 所以,这里可以看出来,即使是被 signal
了,被 signal
的线程也不是直接就开始跑,而是再次进入 Sync Queue
开始竞争锁而已。这里的这个逻辑,跟 Object.wait Object.signal
也是完全一样的。
我们先看一段运用到 condition
的代码案例: 假设生成者在生产队列 queue
为空时 emptyCondition.signal
才进行生产操作
ReentrantLock locker = new ReentrantLock(); Condition emptyCondition = locker.newCondition(); Runnable consumer = new Runnable() { public void run() { locker.lock(); if (queue.isEmpty()) { emptyCondition.signal(); } else { ... } locker.unlock(); } }; Runnable producer = new Runnable() { public void run() { locker.lock(); emptyCondition.wait(); // 开始生产 ... locker.unlock(); } } 复制代码
我们从消费者一步一步走,拟定如下这样一套线程切换逻辑:
producer#lock consumer#lock producer#await consumer#signal consumer#unlock producer#unlock
(先从 Sync Queue Condition Queue
图解讲一遍,然后对应图解,对着代码撸一遍)
producer#lock
生产者直接获取锁成功,入队 Sync Queue
,位队首
consumer#lock
消费者竞争锁失败,进入 Sync Queue
等待获取锁
producer#await
生产者进入等待,释放锁,出 Sync Queue
,进入 Condition Queue
,等待 emptyCondition
来唤醒。
consumer#signal
消费者唤起生产者,生产者 consumer
的 node
自 Condition Queue
转移到 Sync Queue
开始竞争锁。
consumer.unlock
consumer
释放锁后, consumer
的 node
从 Sync Queue
出队,释放 state
,唤醒后继结点 provider#node
, provider
抢占到锁。
provider#unlock
这里就没有啥好说的了。
当然,我为了讲解过程,像在锁被第一次成功获取的时候,逻辑上虽然并不是直接进入 Sync Queue
我也给讲解成直接进入 Sync Queue
了,这是为了缩减边边角角的小逻辑,讲清楚主线逻辑。大家看明白主逻辑,然后再自己去撸一遍,就融会贯通了。
provider.lock
final void lock() { // 这就直接获取锁成功了,没有else的逻辑了 if (compareAndSetState(0, 1)) // 这个方法是AQS类用来设置拥有锁的线程实例 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } 复制代码
consumer#lock
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); // consumer.lock就要走这里了,因为上面的compareAndSetState // 返回false else acquire(1); } 复制代码
protected final boolean compareAndSetState(int expect, int update) { // 楼下这个是CAS原理进行值修改,CAS就对比乐观锁来, // 这里想要修改this这个对象的state字段,如果state是expect // 则修改至update,返回true;否则false。我们知道provider.lock // 已经将state 改为非0值了,所以这里肯定失败啦 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } 复制代码
provider#await
先简单看下 Condition
类对象结构
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; } ... 复制代码
一个 Condition
对象就是一条链队,头尾结点在 Condition
的内部字段指定 firstWaiter lastWaiter
。
看 await
方法
public final void await() throws InterruptedException { // 因为await是响应中断的等待,这里就是检验下, // 通常而言,凡是throws InterruptedException的, // 开头基本都是这句 if (Thread.interrupted()) throw new InterruptedException(); // 这里是向condition queue中插入一个node,并返回之, // 插入了这个node,就代表当前线程在condition queue // 中开始等待了 Node node = addConditionWaiter(); // 这个是AQS释放锁方法,加个fully,就是用来将多次 // 获取锁一次性都释放掉,然后将锁获取次数返回, // 留着后面signal后成功获取锁的时候,还要加锁同样的 // 次数。 // !!!同时注意,这里唤醒了后继结点!后集结点就继续开始 // 竞争锁,就是在acquire那个自旋方法里,记得吗 // 不记得去看看文章(一) int savedState = fullyRelease(node); // 记录当前线程中断的标记 int interruptMode = 0; // 判断当前的node是否已经转移到sync queue里了。 // 转移了,说明这个node已经开始竞争锁了,不用再等待 // 唤醒了,没转,继续自旋 while (!isOnSyncQueue(node)) { // 这里把当前线程给挂起了 LockSupport.park(this); // 这里的方法checkxxx就是用来检查waiting自旋期间,线程有没有 // interrupt掉。因为await方法是响应线程中断的。 // 若interrupt了,则在checkxxx方法里,会将node转移到 // sync Queue中,去竞争,不要担心,因为同时 // 会设置interruptMode,在最后会根据其值抛Interrupted // 异常。。 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; // 那什么时候就结束上面的自旋呢?一个是当前的线程被 // signal了,那node就被transfer到sync queue了,while // 就不满足了。再一个就是线程中断了,在while循环体里给break掉了 } // 跳出来后,紧接着去竞争锁,知道成功为止。&& 后面这个THROW_IE,标识 // 要抛出异常,不是的话,就是REINTERRPUT,代表保证线程的中断标记不被 // 重置即可。 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 这儿是在condition queue里有多个waiter的时候才起作用,主要用来将 // CANCEL的结点从链队中剔除掉 // 具体大家自己看吧。现在忽略这 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 这儿就是处理interruptMode中断标记字段的逻辑 // 在reportxxx中,interruptMode为THROW_IE,则抛出 // 异常,不是,则保证线程的中断field不被重置为“未中断”即可 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } 复制代码
consumer#signal
consumer
在调用 emptyCondition.signal
的时候,会影响到 emptyCondition
的 condition queue
中的等待线程,这里 具体指上面的provider#await方法。
public final void signal() { // 先判断下,lock锁是不是在调用signal方法的当前线程手里 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 取到condition queue里的第一个waiter node,这里也就是 // consumer,因为它第一个await进入condition queue了 Node first = firstWaiter; // 这里去进行了具体的signal操作,具体会做先把waiter node的waitStatus // 从CONDITION状态改为入Sync Queue的正常状态值0 // 然后修改Sync Queue 的Head Tail等,让其入队成功 // 最后再从其前驱结点的状态值上确保当前结点能够被唤起即可。 // 这里是因为这个waitStatus值对后继结点的行为是有影响的,像SIGNAL指 // 的是在结点释放后,要去唤醒后继结点 // if (first != null) doSignal(first); } 复制代码
consumer#unlock
unlock
具体调用的 AQS
的 release()
方法
public void unlock() { sync.release(1); } // AQS.release public final boolean release(int arg) { // tryRelease,这里由NonFairSync实现,具体就是通过 // CAS去修改state值,并判断是否成功释放锁 if (tryRelease(arg)) { // 成功释放了,则在waitStatus 不是初始状态时,去唤醒后继, // 这个 != 0 来做判断的原因,就要综合所有情况, // 像FailSync NonFairSync / Exclusive / Share // 等所有情况来看这里的waitSTatus都会处于什么状态。 // 全撸一遍的话,会发现这里的 != 0能够涵盖以上所有情况。 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 复制代码
provider#unlock
这里就同理上面了。