我们使用 Synchronized 实现“等待/通知”模式时,都是配合每个对象 Object 的监视器方法(wait 和 notify/notifyAll 方法)完成的,但是在显式锁 Lock 中,这种方法就显然不行了。因此显式锁中就引入了 Condition 接口,该接口提供了类似于 Object 的监视器方法,可以配合显式锁 Lock 实现“等待/通知”模式。
该接口位于 J.U.C
的 locks 包中,这个接口中定义的方法如下:
public interface Condition { /** * 当前线程进入等待状态直到被通知(signal)或中断,当前线程将进入运行状态且从await()方法返回 * 的情况,包括: * 其它线程调用该Condition的signal()或signalAll()方法,而当前线程被选中唤醒 * (1)其它线程(调用 interrupt() )中断当前线程 * (2)如果当前等待线程从await()方法返回,那么表明该线程已经获取了Condition对象所持有的锁 */ void await() throws InterruptedException; /** * 当前线程进入等待状态直到被通知,从方法名称可以看到该方法对中断不敏感 */ void awaitUninterruptibly(); /** * 当前线程进入等待状态后直到被通知、中断或者超时,返回值表示剩余的时间,如果在nanoTimeout纳秒 * 之前被唤醒,那么返回值就是(nanosTimeout-实际耗时)。如果返回值是0或者负数,那么可以认定 * 已经超时了 */ long awaitNanos(long nanosTimeout) throws InterruptedException; /** * 当前线程进入等待状态直到被通知、中断或者到某个时间,如果没有到指定时间就被通知,方法返回true * 否则返回false */ boolean await(long time, TimeUnit unit) throws InterruptedException; /** * 当前线程进入等待状态直到被通知、中断或者到某个时间,如果没有到指定时间就被通知,方法返回true * 否则返回false */ boolean awaitUntil(Date deadline) throws InterruptedException; /** * 唤醒一个等待在Condition上的线程,该线程从等待方法中返回前必须获取与Condition相关的锁 */ void signal(); /** * 唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须获取与COndition相关的锁 */ void signalAll(); }
我们可以看到接口定义中与 Object 监视器方法基本类似,与监视器方法对比如下:
对比项 | Object 监视器方法 | Condition |
---|---|---|
前置条件 | 获取对象的锁 | 调用Lock.lock() 获取锁,调用lock.newCondition()获取Condition对象 |
调用方式 | 直接调用 | 直接调用 |
等待队列个数 | 一个 | 多个 |
当前线程释放锁并进入等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态,在等待状态中不响应中断 | 不支持 | 支持 |
当前线程释放锁并进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态到将来某个时间 | 不支持 | 支持 |
唤醒等待队列中一个线程 | 支持 | 支持 |
唤醒等待队列中全部线程 | 支持 | 支持 |
编写一个 Java 应用程序,要求有三个进程:student1,student2,teacher,其中线程student1准备“睡”1分钟后再开始上课,线程 student2 准备“睡” 5 分钟后再开始上课。Teacher 在输出 4 句“上课”后,“唤醒”了休眠的线程 student1;线程 student1 被“唤醒”后,负责再“唤醒”休眠的线程 student2。
我们先使用 Synchronized 和 Object 监视器方法实现一下:
package com.fantJ.bigdata; /** * Created by Fant.J. * 2018/7/2 16:36 */ public class Ten { static class Student1{ private boolean student1Flag = false; public synchronized boolean isStudent1Flag() { System.out.println("学生1开始睡觉1min"); if (!this.student1Flag){ try { System.out.println("学生1睡着了"); wait(1*1000*60); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("学生1被唤醒"); return student1Flag; } public synchronized void setStudent1Flag(boolean student1Flag) { this.student1Flag = student1Flag; notify(); } } static class Student2{ private boolean student2Flag = false; public synchronized boolean isStudent2Flag() { System.out.println("学生2开始睡觉5min"); if (!this.student2Flag){ try { System.out.println("学生2睡着了"); wait(5*1000*60); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("学生2被唤醒"); return student2Flag; } public synchronized void setStudent2Flag(boolean student2Flag) { notify(); this.student2Flag = student2Flag; } } static class Teacher{ private boolean teacherFlag = true; public synchronized boolean isTeacherFlag() { if (!this.teacherFlag){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("老师准备吼着要上课"); return teacherFlag; } public synchronized void setTeacherFlag(boolean teacherFlag) { this.teacherFlag = teacherFlag; notify(); } } public static void main(String[] args) { Student1 student1 = new Student1(); Student2 student2 = new Student2(); Teacher teacher = new Teacher(); Thread t = new Thread(new Runnable() { @Override public void run() { for (int i = 0;i<4;i++){ System.out.println("上课"); } teacher.isTeacherFlag(); System.out.println("学生1被吵醒了,1s后反应过来"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } student1.setStudent1Flag(true); } }); Thread s1 = new Thread(new Runnable() { @Override public void run() { student1.isStudent1Flag(); System.out.println("准备唤醒学生2,唤醒需要1s"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } student2.setStudent2Flag(true); } }); Thread s2 = new Thread(new Runnable() { @Override public void run() { student2.isStudent2Flag(); } }); s1.start(); s2.start(); t.start(); } }
当然,用 notifyAll
可能会用更少的代码,这种实现方式虽然复杂,单性能上会比使用 notifyAll()
要强很多,因为没有锁争夺导致的资源浪费。但是可以看到,代码很复杂,实例与实例之间也需要保证很好的隔离。
然后再用 Condition 和 ReentrantLock 实现。
public class xxx{ private int signal = 0; public Lock lock = new ReentrantLock(); Condition teacher = lock.newCondition(); Condition student1 = lock.newCondition(); Condition student2 = lock.newCondition(); public void teacher(){ lock.lock(); while (signal != 0){ try { teacher.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("老师叫上课"); signal++; student1.signal(); lock.unlock(); } public void student1(){ lock.lock(); while (signal != 1){ try { student1.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("学生1醒了,准备叫醒学生2"); signal++; student2.signal(); lock.unlock(); } public void student2(){ lock.lock(); while (signal != 2){ try { student2.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("学生2醒了"); signal=0; teacher.signal(); lock.unlock(); } public static void main(String[] args) { ThreadCommunicate2 ten = new ThreadCommunicate2(); new Thread(() -> ten.teacher()).start(); new Thread(() -> ten.student1()).start(); new Thread(() -> ten.student2()).start(); } }
Condition 依赖于 Lock 接口,生成一个 Condition 的基本代码是 lock.newCondition()
调用 Condition 的 await() 和 signal() 方法,都必须在 lock 保护之内,就是说必须在 lock.lock()
和 lock.unlock
之间才可以使用。
可以观察到,我取消了 Synchronized 方法关键字,在每个加锁的方法前后分别加了 lock.lock(); lock.unlock();
来获取/释放锁,并且在释放锁之前施放想要施放的 Condition 对象。同样的,我们使用 signal 来完成线程间的通信。
我们可以利用 Condition 来实现一个有界队列,什么叫有界队列呢?有界队列是一种特殊的队列,当队列为空时,队列的获取操作将会阻塞获取线程,直到队列中有新增元素,当队列已满时,队列的插入操作将会阻塞插入线程,直到队列出现“空位”。
public class MyQueue<E> { private Object[] objects; private Lock lock = new ReentrantLock(); private Condition addCDT = lock.newCondition(); private Condition rmCDT = lock.newCondition(); private int addIndex; private int rmIndex; private int queueSize; MyQueue(int size){ objects = new Object[size]; } //添加元素 public void add(E e){ lock.lock(); while (queueSize == objects.length){ try { addCDT.await(); //进入等待状态 } catch (InterruptedException e1) { e1.printStackTrace(); } } objects[addIndex] = e; System.out.println("添加了数据"+"Objects["+addIndex+"] = "+e); if (++addIndex == objects.length){ addIndex = 0; } queueSize++; rmCDT.signal(); lock.unlock(); } //删除元素 public Object remove(){ lock.lock(); while (queueSize == 0){ try { System.out.println("队列为空"); rmCDT.await(); //进入等待状态 } catch (InterruptedException e) { e.printStackTrace(); } } Object temp = objects[rmIndex]; objects[rmIndex] = null; System.out.println("移除了数据"+"Objects["+rmIndex+"] = null"); if (++rmIndex == objects.length){ rmIndex = 0; } queueSize--; addCDT.signal(); lock.unlock(); return temp; } public void foreach(E e){ if (e instanceof String){ Arrays.stream(objects).map(obj->{ if (obj == null){ obj = " "; } return obj; }).map(Object::toString).forEach(System.out::println); } if (e instanceof Integer){ Arrays.stream(objects).map(obj -> { if (obj == null ){ obj = 0; } return obj; }).map(object -> Integer.valueOf(object.toString())).forEach(System.out::println); } } }
add 方法就是往队列中添加数据。 remove 是从队列中按 FIFO 移除数据。 foreach 方法是一个观察队列内容的工具方法,很容易看出,它是用来遍历的。
测试方法:
public static void main(String[] args) { MyQueue<Integer> myQueue = new MyQueue<>(5); myQueue.add(5); myQueue.add(4); myQueue.add(3); // myQueue.add(2); // myQueue.add(1); myQueue.remove(); myQueue.foreach(5); }
添加了数据Objects[0] = 5 添加了数据Objects[1] = 4 添加了数据Objects[2] = 3 移除了数据Objects[0] = null 0 4 3 0 0
由上面两个例子,我们可以基本理解了Conditon的基本方法和作用,以及其简单的应用场景,那么我们肯定好奇这其中的方法是怎么去实现的,下面我们就来一探究竟。
我们从 Lock 的实现类 ReentrantLock 出发,看看里面是如何实现 Condition 的。
在 IDEA 中使用 double shift 打开 ReentrantLock 源码,可以看到 ReentrantLock 内部有一个静态内部类 Sync ,并且 Sync 是继承自 AQS(AbstractQueuedSynchronizer) 的,这与我们之前分析 AQS 原理是一致的,Sync 中有一个 newCondition 方法:
final ConditionObject newCondition() { return new ConditionObject(); }
可以看到其中 new 了一个 ConditionObject,然后我们使用 Ctrl + ConditionObject 打开其源码,可以看到这个类是属于 AQS 的内部类,在之前讲解 AQS 原理时没有分析其中 ConditionObject 内部实现类也是为了留到此处进行讲解。为什么 Condition 实现类需要放到 AQS 内部,其实想一下就知道,由于 Condition 的操作都需要获取相关联的锁,所以作为同步器的内部实现类也是非常合理的设计。
我们先看 ConditionObject 的实例域:
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter;
可以看到实例变量中是有两个 Node构成的,标注为 transient 是表示这两个变量不会被序列化。那么 Node 是什么呢?Node 的类型就是 AbstractQueuedSynchronizer.Node,也就是我们在 AQS 原理中讲解的 FIFO 队列,这也说明每个 Condition 对象中都包含着一个这样的等待队列,这个队列就是 Condition 对象实现等待/通知功能的关键。
我们先回顾一下这个 FIFO 队列,Node 对应的主要字段有:
状态 | 值 | 含义 |
---|---|---|
CANCELLED | 1 | 当前节点因为超时或中断被取消同步状态获取,该节点进入该状态后不会再变化 |
SIGNAL | -1 | 标识后继的节点处于阻塞状态,当前节点在释放同步状态或被取消时,需要通知后续节点继续运行,每个节点在阻塞前,需要标记其前驱节点的状态为SIGNAL |
CONDITION | -2 | 标识当前节点是作为等待队列节点使用的 |
PROPAGATE | -3 | 表示下一次共享式同步状态获取将会无条件地被传播下去 |
0 | 0 | 初始状态 |
一个 Condition 包含一个等待队列,Condition 拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用 Condition.await() 方法,将会以当前线程构造节点,并将节点从尾部加入等待队列,等待队列的基本结构如下:
如上图所示,Condition 拥有首尾节点的引用,而新增节点只需要将原有的尾节点 nextWaiter 指向它,并且更新尾节点即可。此处更新节点的过程没有使用 CAS 进行保证,这是因为调用 await() 方法的线程必定是获取了锁的线程,也就是通过锁来保证线程安全的。
在 Object 监视器模型上,一个对象拥有一个同步队列和等待队列,但是 Lock 拥有一个同步队列和多个等待队列,其对应关系如下:
如图所示,Condition 的实现是同步器的内部类,因此每个 Condition 实例都能够访问同步器提供的方法,相当于每个 Condition 都拥有所属同步器的引用。
了解了等待队列原理后,我们可以看一下 condition.await() 方法的源码:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //构造一个新的等待队列Node加入到队尾 Node node = addConditionWaiter(); //释放当前线程的独占锁,不管重入几次,都把state释放为0 int savedState = fullyRelease(node); int interruptMode = 0; //如果当前节点没有在同步队列上,即还没有被signal,则将当前线程阻塞 while (!isOnSyncQueue(node)) { LockSupport.park(this); //和中断相关,主要是区分两种中断:是在被signal前中断还是在被signal后中断,如果是被signal前就被中断则抛出 InterruptedException,否则执行 Thread.currentThread().interrupt(); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //被中断则直接退出自旋 break; } //退出了上面自旋说明当前节点已经在同步队列上,但是当前节点不一定在同步队列队首。acquireQueued将阻塞直到当前节点成为队首,即当前线程获得了锁。然后await()方法就可以退出了,让线程继续执行await()后的代码。 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
该方法就是将当前线程构造成节点并加入到等待队列中,然后释放同步状态,并唤醒同步队列中后继节点,然后当前状态会进入等待状态。
当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态,如果不是通过其它线程调用 Condition.signal() 方法唤醒,而是对等待线程进行中断,则会抛出 IntreeuptException。
而如果从队列的角度来看,这个过程如下图所示:
如图所示,同步队列的首节点不会直接加入等待队列,而是通过 addConditionWaiter() 方法把当前线程构造成一个新的节点并将其加入到等待队列中。
调用 Condition 的 signal() 方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //如果同步状态不是被当前线程独占,直接抛出异常。从这里也能看出来,Condition只能配合独占类同步组件使用。 Node first = firstWaiter; if (first != null) doSignal(first); //通知等待队列队首的节点。 }
调用该方法的前置条件就是当前线程必须获取了锁,可以看到 signal() 方法进行了 isHeldExclusively() 检查,也就是当前线程必须是获取了锁的线程,接着获取等待队列的首节点,将其移动到同步队列并使用 LockSupport 唤醒节点中的线程,具体实现如下:
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && //transferForSignal方法尝试唤醒当前节点,如果唤醒失败,则继续尝试唤醒当前节点的后继节点。 (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { //如果当前节点状态为CONDITION,则将状态改为0准备加入同步队列;如果当前状态不为CONDITION,说明该节点等待已被中断,则该方法返回false,doSignal()方法会继续尝试唤醒当前节点的后继节点 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); //将节点加入同步队列,返回的p是节点在同步队列中的先驱节点 int ws = p.waitStatus; //如果先驱节点的状态为CANCELLED(>0) 或设置先驱节点的状态为SIGNAL失败,那么就立即唤醒当前节点对应的线程,线程被唤醒后会执行acquireQueued方法,该方法会重新尝试将节点的先驱状态设为SIGNAL并再次park线程;如果当前设置前驱节点状态为SIGNAL成功,那么就不需要马上唤醒线程了,当它的前驱节点成为同步队列的首节点且释放同步状态后,会自动唤醒它。 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
从队列的角度来看,这个过程如下图所示:
而 Condition 中的 signalAll() 方法,相当于对等待队列中每个节点均执行一次 signal() 方法,效果就是将等待队列中所有节点全部移到同步队列中,并唤醒每个节点的线程。
总的来说,Condition 的本质就是等待队列和同步队列的交互:
当一个持有锁的线程调用 Condition.await() 时,它会执行以下步骤:
当一个持有锁的线程调用 Condition.signal() 时,它会执行以下操作:
从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点 CANCELLED,就尝试唤醒下一个节点;如果再 CANCELLED 则继续迭代。
对每个节点执行唤醒操作时,首先将节点加入同步队列,此时 await() 操作的步骤 3 的解锁条件就已经开启了。然后分两种情况讨论:
如果知道 Object 的等待通知机制,Condition 的使用是比较容易掌握的,因为和 Object 等待通知的使用基本一致。
对 Condition 的源码理解,主要就是理解等待队列,等待队列可以类比同步队列,而且等待队列比同步队列要简单,因为等待队列是单向队列,同步队列是双向队列。
之所以同步队列要设计成双向的,是因为在同步队列中,节点唤醒是接力式的,由每一个节点唤醒它的下一个节点,如果是由 next 指针获取下一个节点,是有可能获取失败的,因为虚拟队列每添加一个节点,是先用 CAS 把 tail 设置为新节点,然后才修改原 tail 的 next 指针到新节点的。因此用 next 向后遍历是不安全的,但是如果在设置新节点为 tail 前,为新节点设置 prev,则可以保证从 tail 往前遍历是安全的。因此要安全的获取一个节点 Node 的下一个节点,先要看 next 是不是 null,如果是 null,还要从 tail 往前遍历看看能不能遍历到 Node。
而等待队列就简单多了,等待的线程就是等待者,只负责等待,唤醒的线程就是唤醒者,只负责唤醒,因此每次要执行唤醒操作的时候,直接唤醒等待队列的首节点就行了。等待队列的实现中不需要遍历队列,因此也不需要 prev 指针。