转载

Java并发——Condition

和基类Object的监视器方法一样,Condition接口也提供了类似方法,配合Lock可以实现等待/通知模式

Object的监视器方法与Condition接口对比:

对比项 Object监视器方法 Condition
前置条件 获取对象的锁 调用Lock.lock()获取锁
调用Lock.newCondition()获取Condition对象
调用方式 直接调用
如:object.wait()
直接调用
如:condition.wait()
等待队列个数 一个 多个
当前线程释放锁并进入等待状态 支持 支持
当前线程释放锁并进入等待状态,在等待状态中不响应中断 不支持 支持
当前线程释放锁并进入超时等待状态 支持 支持
当前线程释放锁并进入等待状态到将来的某个时间 不支持 支持
唤醒等待队列中的一个线程 支持 支持
唤醒等待队列中的全部线程 支持 支持

Condition实现

在介绍AQS同步器时知道其维护了 一个 同步队列,其实还维护了 多个 等待队列, 两队列均为FIFO队列 ,F4看Condition实现类,可以发现其只有这两个类(这两个类区别在于state同步状态一个int,一个long)

Java并发——Condition

等待队列

具体来看下Condition实现类AQS中内部类ConditionObject

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        // 头节点
        private transient Node firstWaiter;
        // 尾结点
        private transient Node lastWaiter;

        public ConditionObject() { }
        ...
    }    
复制代码

结构如图( 单向队列 ):

Java并发——Condition

await()

await()方法过程相当于 同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中

public final void await() throws InterruptedException {
            // 判断当前线程是否中断
            if (Thread.interrupted())
                throw new InterruptedException();
            // 当前线程加入等待队列    
            Node node = addConditionWaiter();
            // 释放同步状态
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 判断此节点是否在同步队列中,若不在直至在同步队列为止
            while (!isOnSyncQueue(node)) {
                // 阻塞当前线程
                LockSupport.park(this);
                // 若线程已经中断则退出
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 竞争同步状态
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
复制代码

先是判断线程是否中断,若中断直接抛出异常,否则调用addConditionWaiter()将线程包装成节点加入等待队列

private Node addConditionWaiter() {
            // 获取等待队列的尾节点
            Node t = lastWaiter;
            // 若尾节点状态不为CONDITION,清除节点
            if (t != null && t.waitStatus != Node.CONDITION) {
                // 清除等待队列中所有状态不为CONDITION的节点
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 将当前线程包装成Node节点
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            // 尾插节点
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            // 将节点置为尾节点    
            lastWaiter = node;
            return node;
        }
复制代码

从源码来看将节点加入等待队列 并没有使用CAS ,因为调用await()方法的线程必定是获取了锁的线程,即此过程是由锁来保证线程安全,成功加入等待队列后,调用fullyRelease()释放同步状态

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            // 获取同步状态
            int savedState = getState();
            // 释放锁
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
复制代码

调用AQS的模板方法release()方法释放同步状态并且唤醒在同步队列中头结点的后继节点引用的线程,如果释放成功则正常返回,否则抛出异常。随后调用isOnSyncQueue()判断节点是否在同步队列

final boolean isOnSyncQueue(Node node) {
        // 若状态为CONDITION或者前驱节点为null
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // 若后继节点不为null,表明节点肯定在同步队列中    
        if (node.next != null) // If has successor, it must be on queue
            return true;
        // 从同步队列尾节点找节点    
        return findNodeFromTail(node);
    }
复制代码

若节点不在同步队列会一直在while循环体中,当 此线程被中断或者线程关联的节点被移动到了同步队列中(即另外线程调用的condition的signal或者signalAll方法)会结束循环调用acquireQueued()方法获取,否则会在循环体中通过LockSupport.park()方法阻塞线程

await()方法示意:

Java并发——Condition

signal()/signalAll()

  • signal()
  • public final void signal() {
            // 判断当前线程是否为获取锁的线程
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 唤醒条件队列中的第一个节点
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
    复制代码

    isHeldExclusively()方法需要子类重写,其目的在于判断当前线程是否为获取锁的线程

    protected boolean isHeldExclusively() {
            throw new UnsupportedOperationException();
        }
    复制代码

    doSignal

    private void doSignal(Node first) {
                do {
                    // 将头节点从等待队列中移除
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
    复制代码

    doSignal()将头节点移出等待队列,再调用transferForSignal()方法将节点添加到同步队列中

    final boolean transferForSignal(Node node) {
            // 将node节点状态由CONDITION  CAS置为初始状态0
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
            // 将node节点尾插同步队列,返回插入后同步队列中node节点的前驱节点
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    复制代码

    signal()方法示意:

    Java并发——Condition
  • signalAll()
  • private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                // 将等待队列中节点从头节点开始逐个移出等待队列,添加到同步队列
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }
    复制代码

    signalAll()方法相当于对等待队列中的每个节点均执行一次signal()方法,将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程

    原文  https://juejin.im/post/5b69a5a151882563522b7e42
    正文到此结束
    Loading...