转载

JDK并发之独占锁ReentrantLock以及Condition

简介

ReentrantLock是一个可重入且独占式的锁,它具有与使用synchronized监视器锁相同的基本行为和语义,但与synchronized关键字相比,它更灵活、更强大,增加了轮询、超时、中断等高级功能。ReentrantLock,顾名思义,它是支持可重入锁的锁,是一种递归无阻塞的同步机制。除此之外,该锁还支持获取锁时的公平和非公平选择。ReentrantLock是基于AQS的,建议先去看下这篇文章https://juejin.im/post/5cee4e61e51d455c8838e0ec

ReentrantLock的类图如下:

JDK并发之独占锁ReentrantLock以及Condition

ReentrantLock的内部类Sync继承了AQS,分为公平锁FairSync和非公平锁NonfairSync。如果在绝对时间上,先对锁进行获取的请求你一定先被满足,那么这个锁是公平的,反之,是不公平的。公平锁的获取,也就是等待时间最长的线程最优先获取锁,也可以说锁获取是顺序的。ReentrantLock的公平与否,可以通过它的构造函数来决定。 事实上,公平锁往往没有非公平锁的效率高,但是,并不是任何场景都是以TPS作为唯一指标,公平锁能够减少“饥饿”发生的概率,等待越久的请求越能够得到优先满足。 下面我们着重分析ReentrantLock是如何实现重进入和公平性获取锁的特性

获取锁

ReentrantLock的构造函数为:

public ReentrantLock() {    
    sync = new NonfairSync();
}
//通过传入一个布尔值来设置公平锁,为true则是公平锁,false则为非公平锁
public ReentrantLock(boolean fair) {
       sync = fair ? new FairSync() : new NonfairSync();
}复制代码

公平锁与非公平锁

首先来看内部类Sync对AQS的实现(Sync还有两个子类,分别对应公平和非公平锁实现

abstract static class Sync extends AbstractQueuedSynchronizer {  
    private static final long serialVersionUID = -5179523762034025860L;  
   
    //交给公平和非公平的子类去实现  
    abstract void lock();  
   
    //非公平的排它尝试获取锁实现  
    final boolean nonfairTryAcquire(int acquires) {  
        final Thread current = Thread.currentThread();  
        int c = getState();  
        //如果AQS的state为0说明获得锁,并且对state加1,其他线程获取锁时被阻塞  
        if (c == 0) {  
            if (compareAndSetState(0, acquires)) {  
                setExclusiveOwnerThread(current);  
                return true;  
            }  
        }  
         
        //判断线程是不是重新获取锁,如果是 无需排队,对AQS的state+1处理,这就是重入锁的实现  
        else if (current == getExclusiveOwnerThread()) {  
            int nextc = c + acquires;  
            if (nextc < 0) // overflow  
                throw new Error("Maximum lock count exceeded");  
            setState(nextc);  
            return true;  
        }  
        return false;//都不满足获取锁失败,进入AQS队列阻塞  
    }  
   
    //公平的排它尝试释放锁实现  
    protected final boolean tryRelease(int releases) {  
        int c = getState() - releases;//对应重入锁而言,在释放锁时对AQS的state字段减1  
        if (Thread.currentThread() != getExclusiveOwnerThread())  
            throw new IllegalMonitorStateException();  
        boolean free = false;  
        if (c == 0) {//如果AQS的状态字段已变为0,说明该锁被释放  
            free = true;  
            setExclusiveOwnerThread(null);  
        }  
        setState(c);  
        return free;  
    }  
   
    //判断是否是当前线程持有锁  
    protected final boolean isHeldExclusively() {  
        // While we must in general read state before owner,  
        // we don't need to do so to check if current thread is owner  
        return getExclusiveOwnerThread() == Thread.currentThread();  
    }  
   
    //获取条件队列  
    final ConditionObject newCondition() {  
        return new ConditionObject();  
    }  
   
    // Methods relayed from outer class  
   
    final Thread getOwner() {  
        return getState() == 0 ? null : getExclusiveOwnerThread();  
    }  
   
    final int getHoldCount() {  
        return isHeldExclusively() ? getState() : 0;  
    }  
   
    final boolean isLocked() {  
        return getState() != 0;  
    }  
   
    /** 
     * 说明ReentrantLock是可序列化的 
     */  
    private void readObject(java.io.ObjectInputStream s)  
            throws java.io.IOException, ClassNotFoundException {  
        s.defaultReadObject();  
        setState(0); // reset to unlocked state  
    }  
}  
复制代码

非公平锁

非公平锁实现:

static final class NonfairSync extends Sync {  
    private static final long serialVersionUID = 7316153563782823691L;  
   
    /** 
     * Performs lock.  Try immediate barge, backing up to normal 
     * acquire on failure. 
     */  
    final void lock() {  
        //判断当前state是否为0,如果为0直接通过cas修改状态,并获取锁  
        if (compareAndSetState(0, 1))  
            setExclusiveOwnerThread(Thread.currentThread());  
        else  
            acquire(1);//否则进行排队  
    }  
   
    //调用父类的的非公平尝试获取锁  
    protected final boolean tryAcquire(int acquires) {  
        return nonfairTryAcquire(acquires);  
    }  
}  
复制代码

非公平锁的实现很简单,在lock获取锁时首先判断判断当前锁是否可以用(AQS的state状态值是否为0),如果是 直接“插队”获取锁,否则进入排队队列,并阻塞当前线程。

公平锁

公平锁实现:

static final class FairSync extends Sync {  
    private static final long serialVersionUID = -3000897897090466540L;  
   
    final void lock() {  
        acquire(1);//获取公平,每次都需要进入队列排队  
    }  
   
    /** 
     * 公平锁实现 尝试获取实现方法, 
     * Fair version of tryAcquire.  Don't grant access unless 
     * recursive call or no waiters or is first. 
     */  
    protected final boolean tryAcquire(int acquires) {  
        final Thread current = Thread.currentThread();  
        int c = getState();  
        if (c == 0) {  
            //AQS队列为空,或者当前线程是头节点 即可获的锁  
            if (!hasQueuedPredecessors() &&  
                    compareAndSetState(0, acquires)) {  
                setExclusiveOwnerThread(current);  
                return true;  
            }  
        }  
        //重入锁实现  
        else if (current == getExclusiveOwnerThread()) {  
            int nextc = c + acquires;  
            if (nextc < 0)  
                throw new Error("Maximum lock count exceeded");  
            setState(nextc);  
            return true;  
        }  
        return false;  
    }  
}  复制代码

可见公平锁和非公平锁区别在于tryAcquire方法中判断条件多了hasQueuedPredecessors()方法 ,该方法定义如下:

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    // 同步队列尾节点
    Node t = tail; // Read fields in reverse initialization order
    // 同步队列头节点
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}复制代码

ReentrantLock的获取与释放锁操作都是委托给该同步组件来实现的。

public void lock() {  
     sync.lock();//根据构造器中的公平锁和非公平锁走不同逻辑
}复制代码

成功获取锁的线程在完成业务逻辑之后,需要调用unlock()来释放锁:

public void unlock() {
    sync.release(1);
}
复制代码

unlock()调用 Sync 类的release(int)方法释放锁,release(int)方法是定义在AQS中的方法:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

复制代码

tryRelease(int)是子类需要实现的方法(这是在Sync中实现的方法):

protected final boolean tryRelease(int releases) {
    // 计算新的状态值
    int c = getState() - releases;
    // 判断当前线程是否是持有锁的线程,如果不是的话,抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 新的状态值是否为0,若为0,则表示该锁已经完全释放了,其他线程可以获取同步状态了
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 更新状态值
    setState(c);
    return free;
}
复制代码

如果该锁被获取n次,那么前(n-1)次tryRelease(int)方法必须返回false,只有同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放的条件,当状态为0时,将占有线程设为null,并返回true,表示释放成功。

Condition Queue

Condition必须被绑定到一个独占锁上使用,在ReentrantLock中,有一个newCondition方法,该方法调用了Sync中的newCondition方法,看下Sync中newCondition的实现:

final ConditionObject newCondition() {
    return new ConditionObject();
}
复制代码

ConditionObject是在AQS中定义的,它实现了Condition接口,自然也就实现了上述的Condition接口中的方法。该类有两个重要的变量:

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

复制代码

这里的firstWaiter和lastWaiter是不是和之前说过的head和tail有些类似,而且都是Node类型的。对于Condition来说,它是不与独占模式或共享模式使用相同的队列的,它有自己的队列,所以这两个变量表示了队列的头节点和尾节点。

Condition Queue 是一个并发不安全的, 只用于独占模式的队列(PS: 为什么是并发不安全的呢? 主要是在操作 Condition 时, 线程必需获取 独占的 lock, 所以不需要考虑并发的安全问题); 而当Node存在于 Condition Queue 里面, 则其只有 waitStatus, thread, nextWaiter 有值, 其他的都是null(其中的 waitStatus 只能是 CONDITION, 0 ( 0 代表node进行转移到 Sync Queue里面, 或被中断/timeout ).这里有个注意点, 就是当线程被中断或获取 lock 超时, 则一瞬间 node 会存在于 Condition Queue, Sync Queue 两个队列中

JDK并发之独占锁ReentrantLock以及Condition

节点 Node4, Node5, Node6, Node7 都是调用 Condition.awaitXX 方法加入 Condition Queue(PS: 加入后会将原来的 lock 释放)。

Condition的关键方法await()

public final void await() throws InterruptedException {
            if (Thread.interrupted())            
                throw new InterruptedException(); //1.如果线程中断抛出InterruptedException
            Node node = addConditionWaiter();  //2.调用addConditionWaiter将当前线程如等待队列
            int savedState = fullyRelease(node);//3.释放当前线程占用的锁
            int interruptMode = 0; //记录在条件队列中中断情况

            //4.判断是否在sync队列上,如果节点WaitStatus=CONDTTION或者节点prev为null,那么节点一定
            //不再sync队列,如果节点next不为空,那么一定在sync队列上,或者跟sync队列节点逐一比较。
            while (!isOnSyncQueue(node)) { 
                 LockSupport.park(this);   //5.挂起当前线程
 
/******************************此处是await()方法的分割线到这里先看signal()方法在回过头看后面代码*/
 
//此时节点已经被signal()方法加入到同步队列中了,然后调用acquireQueued进行自旋或者挂起等待锁。
 
                //6. 如果等待过程发生中断,中断唤醒发生在signal()之前就throw InterruptedException,
                //如果在之后就调用selfInterrupt()标记线程,判断逻辑见下面详解
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;                    
                }
           //7.调用acquireQueued进行自旋或者挂起等待锁.条件1表示同步队列等待过程
           //中中断过,条件2是为了兼容2中情况,就是条件队列中断和同步队列中断
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;   

             //如果中断超时的话,通过checkInterruptWhileWaiting转移到同步队列的nextWaiter是不为空的
             if (node.nextWaiter != null) 
                unlinkCancelledWaiters();     
            if (interruptMode != 0)
                //9.判断抛出InterruptedException还是调用selfInterrupt()
                reportInterruptAfterWait(interruptMode);
        }
}		
复制代码

await()逻辑:

  1. 如果当前线程中断,抛出InterruptedException
  2.  获取当前线程的state数值,然后通过release方法释放state,也就是释放锁。
  3.  挂起直到对应的signal()方法或者被中断。
  4.  唤醒后调用acquireQueued()去尝试获取锁,到这步就和线程刚进入同步队列去争夺锁步骤一样了。
  5. 注意,如果3中的中断唤醒发生在signal()之前就throw InterruptedException,如果在之后就调用selfInterrupt()标记线程中断。

isOnSyncQueue方法

final boolean isOnSyncQueue(Node node) {
    //下面2个条件用于快速判断是否在队列中,结合队列转移加入到同步队列过程理解
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    return findNodeFromTail(node);
}
复制代码

该方法判断当前线程的node是否在Sync队列中。

  1. 如果当前线程node的状态是CONDITION或者node.prev为null时说明已经在Condition队列中了,所以返回false; 
  2. 如果node.next不为null,说明在Sync队列中,返回true; 
  3. 如果两个if都未返回时,可以断定 node的prev一定不为null,next一定为null ,这个时候可能node正处于放入Sync队列的执行CAS操作执行过程中。也可能已经转移成功了,通过findNodeFromTail判断

findNodeFromTail方法

private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}
复制代码

该方法就是从Sync队列尾部开始判断,因为在isOnSyncQueue方法调用该方法时,node.prev一定不为null。但这时的node可能还没有完全添加到Sync队列中,这时可能是在自旋中。见AQS的enq方法,signal的时候会调用这个方法:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            // 执行findNodeFromTail方法时可能一直在此自旋 
            if (compareAndSetTail(t, node)) {
                t.next = node; // 主要这一步没有执行成功,但是node.prev是不为空的
                return t;
            }
        }
    }
}
复制代码

入队列方法 addConditionWaiter

将当前线程封装成一个 Node 节点放入到 Condition Queue 里面大家可以注意到, 下面对 Condition Queue 的操作都没考虑到 并发(Sync Queue 的队列是支持并发操作的), 这是为什么呢? 因为在进行操作 Condition 是当前的线程已经获取了AQS的独占锁, 所以不需要考虑并发的情况。

private Node addConditionWaiter(){
    Node t = lastWaiter;                                
    // Condition queue 的尾节点           
	// 尾节点已经Cancel, 直接进行清除,
    /** 
    * 当Condition进行 awiat 超时或被中断时, Condition里面的节点是没有被删除掉的, 需要其
    * 他await 在将线程加入 Condition Queue 时调用addConditionWaiter而进而删除, 或 await
    * 操作差不多结束时, 调用 "node.nextWaiter != null" 进行判断而删除 (PS: 通过 signal 
    * 进行唤醒时 node.nextWaiter 会被置空, 而中断和超时时不会)
    */
    if(t != null && t.waitStatus != Node.CONDITION){
    	/** 
    	* 调用 unlinkCancelledWaiters 对 "waitStatus != Node.CONDITION" 的节点进行	
	* 删除(在Condition里面的Node的waitStatus 要么是CONDITION(正常), 要么就是 0 
    	* (signal/timeout/interrupt))
    	*/
        unlinkCancelledWaiters();                     
        t = lastWaiter;                     
    }
    //将线程封装成 node 准备放入 Condition Queue 里面
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if(t == null){
    	//Condition Queue 是空的
        firstWaiter = node;                           
    } else {
    	// 追加到 queue 尾部
        t.nextWaiter = node;                          
    }
    lastWaiter = node;                               
    return node;
}复制代码

fullyRelease方法

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {  //持有的锁资源全部释放
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
复制代码

ReentrantLock的release方法,该方法在unlock方法中被调用:

public void unlock() {
    sync.release(1);
}
复制代码

在unlock时传入的参数是1,因为是可重入的原因,只有在state为0的时候才会真的释放锁,所以在fullyRelease方法中,需要 将之前加入的锁的次数全部释放 ,目的是将该线程从Sync队列中移出。

checkInterruptWhileWaiting方法

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
复制代码

transferAfterCancelledWait方法

//这是中断超时才会调用的方法
final boolean transferAfterCancelledWait(Node node) {
    //如果状态不是-2为0,说明已经转移或者正在转移
    //如果还在条件队列中,则修改状态为0加入同步队列
    if (compareAndSetWaitStatus(node,Node.CONDITION, 0)) {
        enq(node);  
        return true;
    }
    /*
     * If we lost out to a signal(), then we can't proceed
     * until it finishes its enq().  Cancelling during an
     * incomplete transfer is both rare and transient, so just
     * spin.
     */
    while (!isOnSyncQueue(node))
        Thread.yield(); //让出执行时间,当前节点正在被其它线程转移到同步队列
    return false;
}
复制代码

该方法是判断,在线程中断的时候,是否这时有signal方法的调用。

  1. 如果compareAndSetWaitStatus(node, Node.CONDITION, 0)执行成功, 则说明中断发生时,没有signal的调用,因为signal方法会将状态设置为0 ; 
  2. 如果第1步执行成功,则将node添加到Sync队列中,并返回true,表示中断在signal之前; 
  3. 如果第1步失败,说明节点正在转移到同步队列中,则检查当前线程的node是否已经在Sync队列中了,如果不在Sync队列中,则让步给其他线程执行,直到当前的node已经被signal方法添加到Sync队列中;
  4.  返回false

删除Cancelled节点的方法 unlinkCancelledWaiters

当Node在Condition Queue 中, 若状态不是 CONDITION, 则一定是被中断或超时。在调用 addConditionWaiter 将线程放入 Condition Queue 里面时或 awiat 方法获取结束时 进行清理 Condition queue 里面的因 timeout/interrupt 而还存在的节点。这个删除操作比较巧妙, 其中引入了 trail 节点, 可以理解为traverse整个 Condition Queue 时遇到的最后一个有效的节点。

private void unlinkCancelledWaiters(){
    Node t = firstWaiter;
    Node trail = null;
    while(t != null){
        Node next = t.nextWaiter;  // 1. 先初始化 next 节点
        if(t.waitStatus != Node.CONDITION){  // 2. 节点不有效, 在Condition Queue 里面
         // Node.waitStatus 只有可能是 CONDITION 或是 0(timeout/interrupt引起的)

            t.nextWaiter = null;   // 3. Node.nextWaiter 置空
            if(trail == null){     // 4. 一次都没有遇到有效的节点
                firstWaiter = next;// 5. 将 next 赋值给 firstWaiter(此时 next 可能也是无效的,
                                      // 这只是一个临时处理)
            } else {
                trail.nextWaiter = next;  // 6. next 赋值给 trail.nextWaiter, 这一步其实就是
                                          //删除节点 t
            }
            if(next == null){       // 7. next == null 说明 已经 traverse 完了 
                                    //Condition Queue
                lastWaiter = trail;
            }
        }else{
            trail = t;               // 8. 将有效节点赋值给 trail
        }
        t = next;
    }
}
复制代码

reportInterruptAfterWait方法

//该方法根据interruptMode来确定是应该抛出InterruptedException还是继续中断。
private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}
复制代码

Condition的关键方法signal()

public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
     }
    //找到第一个waitStatus=CONDITION的节点,将此节点waitStatus=0,入sync队列
     //不是CONDITION的节点全部从条件队列中断开连接
    private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null; 
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
复制代码

转移节点的方法 transferForSignal

transferForSignal只有在节点被正常唤醒才调用的正常转移的方法。 将Node 从Condition Queue 转移到 Sync Queue 里面在调用transferForSignal之前, 会 first.nextWaiter = null;而我们发现若节点是因为 timeout / interrupt 进行转移, 则不会进行这步操作; 两种情况的转移都会把 wautStatus 置为 0 ( 中断或者超时是通过checkInterruptWhileWaiting方法,正常唤醒是通过doSignal方法加入同步队列的 )

final boolean transferForSignal(Node node){
    /**
     * If cannot change waitStatus, the node has been cancelled
     */
    // 1. 若 node 不是CONDITION说明已经被其他线程调用signal()加入到sync队列,或者已经中断,则失败
    // 返回false 上层方法会从链表中继续转移下一个
    if(!compareAndSetWaitStatus(node, Node.CONDITION, 0)){ 
        return false;
    }
    // 2. ws成功设置为0则加入 Sync Queue
    Node p = enq(node);                              
    int ws = p.waitStatus;

     // 3. 这里的 ws > 0 指Sync Queue 中node 的前继节点cancelled 了, 所以, 唤醒一下 node ; 
    // compareAndSetWaitStatus(p, ws, Node.SIGNAL)失败, 则说明 前继节点已经变成 SIGNAL 
    //或 cancelled, 所以也要 唤醒
    if(ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){
        LockSupport.unpark(node.thread);
    }
    return true;
}
复制代码

下面画图理解一下:

JDK并发之独占锁ReentrantLock以及Condition

原文  https://juejin.im/post/5cf5c8fbe51d4510a5033571
正文到此结束
Loading...