转载

Java中的Condition接口实现分析

我们使用 Synchronized 实现“等待/通知”模式时,都是配合每个对象 Object 的监视器方法(wait 和 notify/notifyAll 方法)完成的,但是在显式锁 Lock 中,这种方法就显然不行了。因此显式锁中就引入了 Condition 接口,该接口提供了类似于 Object 的监视器方法,可以配合显式锁 Lock 实现“等待/通知”模式。

该接口位于 J.U.C 的 locks 包中,这个接口中定义的方法如下:

public interface Condition {

    /**
     * 当前线程进入等待状态直到被通知(signal)或中断,当前线程将进入运行状态且从await()方法返回
     * 的情况,包括:
     * 	其它线程调用该Condition的signal()或signalAll()方法,而当前线程被选中唤醒
     *   (1)其它线程(调用 interrupt() )中断当前线程
     *   (2)如果当前等待线程从await()方法返回,那么表明该线程已经获取了Condition对象所持有的锁
     */
    void await() throws InterruptedException;

    /**
     * 当前线程进入等待状态直到被通知,从方法名称可以看到该方法对中断不敏感
     */
    void awaitUninterruptibly();

    /**
     * 当前线程进入等待状态后直到被通知、中断或者超时,返回值表示剩余的时间,如果在nanoTimeout纳秒
     * 之前被唤醒,那么返回值就是(nanosTimeout-实际耗时)。如果返回值是0或者负数,那么可以认定
     * 已经超时了
     */
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    
    /**
     * 当前线程进入等待状态直到被通知、中断或者到某个时间,如果没有到指定时间就被通知,方法返回true
     * 否则返回false
     */
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    /**
     * 当前线程进入等待状态直到被通知、中断或者到某个时间,如果没有到指定时间就被通知,方法返回true
     * 否则返回false
     */
    boolean awaitUntil(Date deadline) throws InterruptedException;

    /**
     * 唤醒一个等待在Condition上的线程,该线程从等待方法中返回前必须获取与Condition相关的锁
     */
    void signal();

    /**
     * 唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须获取与COndition相关的锁
     */
    void signalAll();
}

我们可以看到接口定义中与 Object 监视器方法基本类似,与监视器方法对比如下:

对比项 Object 监视器方法 Condition
前置条件 获取对象的锁 调用Lock.lock() 获取锁,调用lock.newCondition()获取Condition对象
调用方式 直接调用 直接调用
等待队列个数 一个 多个
当前线程释放锁并进入等待状态 支持 支持
当前线程释放锁并进入等待状态,在等待状态中不响应中断 不支持 支持
当前线程释放锁并进入超时等待状态 支持 支持
当前线程释放锁并进入等待状态到将来某个时间 不支持 支持
唤醒等待队列中一个线程 支持 支持
唤醒等待队列中全部线程 支持 支持

使用示例

编写一个 Java 应用程序,要求有三个进程:student1,student2,teacher,其中线程student1准备“睡”1分钟后再开始上课,线程 student2 准备“睡” 5 分钟后再开始上课。Teacher 在输出 4 句“上课”后,“唤醒”了休眠的线程 student1;线程 student1 被“唤醒”后,负责再“唤醒”休眠的线程 student2。

我们先使用 Synchronized 和 Object 监视器方法实现一下:

package com.fantJ.bigdata;

/**
 * Created by Fant.J.
 * 2018/7/2 16:36
 */
public class Ten {
    static class Student1{
        private boolean student1Flag = false;
        public synchronized boolean isStudent1Flag() {
            System.out.println("学生1开始睡觉1min");
            if (!this.student1Flag){
                try {
                    System.out.println("学生1睡着了");
                    wait(1*1000*60);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("学生1被唤醒");

            return student1Flag;
        }

        public synchronized void setStudent1Flag(boolean student1Flag) {
            this.student1Flag = student1Flag;
            notify();
        }
    }
    static class Student2{
        private boolean student2Flag = false;
        public synchronized boolean isStudent2Flag() {
            System.out.println("学生2开始睡觉5min");
            if (!this.student2Flag){
                try {
                    System.out.println("学生2睡着了");
                    wait(5*1000*60);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("学生2被唤醒");
            return student2Flag;
        }

        public synchronized void setStudent2Flag(boolean student2Flag) {
            notify();
            this.student2Flag = student2Flag;
        }
    }
    static class Teacher{
        private boolean teacherFlag = true;
        public synchronized boolean isTeacherFlag() {
            if (!this.teacherFlag){
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("老师准备吼着要上课");

            return teacherFlag;
        }

        public synchronized void setTeacherFlag(boolean teacherFlag) {
            this.teacherFlag = teacherFlag;
            notify();
        }
    }
    public static void main(String[] args) {
        Student1 student1 = new Student1();
        Student2 student2 = new Student2();
        Teacher teacher = new Teacher();

        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0;i<4;i++){
                    System.out.println("上课");
                }
                teacher.isTeacherFlag();
                System.out.println("学生1被吵醒了,1s后反应过来");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                student1.setStudent1Flag(true);
            }
        });
        Thread s1 = new Thread(new Runnable() {
            @Override
            public void run() {
                student1.isStudent1Flag();
                System.out.println("准备唤醒学生2,唤醒需要1s");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                student2.setStudent2Flag(true);
            }
        });
        Thread s2 = new Thread(new Runnable() {
            @Override
            public void run() {
                student2.isStudent2Flag();
            }
        });

        s1.start();
        s2.start();
        t.start();
    }
}

当然,用 notifyAll 可能会用更少的代码,这种实现方式虽然复杂,单性能上会比使用 notifyAll() 要强很多,因为没有锁争夺导致的资源浪费。但是可以看到,代码很复杂,实例与实例之间也需要保证很好的隔离。

然后再用 Condition 和 ReentrantLock 实现。

public class xxx{
        private int signal = 0;
        public Lock lock = new ReentrantLock();
        Condition teacher = lock.newCondition();
        Condition student1 = lock.newCondition();
        Condition student2 = lock.newCondition();

        public void teacher(){
            lock.lock();
            while (signal != 0){
                try {
                    teacher.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("老师叫上课");
            signal++;
            student1.signal();
            lock.unlock();
        }
        public void student1(){
            lock.lock();
            while (signal != 1){
                try {
                    student1.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("学生1醒了,准备叫醒学生2");
            signal++;
            student2.signal();
            lock.unlock();
        }
        public void student2(){
            lock.lock();
            while (signal != 2){
                try {
                    student2.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("学生2醒了");
            signal=0;
            teacher.signal();
            lock.unlock();
        }

        public static void main(String[] args) {
            ThreadCommunicate2 ten = new ThreadCommunicate2();
            new Thread(() -> ten.teacher()).start();
            new Thread(() -> ten.student1()).start();
            new Thread(() -> ten.student2()).start();
        }
}

Condition 依赖于 Lock 接口,生成一个 Condition 的基本代码是 lock.newCondition() 调用 Condition 的 await() 和 signal() 方法,都必须在 lock 保护之内,就是说必须在 lock.lock()lock.unlock 之间才可以使用。

可以观察到,我取消了 Synchronized 方法关键字,在每个加锁的方法前后分别加了 lock.lock(); lock.unlock(); 来获取/释放锁,并且在释放锁之前施放想要施放的 Condition 对象。同样的,我们使用 signal 来完成线程间的通信。

深入理解Condition的使用方法

我们可以利用 Condition 来实现一个有界队列,什么叫有界队列呢?有界队列是一种特殊的队列,当队列为空时,队列的获取操作将会阻塞获取线程,直到队列中有新增元素,当队列已满时,队列的插入操作将会阻塞插入线程,直到队列出现“空位”。

public class MyQueue<E> {

    private Object[] objects;
    private Lock lock = new ReentrantLock();
    private Condition addCDT = lock.newCondition();
    private Condition rmCDT = lock.newCondition();

    private int addIndex;
    private int rmIndex;
    private int queueSize;

    MyQueue(int size){
        objects = new Object[size];
    }
    
    //添加元素
    public void add(E e){
        lock.lock();
        while (queueSize == objects.length){
            try {
                addCDT.await();   //进入等待状态
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
        objects[addIndex] = e;
        System.out.println("添加了数据"+"Objects["+addIndex+"] = "+e);
        if (++addIndex == objects.length){
            addIndex = 0;
        }
        queueSize++;
        rmCDT.signal();
        lock.unlock();

    }
    
    //删除元素
    public Object remove(){
        lock.lock();
        while (queueSize == 0){
            try {
                System.out.println("队列为空");
                rmCDT.await();     //进入等待状态
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        Object temp = objects[rmIndex];
        objects[rmIndex] = null;
        System.out.println("移除了数据"+"Objects["+rmIndex+"] = null");
        if (++rmIndex == objects.length){
            rmIndex = 0;
        }
        queueSize--;
        addCDT.signal();
        lock.unlock();
        return temp;
    }
    public void foreach(E e){
        if (e instanceof String){
            Arrays.stream(objects).map(obj->{
                if (obj == null){
                    obj = " ";
                }
                return obj;
            }).map(Object::toString).forEach(System.out::println);
        }
        if (e instanceof Integer){
            Arrays.stream(objects).map(obj -> {
                if (obj == null ){
                    obj = 0;
                }
                return obj;
            }).map(object -> Integer.valueOf(object.toString())).forEach(System.out::println);
        }
    }
}

add 方法就是往队列中添加数据。 remove 是从队列中按 FIFO 移除数据。 foreach 方法是一个观察队列内容的工具方法,很容易看出,它是用来遍历的。

测试方法:

    public static void main(String[] args) {
        MyQueue<Integer> myQueue = new MyQueue<>(5);
        myQueue.add(5);
        myQueue.add(4);
        myQueue.add(3);
//      myQueue.add(2);
//      myQueue.add(1);
        myQueue.remove();
        myQueue.foreach(5);
    }
添加了数据Objects[0] = 5
添加了数据Objects[1] = 4
添加了数据Objects[2] = 3
移除了数据Objects[0] = null
0
4
3
0
0

Condition源码分析

由上面两个例子,我们可以基本理解了Conditon的基本方法和作用,以及其简单的应用场景,那么我们肯定好奇这其中的方法是怎么去实现的,下面我们就来一探究竟。

我们从 Lock 的实现类 ReentrantLock 出发,看看里面是如何实现 Condition 的。

在 IDEA 中使用 double shift 打开 ReentrantLock 源码,可以看到 ReentrantLock 内部有一个静态内部类 Sync ,并且 Sync 是继承自 AQS(AbstractQueuedSynchronizer) 的,这与我们之前分析 AQS 原理是一致的,Sync 中有一个 newCondition 方法:

final ConditionObject newCondition() {
    return new ConditionObject();
}

可以看到其中 new 了一个 ConditionObject,然后我们使用 Ctrl + ConditionObject 打开其源码,可以看到这个类是属于 AQS 的内部类,在之前讲解 AQS 原理时没有分析其中 ConditionObject 内部实现类也是为了留到此处进行讲解。为什么 Condition 实现类需要放到 AQS 内部,其实想一下就知道,由于 Condition 的操作都需要获取相关联的锁,所以作为同步器的内部实现类也是非常合理的设计。

Java中的Condition接口实现分析

我们先看 ConditionObject 的实例域:

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

可以看到实例变量中是有两个 Node构成的,标注为 transient 是表示这两个变量不会被序列化。那么 Node 是什么呢?Node 的类型就是 AbstractQueuedSynchronizer.Node,也就是我们在 AQS 原理中讲解的 FIFO 队列,这也说明每个 Condition 对象中都包含着一个这样的等待队列,这个队列就是 Condition 对象实现等待/通知功能的关键。

等待队列

我们先回顾一下这个 FIFO 队列,Node 对应的主要字段有:

  1. waitStatus:等待状态,所有的状态见下面的表格。
  2. prev:前驱节点
  3. next:后继节点
  4. thread:当前节点代表的线程
  5. nextWaiter:Node既可以作为同步队列节点使用,也可以作为Condition的等待队列节点使用(将会在后面讲Condition时讲到)。在作为同步队列节点时,nextWaiter可能有两个值:EXCLUSIVE、SHARED标识当前节点是独占模式还是共享模式;在作为等待队列节点使用时,nextWaiter保存后继节点。
状态 含义
CANCELLED 1 当前节点因为超时或中断被取消同步状态获取,该节点进入该状态后不会再变化
SIGNAL -1 标识后继的节点处于阻塞状态,当前节点在释放同步状态或被取消时,需要通知后续节点继续运行,每个节点在阻塞前,需要标记其前驱节点的状态为SIGNAL
CONDITION -2 标识当前节点是作为等待队列节点使用的
PROPAGATE -3 表示下一次共享式同步状态获取将会无条件地被传播下去
0 0 初始状态

一个 Condition 包含一个等待队列,Condition 拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用 Condition.await() 方法,将会以当前线程构造节点,并将节点从尾部加入等待队列,等待队列的基本结构如下:

Java中的Condition接口实现分析

如上图所示,Condition 拥有首尾节点的引用,而新增节点只需要将原有的尾节点 nextWaiter 指向它,并且更新尾节点即可。此处更新节点的过程没有使用 CAS 进行保证,这是因为调用 await() 方法的线程必定是获取了锁的线程,也就是通过锁来保证线程安全的。

在 Object 监视器模型上,一个对象拥有一个同步队列和等待队列,但是 Lock 拥有一个同步队列和多个等待队列,其对应关系如下:

Java中的Condition接口实现分析

如图所示,Condition 的实现是同步器的内部类,因此每个 Condition 实例都能够访问同步器提供的方法,相当于每个 Condition 都拥有所属同步器的引用。

等待

了解了等待队列原理后,我们可以看一下 condition.await() 方法的源码:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    
    //构造一个新的等待队列Node加入到队尾
    Node node = addConditionWaiter();
    
    //释放当前线程的独占锁,不管重入几次,都把state释放为0
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    
    //如果当前节点没有在同步队列上,即还没有被signal,则将当前线程阻塞
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        //和中断相关,主要是区分两种中断:是在被signal前中断还是在被signal后中断,如果是被signal前就被中断则抛出 InterruptedException,否则执行 Thread.currentThread().interrupt();
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  //被中断则直接退出自旋
            break;
    }
    //退出了上面自旋说明当前节点已经在同步队列上,但是当前节点不一定在同步队列队首。acquireQueued将阻塞直到当前节点成为队首,即当前线程获得了锁。然后await()方法就可以退出了,让线程继续执行await()后的代码。
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

该方法就是将当前线程构造成节点并加入到等待队列中,然后释放同步状态,并唤醒同步队列中后继节点,然后当前状态会进入等待状态。

当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态,如果不是通过其它线程调用 Condition.signal() 方法唤醒,而是对等待线程进行中断,则会抛出 IntreeuptException。

而如果从队列的角度来看,这个过程如下图所示:

Java中的Condition接口实现分析

如图所示,同步队列的首节点不会直接加入等待队列,而是通过 addConditionWaiter() 方法把当前线程构造成一个新的节点并将其加入到等待队列中。

通知

调用 Condition 的 signal() 方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException(); //如果同步状态不是被当前线程独占,直接抛出异常。从这里也能看出来,Condition只能配合独占类同步组件使用。
    Node first = firstWaiter;
    if (first != null)
        doSignal(first); //通知等待队列队首的节点。
}

调用该方法的前置条件就是当前线程必须获取了锁,可以看到 signal() 方法进行了 isHeldExclusively() 检查,也就是当前线程必须是获取了锁的线程,接着获取等待队列的首节点,将其移动到同步队列并使用 LockSupport 唤醒节点中的线程,具体实现如下:

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&   //transferForSignal方法尝试唤醒当前节点,如果唤醒失败,则继续尝试唤醒当前节点的后继节点。
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    //如果当前节点状态为CONDITION,则将状态改为0准备加入同步队列;如果当前状态不为CONDITION,说明该节点等待已被中断,则该方法返回false,doSignal()方法会继续尝试唤醒当前节点的后继节点
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node);  //将节点加入同步队列,返回的p是节点在同步队列中的先驱节点
    int ws = p.waitStatus;
    //如果先驱节点的状态为CANCELLED(>0) 或设置先驱节点的状态为SIGNAL失败,那么就立即唤醒当前节点对应的线程,线程被唤醒后会执行acquireQueued方法,该方法会重新尝试将节点的先驱状态设为SIGNAL并再次park线程;如果当前设置前驱节点状态为SIGNAL成功,那么就不需要马上唤醒线程了,当它的前驱节点成为同步队列的首节点且释放同步状态后,会自动唤醒它。
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

从队列的角度来看,这个过程如下图所示:

Java中的Condition接口实现分析

而 Condition 中的 signalAll() 方法,相当于对等待队列中每个节点均执行一次 signal() 方法,效果就是将等待队列中所有节点全部移到同步队列中,并唤醒每个节点的线程。

Condition实现等待/通知的本质

总的来说,Condition 的本质就是等待队列和同步队列的交互:

当一个持有锁的线程调用 Condition.await() 时,它会执行以下步骤:

  1. 构造一个新的等待队列节点加入到等待队列队尾
  2. 释放锁,也就是将它的同步队列节点从同步队列队首移除
  3. 自旋,直到它在等待队列上的节点移动到了同步队列(通过其他线程调用 signal())或被中断
  4. 阻塞当前节点,直到它获取到了锁,也就是它在同步队列上的节点排队排到了队首。

当一个持有锁的线程调用 Condition.signal() 时,它会执行以下操作:

从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点 CANCELLED,就尝试唤醒下一个节点;如果再 CANCELLED 则继续迭代。

对每个节点执行唤醒操作时,首先将节点加入同步队列,此时 await() 操作的步骤 3 的解锁条件就已经开启了。然后分两种情况讨论:

  1. 如果先驱节点的状态为 CANCELLED ( > 0 ) 或设置先驱节点的状态为 SIGNAL 失败,那么就立即唤醒当前节点对应的线程,此时 await() 方法就会完成步骤 3,进入步骤 4。
  2. 如果成功把先驱节点的状态设置为了 SIGNAL,那么就不立即唤醒了。等到先驱节点成为同步队列首节点并释放了同步状态后,会自动唤醒当前节点对应线程的,这时候 await() 的步骤 3 才执行完成,而且有很大概率快速完成步骤 4。

总结

如果知道 Object 的等待通知机制,Condition 的使用是比较容易掌握的,因为和 Object 等待通知的使用基本一致。

对 Condition 的源码理解,主要就是理解等待队列,等待队列可以类比同步队列,而且等待队列比同步队列要简单,因为等待队列是单向队列,同步队列是双向队列。

之所以同步队列要设计成双向的,是因为在同步队列中,节点唤醒是接力式的,由每一个节点唤醒它的下一个节点,如果是由 next 指针获取下一个节点,是有可能获取失败的,因为虚拟队列每添加一个节点,是先用 CAS 把 tail 设置为新节点,然后才修改原 tail 的 next 指针到新节点的。因此用 next 向后遍历是不安全的,但是如果在设置新节点为 tail 前,为新节点设置 prev,则可以保证从 tail 往前遍历是安全的。因此要安全的获取一个节点 Node 的下一个节点,先要看 next 是不是 null,如果是 null,还要从 tail 往前遍历看看能不能遍历到 Node。

而等待队列就简单多了,等待的线程就是等待者,只负责等待,唤醒的线程就是唤醒者,只负责唤醒,因此每次要执行唤醒操作的时候,直接唤醒等待队列的首节点就行了。等待队列的实现中不需要遍历队列,因此也不需要 prev 指针。

参考文章

  1. 方腾飞 等著 《Java 并发编程的艺术》
  2. Condition (Java 2 Platform SE 6)
  3. Java显式锁学习总结之六:Condition源码分析
  4. Java并发编程 – Condition
原文  https://bestzuo.cn/posts/java-condition.html
正文到此结束
Loading...