ReentrantLock是一个可重入且独占式的锁,它具有与使用synchronized监视器锁相同的基本行为和语义,但与synchronized关键字相比,它更灵活、更强大,增加了轮询、超时、中断等高级功能。ReentrantLock,顾名思义,它是支持可重入锁的锁,是一种递归无阻塞的同步机制。除此之外,该锁还支持获取锁时的公平和非公平选择。ReentrantLock是基于AQS的,建议先去看下这篇文章https://juejin.im/post/5cee4e61e51d455c8838e0ec
ReentrantLock的类图如下:
ReentrantLock的内部类Sync继承了AQS,分为公平锁FairSync和非公平锁NonfairSync。如果在绝对时间上,先对锁进行获取的请求你一定先被满足,那么这个锁是公平的,反之,是不公平的。公平锁的获取,也就是等待时间最长的线程最优先获取锁,也可以说锁获取是顺序的。ReentrantLock的公平与否,可以通过它的构造函数来决定。 事实上,公平锁往往没有非公平锁的效率高,但是,并不是任何场景都是以TPS作为唯一指标,公平锁能够减少“饥饿”发生的概率,等待越久的请求越能够得到优先满足。 下面我们着重分析ReentrantLock是如何实现重进入和公平性获取锁的特性
ReentrantLock的构造函数为:
public ReentrantLock() { sync = new NonfairSync(); } //通过传入一个布尔值来设置公平锁,为true则是公平锁,false则为非公平锁 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }复制代码
首先来看内部类Sync对AQS的实现(Sync还有两个子类,分别对应公平和非公平锁实现
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; //交给公平和非公平的子类去实现 abstract void lock(); //非公平的排它尝试获取锁实现 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //如果AQS的state为0说明获得锁,并且对state加1,其他线程获取锁时被阻塞 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //判断线程是不是重新获取锁,如果是 无需排队,对AQS的state+1处理,这就是重入锁的实现 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false;//都不满足获取锁失败,进入AQS队列阻塞 } //公平的排它尝试释放锁实现 protected final boolean tryRelease(int releases) { int c = getState() - releases;//对应重入锁而言,在释放锁时对AQS的state字段减1 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) {//如果AQS的状态字段已变为0,说明该锁被释放 free = true; setExclusiveOwnerThread(null); } setState(c); return free; } //判断是否是当前线程持有锁 protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } //获取条件队列 final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } /** * 说明ReentrantLock是可序列化的 */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } 复制代码
非公平锁
非公平锁实现:
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { //判断当前state是否为0,如果为0直接通过cas修改状态,并获取锁 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1);//否则进行排队 } //调用父类的的非公平尝试获取锁 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } 复制代码
非公平锁的实现很简单,在lock获取锁时首先判断判断当前锁是否可以用(AQS的state状态值是否为0),如果是 直接“插队”获取锁,否则进入排队队列,并阻塞当前线程。
公平锁
公平锁实现:
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1);//获取公平,每次都需要进入队列排队 } /** * 公平锁实现 尝试获取实现方法, * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //AQS队列为空,或者当前线程是头节点 即可获的锁 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //重入锁实现 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } 复制代码
可见公平锁和非公平锁区别在于tryAcquire方法中判断条件多了hasQueuedPredecessors()方法 ,该方法定义如下:
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. // 同步队列尾节点 Node t = tail; // Read fields in reverse initialization order // 同步队列头节点 Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }复制代码
ReentrantLock的获取与释放锁操作都是委托给该同步组件来实现的。
public void lock() { sync.lock();//根据构造器中的公平锁和非公平锁走不同逻辑 }复制代码
成功获取锁的线程在完成业务逻辑之后,需要调用unlock()来释放锁:
public void unlock() { sync.release(1); } 复制代码
unlock()调用 Sync 类的release(int)方法释放锁,release(int)方法是定义在AQS中的方法:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 复制代码
tryRelease(int)是子类需要实现的方法(这是在Sync中实现的方法):
protected final boolean tryRelease(int releases) { // 计算新的状态值 int c = getState() - releases; // 判断当前线程是否是持有锁的线程,如果不是的话,抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 新的状态值是否为0,若为0,则表示该锁已经完全释放了,其他线程可以获取同步状态了 if (c == 0) { free = true; setExclusiveOwnerThread(null); } // 更新状态值 setState(c); return free; } 复制代码
如果该锁被获取n次,那么前(n-1)次tryRelease(int)方法必须返回false,只有同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放的条件,当状态为0时,将占有线程设为null,并返回true,表示释放成功。
Condition必须被绑定到一个独占锁上使用,在ReentrantLock中,有一个newCondition方法,该方法调用了Sync中的newCondition方法,看下Sync中newCondition的实现:
final ConditionObject newCondition() { return new ConditionObject(); } 复制代码
ConditionObject是在AQS中定义的,它实现了Condition接口,自然也就实现了上述的Condition接口中的方法。该类有两个重要的变量:
/** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; 复制代码
这里的firstWaiter和lastWaiter是不是和之前说过的head和tail有些类似,而且都是Node类型的。对于Condition来说,它是不与独占模式或共享模式使用相同的队列的,它有自己的队列,所以这两个变量表示了队列的头节点和尾节点。
Condition Queue 是一个并发不安全的, 只用于独占模式的队列(PS: 为什么是并发不安全的呢? 主要是在操作 Condition 时, 线程必需获取 独占的 lock, 所以不需要考虑并发的安全问题); 而当Node存在于 Condition Queue 里面, 则其只有 waitStatus, thread, nextWaiter 有值, 其他的都是null(其中的 waitStatus 只能是 CONDITION, 0 ( 0 代表node进行转移到 Sync Queue里面, 或被中断/timeout ).这里有个注意点, 就是当线程被中断或获取 lock 超时, 则一瞬间 node 会存在于 Condition Queue, Sync Queue 两个队列中
节点 Node4, Node5, Node6, Node7 都是调用 Condition.awaitXX 方法加入 Condition Queue(PS: 加入后会将原来的 lock 释放)。
Condition的关键方法await()
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //1.如果线程中断抛出InterruptedException Node node = addConditionWaiter(); //2.调用addConditionWaiter将当前线程如等待队列 int savedState = fullyRelease(node);//3.释放当前线程占用的锁 int interruptMode = 0; //记录在条件队列中中断情况 //4.判断是否在sync队列上,如果节点WaitStatus=CONDTTION或者节点prev为null,那么节点一定 //不再sync队列,如果节点next不为空,那么一定在sync队列上,或者跟sync队列节点逐一比较。 while (!isOnSyncQueue(node)) { LockSupport.park(this); //5.挂起当前线程 /******************************此处是await()方法的分割线到这里先看signal()方法在回过头看后面代码*/ //此时节点已经被signal()方法加入到同步队列中了,然后调用acquireQueued进行自旋或者挂起等待锁。 //6. 如果等待过程发生中断,中断唤醒发生在signal()之前就throw InterruptedException, //如果在之后就调用selfInterrupt()标记线程,判断逻辑见下面详解 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //7.调用acquireQueued进行自旋或者挂起等待锁.条件1表示同步队列等待过程 //中中断过,条件2是为了兼容2中情况,就是条件队列中断和同步队列中断 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; //如果中断超时的话,通过checkInterruptWhileWaiting转移到同步队列的nextWaiter是不为空的 if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) //9.判断抛出InterruptedException还是调用selfInterrupt() reportInterruptAfterWait(interruptMode); } } 复制代码
await()逻辑:
final boolean isOnSyncQueue(Node node) { //下面2个条件用于快速判断是否在队列中,结合队列转移加入到同步队列过程理解 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ return findNodeFromTail(node); } 复制代码
该方法判断当前线程的node是否在Sync队列中。
private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } } 复制代码
该方法就是从Sync队列尾部开始判断,因为在isOnSyncQueue方法调用该方法时,node.prev一定不为null。但这时的node可能还没有完全添加到Sync队列中,这时可能是在自旋中。见AQS的enq方法,signal的时候会调用这个方法:
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; // 执行findNodeFromTail方法时可能一直在此自旋 if (compareAndSetTail(t, node)) { t.next = node; // 主要这一步没有执行成功,但是node.prev是不为空的 return t; } } } } 复制代码
将当前线程封装成一个 Node 节点放入到 Condition Queue 里面大家可以注意到, 下面对 Condition Queue 的操作都没考虑到 并发(Sync Queue 的队列是支持并发操作的), 这是为什么呢? 因为在进行操作 Condition 是当前的线程已经获取了AQS的独占锁, 所以不需要考虑并发的情况。
private Node addConditionWaiter(){ Node t = lastWaiter; // Condition queue 的尾节点 // 尾节点已经Cancel, 直接进行清除, /** * 当Condition进行 awiat 超时或被中断时, Condition里面的节点是没有被删除掉的, 需要其 * 他await 在将线程加入 Condition Queue 时调用addConditionWaiter而进而删除, 或 await * 操作差不多结束时, 调用 "node.nextWaiter != null" 进行判断而删除 (PS: 通过 signal * 进行唤醒时 node.nextWaiter 会被置空, 而中断和超时时不会) */ if(t != null && t.waitStatus != Node.CONDITION){ /** * 调用 unlinkCancelledWaiters 对 "waitStatus != Node.CONDITION" 的节点进行 * 删除(在Condition里面的Node的waitStatus 要么是CONDITION(正常), 要么就是 0 * (signal/timeout/interrupt)) */ unlinkCancelledWaiters(); t = lastWaiter; } //将线程封装成 node 准备放入 Condition Queue 里面 Node node = new Node(Thread.currentThread(), Node.CONDITION); if(t == null){ //Condition Queue 是空的 firstWaiter = node; } else { // 追加到 queue 尾部 t.nextWaiter = node; } lastWaiter = node; return node; }复制代码
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { //持有的锁资源全部释放 failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } 复制代码
ReentrantLock的release方法,该方法在unlock方法中被调用:
public void unlock() { sync.release(1); } 复制代码
在unlock时传入的参数是1,因为是可重入的原因,只有在state为0的时候才会真的释放锁,所以在fullyRelease方法中,需要 将之前加入的锁的次数全部释放 ,目的是将该线程从Sync队列中移出。
private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } 复制代码
//这是中断超时才会调用的方法 final boolean transferAfterCancelledWait(Node node) { //如果状态不是-2为0,说明已经转移或者正在转移 //如果还在条件队列中,则修改状态为0加入同步队列 if (compareAndSetWaitStatus(node,Node.CONDITION, 0)) { enq(node); return true; } /* * If we lost out to a signal(), then we can't proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. */ while (!isOnSyncQueue(node)) Thread.yield(); //让出执行时间,当前节点正在被其它线程转移到同步队列 return false; } 复制代码
该方法是判断,在线程中断的时候,是否这时有signal方法的调用。
当Node在Condition Queue 中, 若状态不是 CONDITION, 则一定是被中断或超时。在调用 addConditionWaiter 将线程放入 Condition Queue 里面时或 awiat 方法获取结束时 进行清理 Condition queue 里面的因 timeout/interrupt 而还存在的节点。这个删除操作比较巧妙, 其中引入了 trail 节点, 可以理解为traverse整个 Condition Queue 时遇到的最后一个有效的节点。
private void unlinkCancelledWaiters(){ Node t = firstWaiter; Node trail = null; while(t != null){ Node next = t.nextWaiter; // 1. 先初始化 next 节点 if(t.waitStatus != Node.CONDITION){ // 2. 节点不有效, 在Condition Queue 里面 // Node.waitStatus 只有可能是 CONDITION 或是 0(timeout/interrupt引起的) t.nextWaiter = null; // 3. Node.nextWaiter 置空 if(trail == null){ // 4. 一次都没有遇到有效的节点 firstWaiter = next;// 5. 将 next 赋值给 firstWaiter(此时 next 可能也是无效的, // 这只是一个临时处理) } else { trail.nextWaiter = next; // 6. next 赋值给 trail.nextWaiter, 这一步其实就是 //删除节点 t } if(next == null){ // 7. next == null 说明 已经 traverse 完了 //Condition Queue lastWaiter = trail; } }else{ trail = t; // 8. 将有效节点赋值给 trail } t = next; } } 复制代码
//该方法根据interruptMode来确定是应该抛出InterruptedException还是继续中断。 private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } 复制代码
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } //找到第一个waitStatus=CONDITION的节点,将此节点waitStatus=0,入sync队列 //不是CONDITION的节点全部从条件队列中断开连接 private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } 复制代码
transferForSignal只有在节点被正常唤醒才调用的正常转移的方法。 将Node 从Condition Queue 转移到 Sync Queue 里面在调用transferForSignal之前, 会 first.nextWaiter = null;而我们发现若节点是因为 timeout / interrupt 进行转移, 则不会进行这步操作; 两种情况的转移都会把 wautStatus 置为 0 ( 中断或者超时是通过checkInterruptWhileWaiting方法,正常唤醒是通过doSignal方法加入同步队列的 )
final boolean transferForSignal(Node node){ /** * If cannot change waitStatus, the node has been cancelled */ // 1. 若 node 不是CONDITION说明已经被其他线程调用signal()加入到sync队列,或者已经中断,则失败 // 返回false 上层方法会从链表中继续转移下一个 if(!compareAndSetWaitStatus(node, Node.CONDITION, 0)){ return false; } // 2. ws成功设置为0则加入 Sync Queue Node p = enq(node); int ws = p.waitStatus; // 3. 这里的 ws > 0 指Sync Queue 中node 的前继节点cancelled 了, 所以, 唤醒一下 node ; // compareAndSetWaitStatus(p, ws, Node.SIGNAL)失败, 则说明 前继节点已经变成 SIGNAL //或 cancelled, 所以也要 唤醒 if(ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){ LockSupport.unpark(node.thread); } return true; } 复制代码
下面画图理解一下: