之前在介绍synchronized时说过,要实现锁只需要添加个关键字即可,对于程序员来说,它的内部实现是透明的,这种方式叫隐式锁。
而从java1.5起,并发大神Doug Lea开拓性地引入了一种程序员可以显示地操作锁的接口-Lock以及它的实现类,所以以Lock接口为代表的锁可以看做是显示锁,而Lock最常用的实现则是ReentrantLock了,本文也主要围绕ReentrantLock的原理来揭开显示锁神秘的面纱~
开始之前,要先介绍一下ReentrantLock这座大厦的基础:Lock、AQS、LockSupport。
Lock是一个接口,主要提供了以下几个方法:
//获取锁,获取不到时阻塞,直到h获取成功 void lock(); //释放锁 void unlock(); //尝试获取锁,获取成功返回ture,失败则返回false,不阻塞 boolean tryLock(); //尝试获取锁,可以指定尝试时间,若指定时间内还没获取到则返回false,获取到返回true boolean tryLock(long time, TimeUnit unit) throws InterruptedEx //创建一个显示条件 ception;Condition newCondition(); 复制代码
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { // 序列串 private static final long serialVersionUID = 7373984972572414691L; // 默认构造 protected AbstractQueuedSynchronizer() { } // 等待锁队列头结点 private transient volatile Node head; // 等待锁队列尾节点 private transient volatile Node tail; // 同步状态 0-表示未加锁 其他非0值N表示已锁且N为重入次数 private volatile int state; // 内部节点类-等待锁队列的节点 static final class Node { /* 两种锁的模式 */ // 共享锁模式 static final Node SHARED = new Node(); // 独占锁模式 static final Node EXCLUSIVE = null; /* 等待状态-值范围 */ // 表示当前线程取消获取锁 static final int CANCELLED = 1; // 表示队列中的后继线程需要被唤醒 static final int SIGNAL = -1; // 表示线程正在等待条件 static final int CONDITION = -2; // 表示下一个共享请求应无条件传播 static final int PROPAGATE = -3; // 等着状态本尊 volatile int waitStatus; // 前驱节点 volatile Node prev; // 后继节点 volatile Node next; // 等待获取锁的线程 volatile Thread thread; // 下一个等待者 Node nextWaiter; // 判断是否是共享锁模式 final boolean isShared() { return nextWaiter == SHARED; } // 获取前驱节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } // Used by addWaiter 通过addWaiter创建 Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } // Used by Condition 通过条件队列创建 Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } } } 复制代码
AQS全称AbstractQueuedSynchronizer,顾名思义也就是“抽象队列同步器”,它不是一个可以直接拿来使用的类,而是将锁的公共操作功能提取成一个抽象类,供锁的具体实现类使用,比如后面会讲到的ReentrantLock就是基于AQS实现锁的操作的。AQS内部使用多个Node连接成的队列来实现锁等待线程的管理,当锁释放时,会唤醒head的下一个线程来竞争锁。内部队列图如下:
LockSupport常用于实现线程的阻塞,类似于wait()和notify()的组合,但是与它们不一样的是LockSupport不需要依赖于Synchronized。
public class LockSupport { // 私有构造方法 private LockSupport() {} // Cannot be instantiated. // 阻塞当前线程,如果掉用unpark(Thread)方法或被中断,才能从park()返回 public static void park() { UNSAFE.park(false, 0L); } // 阻塞当前线程,超时后返回,阻塞时间最长不超过nanos纳秒 public static void parkNanos(long nanos) { if (nanos > 0) UNSAFE.park(false, nanos); } // 阻塞当前线程,直到指定时间点返回 public static void parkUntil(long deadline) { UNSAFE.park(true, deadline); } // 唤醒指定线程 public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); } // 设置阻塞对象 private static void setBlocker(Thread t, Object arg) { // Even though volatile, hotspot doesn't need a write barrier here. UNSAFE.putObject(t, parkBlockerOffset, arg); } // 阻塞当前线程,并指定要求阻塞的发起者 public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); } // 阻塞当前线程,并指定要求阻塞的发起者,超时后返回,阻塞时间最长不超过nanos纳秒 public static void parkNanos(Object blocker, long nanos) { if (nanos > 0) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, nanos); setBlocker(t, null); } } // 阻塞当前线程,并指定要求阻塞的发起者,直到deadline时间点 public static void parkUntil(Object blocker, long deadline) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(true, deadline); setBlocker(t, null); } // 获取阻塞对象 public static Object getBlocker(Thread t) { if (t == null) throw new NullPointerException(); return UNSAFE.getObjectVolatile(t, parkBlockerOffset); } } 复制代码
从LockSupport源码中可以看出,它内部实现线程阻塞都是通过Unsafe类中的native方法来完成的,Unsafe类简单说就是提供了类似C/C++直接修改内存的操作,在java中认为是不安全的,所以一般要经过安全授信才能使用。它内部的大部分代码最终实现是其他语言实现的,所以在java中不能直接看到实现,不过可以通过openjdk的源码查看,具体就不详述了。
主要介绍下LockSuppport中的几个常用方法:
方法 | 说明 |
---|---|
park() | 阻塞当前线程,直到对该线程执行unpark方法唤醒或线程中断返回 |
parkNanos(long nanos) | 阻塞当前线程,允许指定阻塞时间,超时后返回,单位是纳秒 |
parkUntil(long deadline) | 阻塞当前线程,允许在指定时间到之前返回 |
unpark(Thread thread) | 唤醒指定线程 |
public class ReentrantLock implements Lock, java.io.Serializable { // 序列号 private static final long serialVersionUID = 7373984872572414699L; // 内部同步类Syn的一个属性,供public方法中调用AQS方法 private final Sync sync; // 内部类Syn 继承 AQS,供内部的公平和非公平两种锁类继承 abstract static class Sync extends AbstractQueuedSynchronizer {} // 非公平锁类 static final class NonfairSync extends Sync {} // 公平锁类 static final class FairSync extends Sync {} // 默认构造方法,默认是非公平锁 public ReentrantLock() { sync = new NonfairSync(); } // 用于指定锁方式的构造方法 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } // 普通加锁方法,会阻塞直到成功 public void lock() // 普通加锁方法,与lock不同的是它支持响应中断 public void lockInterruptibly() // 尝试获取锁方法,不管成功还是失败都是立即返回结果,不阻塞 public boolean tryLock() // 尝试获取锁方法,成功立即返回,失败则阻塞一定时间,超时未成功则结束阻塞返回异常。同时支持响应中断请求 public boolean tryLock(long timeout, TimeUnit unit) // 普通解锁方法 public void unlock() // 新建条件 public Condition newCondition() public int getHoldCount() public boolean isHeldByCurrentThread() public boolean isLocked() public final boolean isFair() protected Thread getOwner() public final boolean hasQueuedThreads() public final boolean hasQueuedThread(Thread thread) public final int getQueueLength() protected Collection<Thread> getQueuedThreads() public boolean hasWaiters(Condition condition) public int getWaitQueueLength(Condition condition) protected Collection<Thread> getWaitingThreads(Condition condition) } 复制代码
ReentrantLock内部构造:
简单说明下:
该方法用于直接加锁,并且会一直阻塞到加锁成功(或者线程中断),先简单总结下它的流程:
接下来看下它的源码实现:
public void lock() { sync.lock(); } 复制代码
实际调用那种lock方式取决于当前的ReentrantLock是公平锁还是非公平锁,这里以非公平锁的方式进行解析。
// 静态类NonfairSync 内部方法 final void lock() { // 原子性校验并设置锁 if (compareAndSetState(0, 1)) // 如果获取到锁则更设置当前线程为独占锁的线程 setExclusiveOwnerThread(Thread.currentThread()); else // 如果获取锁失败则继续尝试 acquire(1); } 复制代码
acquire()是AQS中的方法,用于阻塞获取锁。
public final void acquire(int arg) { // 尝试一次获取锁,成功直接返回,失败则继续自旋几遍锁后再视情况阻塞 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果获取锁失败且阻塞获取锁的时候收到了中断请求则执行中断线程操作 selfInterrupt(); } 复制代码
tryAcquire()方法在AQS中定义但是没有实现,由它的子类来重写。此处是要执行的是NofairSyn中的重写方法。
// AQS中定义 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } // NofairSyn中的实现 protected final boolean tryAcquire(int acquires) { // 实际调用的是Syn中的方法 return nonfairTryAcquire(acquires); } // Syn中真实实现tryAcquire的方法 final boolean nonfairTryAcquire(int acquires) { // 获取当前执行的线程对象 final Thread current = Thread.currentThread(); // 获取当前的同步状态 int c = getState(); // 同步状态为无状态时,说明此时锁是空的 if (c == 0) { // 尝试获取锁 if (compareAndSetState(0, acquires)) { // 获取锁成功后设置当前线程为独占锁线程 setExclusiveOwnerThread(current); return true; } } // 当已经被锁时,校验当前获取锁的线程是不是自己(是自己则可重入) else if (current == getExclusiveOwnerThread()) { // 重新获取锁次数累加 int nextc = c + acquires; // 如果超出最大锁定次数则抛出异常 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 设置锁状态为重入次数 setState(nextc); return true; } // 否则获取锁失败直接返回false return false; } 复制代码
addWaiter()方法用于为当前线程和给定模式创建队列节点。
private Node addWaiter(Node mode) { // 创建指定模式的节点,有两种模式:共享和独占 Node node = new Node(Thread.currentThread(), mode); // 首先尝试在队列尾部加入 Node pred = tail; // 队列不为空 if (pred != null) { // 在尾部加入队列 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 如果失败则执行插入方式 enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize // 队列为空则执行初始化操作,新建一个节点作为头节点 if (compareAndSetHead(new Node())) tail = head; } else { // 否则直接在尾部插入 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } 复制代码
acquireQueued()方法用于自旋检查节点是否在head节点,是则尝试获取锁。否则继续自旋,但是不是一直无休止的自旋下去(太消耗性能了),而是在每次自旋后都会检查是否需要继续自旋,不需要则通过LockSupport.park()释放CPU,进入休眠等待唤醒状态。
final boolean acquireQueued(final Node node, int arg) { // 默认会执行失败,以便在最后执行失败处理 boolean failed = true; try { // 默认中断标志为false-不中断 boolean interrupted = false; for (;;) { // 获取前驱节点 final Node p = node.predecessor(); // 如果前驱节点是head节点 并且 尝试获取锁成功 if (p == head && tryAcquire(arg)) { // 将本节点设置为head节点 setHead(node); // 将此前的head节点的后继置为空,减少引用,帮助GC回收 p.next = null; // help GC failed = false; return interrupted; } // 每次获取锁失败都检查下是否需要将本线程休眠 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 休眠结束后如果收到了中断请求,则返回中断标志 interrupted = true; } } finally { // 如果失败则执行失败处理,主要包括更新节点的waitStatus为Canceled,将将其中等待队列中移除 if (failed) cancelAcquire(node); } } 复制代码
shouldParkAfterFailedAcquire()方法用于检查当前线程是否需要停止自旋,转为阻塞休眠。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取前驱节点的等待状态 int ws = pred.waitStatus; // 如果前驱节点状态为SIGNAL,说明前置Node的线程已经阻塞休眠了,此时后面的线程也都需要休眠了 if (ws == Node.SIGNAL) return true; // ws大于0说明前置节点已经取消或者中断了,此时只是将前置节点移除队列 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // ws < 0说明此时前置节点也正在自旋中,故不再继续自旋,将自己的状态改为SIGNAL,以便让后面的节点都直接进入阻塞睡眠中 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 复制代码
parkAndCheckInterrupt()方法用于执行线程休眠并判断中断标志的:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } 复制代码
cancelAcquire()方法用于取消请求获取锁的一些操作:
private void cancelAcquire(Node node) { // 如果节点不存在直接返回 if (node == null) return; // 置节点线程为空 node.thread = null; // 跳过已经取消的前驱,将node的前驱指向前面第一个不为空的节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 备份前驱的后继 Node predNext = pred.next; // 直接更新waitStatus为Canceled node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. // 如果当前节点是尾节点 则设置当前节点的前驱节点为尾节点 if (node == tail && compareAndSetTail(node, pred)) { // 由于pred变成了尾节点,故要将它的后继置为null compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; // 前驱不为head // 前驱的状态为SIGNAL 或 将前驱的状态更新为SIGNAL成功 // 前驱的线程不为空 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; // 如果后继不为空,且未取消则更新前驱的后继 为 node的后继 if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 唤醒下一个不为空的节点 unparkSuccessor(node); } node.next = node; // help GC } } 复制代码
unparkSuccessor()方法用于唤醒队列中下一个不为空的节点线程
private void unparkSuccessor(Node node) { // 尝试将node的等待状态置为0,这样的话,后继争用线程可以有机会再尝试获取一次锁。 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 唤醒不为空的后继节点,如果为空,则总尾部开始向前遍历到首部第一个不为空的节点唤醒 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 如果存在未取消的等待节点则执行唤醒 if (s != null) LockSupport.unpark(s.thread); } 复制代码
该方法主要用于将已获取的锁释放掉,主要包含以下几步:
public void unlock() { sync.release(1); } public final boolean release(int arg) { // 尝试释放锁成功 if (tryRelease(arg)) { // 获取锁等待对了head节点 Node h = head; // head不为空 且 waitStatus不为0 表示有等待锁的线程 if (h != null && h.waitStatus != 0) // 唤醒需锁的线程 unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { // 当前状态-1,第一次获取锁后state为1,后面每重入锁一次就累加一次 int c = getState() - releases; // 检查当前线程是不是该锁的独占线程,若不是抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 计算后的状态值为0,表示可用释放锁了 if (c == 0) { // 释放成功标识置为true free = true; // 设置该锁的独占线程为null setExclusiveOwnerThread(null); } // 更新锁状态 setState(c); return free; } 复制代码
该方法主要用于尝试获取锁,根据参数不同,有两种类型:
不阻塞的方法比较简单,内部只是调用了非公平锁中的nonfairTryAcquire方法。
public boolean tryLock() { return sync.nonfairTryAcquire(1); } 复制代码
超时阻塞其实是在非阻塞的基础上加了类似acquireQueued()的自旋获取锁的过程,但是另外增加了超时时间的校验。
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 超时时间设置为0则与非阻塞tryLock一样了 if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; // 创建节点并添加到锁等待队列中 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { // 获取前驱节点 final Node p = node.predecessor(); // 前驱节点为head 且 尝试获取锁成功 if (p == head && tryAcquire(arg)) { // 设置当前节点为head节点 setHead(node); p.next = null; // help GC failed = false; return true; } // 计算阻塞时间是否超时 超时则直接返回 nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; // 与acquireQueued()类似检查是否符合阻塞休眠条件 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 检查是否有线程中断请求 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } 复制代码
上面介绍的都是基于非公平锁的方式,而公平锁其实大体上与非公平锁的实现相同,不同的是:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 如果锁空闲时,则先检查是否有等待线程队列,没有才继续参与竞争锁,有的话则直接返回false,后续则acquireQueued方法进入等待队列 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { …… } return false; } public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; // h != t 表示等待队列为空,则直接返回false 在tryAcquire中会继续竞争锁 // (s = h.next) == null || s.thread != Thread.currentThread() 表示自己就是队列的尾节点 或者 head的下一个节点是自己 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } 复制代码
理论上,在高并发场景中,公平锁是保证获取锁顺序按照他们请求锁的顺序来,等待的线程要阻塞休眠。每次锁释放后都要去唤醒阻塞的线程,因为阻塞和唤醒线程有个切换的过程,频繁的操作有一定的性能消耗。
而非公平锁是只有刚请求锁时,锁被占用的情况下会加入等待队列,后续操作也类似公平锁。但是如果一个线程释放锁后,刚进来一个线程立即获取到了锁则不需要去唤醒阻塞的线程,也就避免了这部分的性能开销。
所以非公平锁的性能比公平锁好,ReentrantLock也是默认非公平锁的。
一句话说明ReentrantLock的原理:基于CAS原子方式竞争锁,并在高并发获取锁时,为避免一直死循环CAS而将等待线程放入到AQS同步队列中,并通过LockSupport实现线程的阻塞和唤醒。