为了节省各位的时间,我简单介绍一下这篇文章。这篇文章主要分为三块:Lock的实现,AQS的由来(通过演变的方式),JUC三大工具类的使用与原理剖析。
Lock的实现:简单介绍ReentrantLock,ReentrantReadWriteLock两种JUC下经典Lock的实现,并通过手写简化版的ReentrantLock和ReentrantReadWriteLock,从而了解其实现原理。
AQS的由来:通过对两个简化版Lock的多次迭代,从而获得AQS。并且最终的Lock实现了J.U.C下Lock接口,既可以使用我们演变出来的AQS,也可以对接JUC下的AQS。这样一方面可以帮助大家理解AQS,另一方面大家可以从中了解,如何利用AQS实现自定义Lock。而这儿,对后续JUC下的三大Lock工具的理解有非常大的帮助。
JUC三大工具:经过前两个部分的学习,这个部分不要太easy。可以很容易地理解CountDownLatch,Semaphore,CyclicBarrier的内部运行及实现原理。
不过,由于这三块内容较多,所以我将它拆分为三篇子文章进行论述。
Lock接口位于J.U.C下locks包内,其定义了Lock应该具备的方法。
Lock 方法签名:
ReentrantLock是一个可重入锁,一个悲观锁,默认是非公平锁(但是可以通过Constructor设置为公平锁)。
ReentrantLock通过构造方法获得lock对象。利用lock.lock()方法对当前线程进行加锁操作,利用lock.unlock()方法对当前线程进行释放锁操作。
通过
ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition();
获得Condition对象(Condition是J.U.C下locks包下的接口)。
通过Condition对象的.await(*),可以将当前线程的线程状态切换到Waiting状态(如果是有参,则是Time Waiting状态)。而.signal(),.signalAll()等方法则正好相反,恢复线程状态为Runnable状态。
ReentrantLock和Synchronized功能类似,更加灵活,当然,也更加手动了。
大家都知道,只有涉及资源的竞争时,采用同步的必要。写操作自然属于资源的竞争,但是读操作并不属于资源的竞争行为。简单说,就是写操作最多只能一个线程(因为写操作涉及数据改变,多个线程同时写,会产生资源同步问题),而读操作可以有多个(因为不涉及数据改变)。
所以在读多写少的场景下,ReentrantLock就比较浪费资源了。这就需要一种能够区分读写操作的锁,那就是ReentrantReadWriteLock。通过ReentrantReadWriteLock,可以获得读锁与写锁。当写锁存在时,有且只能有一个线程持有锁。当写锁不存在时,可以有多个线程持有读锁(写锁,必须等待读锁释放完,才可以持有锁)。
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); readLock.lock(); readLock.unlock(); readLock.newCondition(); ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); writeLock.lock(); writeLock.unlock(); writeLock.newCondition();
与之前ReentrantLock应用的区别,就是需要通过lock.readLock()与lock.writeLock()来获取读锁,写锁,再进行加锁,释放锁的操作,以及Condition的获取操作。
终于上大餐了。
首先第一步操作,我们需要确定我们要做什么。
我们要做一个锁,这里姑且命名为JarryReentrantLock。
这个锁,需要具备以下特性:可重入锁,悲观锁。
另外,为了更加规范,以后更好地融入到AQS中,该锁需要实现Lock接口。
而Lock的方法签名,在文章一开始,就已经写了,这里不再赘述。
当然,我们这里只是一个demo,所以就不实现Condition了。另外tryLock(long,TimeUnit)也不再实现,因为实现了整体后,这个实现其实并没有想象中那么困难。
既然需要已经确定,并且API也确定了。
那么第二步操作,就是简单思考一下,如何实现。
首先,我们需要一个owner属性,来保存持有锁的线程对象。
其次,由于是可重入锁,所以我们需要一个count来保存重入次数。
最后,我们需要一个waiters属性,来保存那些竞争锁失败后,还在等待(不死不休型)的线程对象。
那么接下来,就根据之前的信息,进行编码吧。
package tech.jarry.learning.netease; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.LockSupport; /** * @Description: 仿ReentrantLock,实现其基本功能及特性 * @Author: jarry */ public class JarryReentrantLock implements Lock { // 加锁计数器 private AtomicInteger count = new AtomicInteger(0); // 锁持有者 private AtomicReference<Thread> owner = new AtomicReference<>(); // 等待池 private LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>(); @Override public boolean tryLock() { // 判断当前count是否为0 int countValue = count.get(); if (countValue != 0){ // countValue不为0,意味着锁被线程持有 // 进而判断锁的持有者owner是否为当前线程 if (Thread.currentThread() == owner.get()){ // 锁的持有者为当前线程,那么就重入加锁 // 既然锁已经被当前线程占有,那么就不用担心count被其他线程修改,即不需要使用CAS count.set(countValue+1); // 执行重入锁,表示当前线程获得了锁 return true; }else{ // 如果当前线程不是锁的持有者,返回false(该方法是tryLock,即浅尝辄止) return false; } }else { // countValue为0,意味着当前锁不被任何线程持有 // 通过CAS操作将count修改为1 if (count.compareAndSet(countValue,countValue+1)){ // count修改成功,意味着该线程获得了锁(只有一个CAS成功修改count,那么这个CAS的线程就是锁的持有者) // 至于这里为什么不用担心可见性,其实一开始我也比较担心其发生类似doubleCheck中重排序造成的问题(tryUnlock是会设置null的) // 看了下源码,AtomicReference中的value是volatile的 owner.set(Thread.currentThread()); return true; } else { // CAS操作失败,表示当前线程没有成功修改count,即获取锁失败 return false; } } } @Override public void lock() { // lock()【不死不休型】就等于执行tryLock()失败后,仍然不断尝试获取锁 if (!tryLock()){ // 尝试获取锁失败后,就只能进入等待队列waiers,等待机会,继续tryLock() waiters.offer(Thread.currentThread()); // 通过自旋,不断尝试获取锁 // 其实我一开始也不是很理解为什么这样写,就可以确保每个执行lock()的线程就在一直竞争锁。其实,想一想执行lock()的线程都有这个循环。 // 每次unlock,都会将等待队列的头部唤醒(unpark),那么处在等待队列头部的线程就会继续尝试获取锁,等待队列的其它线程仍然,继续阻塞(park) // 这也是为什么需要在循环体中执行一个检测当前线程是否为等待队列头元素等一系列操作。 // 另外,还有就是:处于等待状态的线程可能收到错误警报和伪唤醒,如果不在循环中检测等待条件,程序就会在没有满足结束条件的情况下退出。反正最后无论那个分支,都return,结束方法了。 // 即使没有伪唤醒问题,while还是需要的,因为线程需要二次尝试获得锁 while (true){ // 获取等待队列waiters的头元素(peek表示获取头元素,但不删除。poll表示获取头元素,并删除其在队列中的位置) Thread head = waiters.peek(); // 如果当前线程就是等待队列中的头元素head,说明当前等待队列就刚刚加入的元素。 if (head == Thread.currentThread()){ // 尝试再次获得锁 if (!tryLock()){ // 再次尝试获取锁失败,即将该线程(即当前线程)挂起, LockSupport.park(); } else { // 获取锁成功,即将该线程(等待队列的头元素)从等待队列waiters中移除 waiters.poll(); return; } } else { // 如果等待队列的头元素head,不是当前线程,表示等待队列在当前线程加入前,就还有别的线程在等待 LockSupport.park(); } } } } private boolean tryUnlock() { // 首先确定当前线程是否为锁持有者 if (Thread.currentThread() != owner.get()){ // 如果当前线程不是锁的持有者,就抛出一个异常 throw new IllegalMonitorStateException(); } else { // 如果当前线程是锁的持有者,就先count-1 // 另外,同一时间执行解锁的只可能是锁的持有者线程,故不用担心原子性问题(原子性问题只有在多线程情况下讨论,才有意义) int countValue = count.get(); int countNextValue = countValue - 1; count.compareAndSet(countValue,countNextValue); if (countNextValue == 0){ // 如果当前count为0,意味着锁的持有者已经完全解锁成功,故应当失去锁的持有(即设置owner为null) // 其实我一开始挺纠结的,这里为什么需要使用CAS操作呢。反正只有当前线程才可以走到程序这里。 // 首先,为什么使用CAS。由于count已经设置为0,其它线程已经可以修改count,修改owner了。所以不用CAS就可能将owner=otherThread设置为owner=null了,最终的结果就是彻底卡死 //TODO_FINISHED 但是unlock()中的unpark未执行,根本就不会有其它线程啊。囧 // 这里代码还是为了体现源码的一些特性。实际源码是将这些所的特性,抽象到了更高的层次,形成一个AQS。 // 虽然tryUnlock是由实现子类实现,但countNextValue是来自countValue(而放在JarryReadWriteLock中就是writeCount),在AQS源码中,则是通过state实现 // 其次,有没有ABA问题。由于ABA需要将CAS的expect值修改为currentThread,而当前线程只能单线程执行,所以不会。 // 最后,这里owner设置为null的操作到底需不需要。实际源码可能是需要的,但是这里貌似真的不需要。 owner.compareAndSet(Thread.currentThread(),null); // 解锁成功 return true; } else { // count不为0,解锁尚未完全完成 return false; } } } @Override public void unlock() { if (tryUnlock()){ // 如果当前线程成功tryUnlock,就表示当前锁被空置出来了。那就需要从备胎中,啊呸,从waiters中“放“出来一个 Thread head = waiters.peek(); // 这里需要做一个简单的判断,防止waiters为空时,抛出异常 if (head != null){ LockSupport.unpark(head); } } } // 非核心功能就不实现了,起码现在不实现了。 @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public Condition newCondition() { return null; } }
这里就不进行一些解释了。因为需要的解释,在注释中都写的很明确了,包括我踩的一些坑。
如果依旧有一些看不懂的地方,或者错误的地方,欢迎@我,或者私信我。
与ReentrantLock一样,首先第一步操作,我们需要确定我们要做什么。
我们要做一个锁,这里姑且命名为JarryReadWriteLock。
这个锁,需要具备以下特性:读写锁,可重入锁,悲观锁。
一方面了为了更好理解(第一版本,重在理解基础,不是嘛),另一方面也是为了更好地复用前面ReentrantLock的代码(毕竟ReentrantLock其实就是读写锁的写锁,不是嘛),这里的JarryReadWriteLock的API不再与官方的ReentrantReadWriteLock相同,而是做了小小调整。直接调用相关读锁的加解锁API,已经相关写锁的加解锁API。具体看代码部分。
既然需要已经确定,并且API也确定了。
那么第二步操作,就是简单思考一下,如何实现。
首先,我们需要一个owner属性,来保存持有写锁的线程对象。
其次,由于写锁是可重入锁,所以我们需要一个readCount来保存重入次数。
然后,由于读锁是可以有多个线程持有的,所以我们需要一个writeCount来保存读锁持有线程数。
最后,我们需要一个waiters属性,来保存那些竞争锁失败后,还在等待(不死不休型)的线程对象。
到这这里,就不禁会有一个疑问。如何判断尝试获取锁的线程想要获得的锁是什么类型的锁。在API调用阶段,我们可以根据API判断。但是放入等待队列后,我们如何判断呢?如果还是如之前那样,等待队列只是保存竞争锁的线程对象,是完全不够的。
所以我们需要新建一个WaitNode的Class,用来保存等待队列中线程对象及相关必要信息。所以,WaitNode会有如下属性:
package tech.jarry.learning.netease; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; /** * @Description: * @Author: jarry */ public class JarryReadWriteLock { // 用于读锁(共享锁)的锁计数器 这里真的有必要volatile嘛(Atomic中的value时volatile的),再看看后续代码 // 这里确实不需要volatile,至于源码,更过分,源码是通过一个变量state的位运算实现readCount与writeCount volatile AtomicInteger readCount = new AtomicInteger(0); // 用于写锁(独占锁)的锁计数器 这里之所以不用volatile是因为独占锁,只有一个线程在改变writeCount(即使有缓存,也还是这个线程,所以不会因为缓存问题,导致问题) AtomicInteger writeCount = new AtomicInteger(0); // 用于保存锁的持有者(这里专指写锁(独占锁)的锁持有者) AtomicReference<Thread> owner = new AtomicReference<>(); // 用于保存期望获得锁的线程(为了区分线程希望获得的锁的类型,这里新建一个新的数据类型(通过内部类实现)) public volatile LinkedBlockingQueue<WaitNode> waiters = new LinkedBlockingQueue<>(); // 内部类实现等待队列中的自定义数据类型 class WaitNode{ // 表示该等待者的线程 Thread thread = null; // 表示希望争取的锁的类型。0表示写锁(独占锁),1表示读锁(共享锁) int type = 0; // 参数,acquire,状态相关,再看看 int arg = 0; public WaitNode(Thread thread, int type, int arg) { this.type = type; this.thread = thread; this.arg = arg; } } /** * 尝试获取独占锁(针对独占锁) * @param acquires 用于加锁次数。一般传入waitNode.arg(本代码中就是1。为什么不用一个常量1,就不知道了?)(可以更好的对接AQS) * @return */ public boolean tryLock(int acquires){ //TODO_FINISHED 这里readCount的判断,与修改writeCount的操作可以被割裂,并不是原子性的。不就有可能出现readCount与writeCount的值同时大于零的情况。 // 该示例代码,确实存在该问题,但实际源码,writeCount与readCount是通过同一变量state实现的,所以可以很好地通过CAS确保原子性 // readCount表示读锁(共享锁)的上锁次数 if (readCount.get() == 0){ // readCount的值为0,表示读锁(共享锁)空置,所以当前线程是有可能获得写锁(独占锁)。 // 接下来判断写锁(独占锁)是否被占用 int writeCountValue = writeCount.get(); if (writeCountValue == 0){ // 写锁(独占锁)的锁次数为0,表示写锁(独占锁)并没未被任何线程持有 if (writeCount.compareAndSet(writeCountValue,writeCountValue+acquires)){ // 修改writeCount,来获得锁。该机制与ReentrantLock相同 // 设置独享锁的持有者owner owner.set(Thread.currentThread()); // 至此,表示当前线程抢锁成功 return true; } } else { // 写锁(独占锁)的锁次数不为0,表示写锁(独占锁)已经被某线程持有 if (Thread.currentThread() == owner.get()){ // 如果持有锁的线程为当前线程,那就进行锁的重入操作 writeCount.set(writeCountValue+acquires); // 重入锁,表示当前线程是持有锁的 return true; } // 读锁未被占用,但写锁被占用,且占据写锁的线程不是当前线程 } } // 读锁被占据 // 其它情况(1.读锁被占据,2读锁未被占用,但写锁被占用,且占据写锁的线程不是当前线程),都返回false return false; } /** * 获取独占锁(针对独占锁) */ public void lock(){ // 设定waitNote中arg参数 int arg = 1; // 尝试获取独占锁。成功便退出方法,失败,则进入“不死不休”逻辑 if (!tryLock(arg)){ // 需要将当前保存至等待队列,在这之前,需要封装当前线程为waitNote WaitNode waitNode = new WaitNode(Thread.currentThread(), 0, arg); // 将封装好的waitNode放入等待队列waiters中(offer方法会在队列满时,直接返回false。put则是阻塞。add则是抛出异常) waiters.offer(waitNode); // 如ReentrantLock一般,开始循环尝试拿锁 while (true){ // 获取队列头部元素 WaitNode headNote = waiters.peek(); // 如果等待队列头部元素headNote不为null(有可能是null嘛?),并且就是当前线程,那就尝试获取锁 if (headNote !=null && headNote.thread == Thread.currentThread()){ // 如果再次尝试获取锁失败,那就只能挂起了 if (!tryLock(headNote.arg)){ LockSupport.park(); } else { // 再次尝试获取锁成功,那就将队列头部元素,踢出等待队列waiters waiters.poll(); return; } }else { // 如果headNote不是当前线程的封装,就直接挂起(这里就没处理headNote==null的情况) LockSupport.park(); } } } } /** * 尝试解锁(针对独占锁) * @param releases 用于设定解锁次数。一般传入waitNode.arg * @return */ public boolean tryUnlock(int releases){ // 首先判断锁的持有者是否为当前线程 if (owner.get() != Thread.currentThread()){ // 锁的持有者不是当前线程(即使锁的持有者为null,锁的持有者是null,还解锁,仍然是抛出异常) throw new IllegalMonitorStateException(); } // 锁的持有者就是当前线程 // 首先按照releases进行解锁(经过一番思考后,这里不会出现类似DoubleCheck中的问题(Atomic中的value是volatile的),所以这个值同时只会有一个线程对其操作) int writeCountValue = writeCount.get(); // 为writeCount设置新值 writeCount.set(writeCountValue-releases); // 根据writeCount的新值,判断锁的持有者是否发生变化 if (writeCount.get() == 0){ // writeCount的值为0,表示当前线程已经完全解锁,所以修改锁的持有者为null owner.set(null); // 而这表示完全解锁成功 return true; } else { // writeCount的值不为0,表示当前线程尚未完全解锁,故锁的持有者未发生变化。即尝试解锁失败 return false; } } /** * 解锁(针对独占锁) */ public void unlock(){ // 设定tryUnlock的参数releases int arg = 1; // 先尝试解锁 if (tryUnlock(arg)){ // 获得等待队列的头部元素 WaitNode head = waiters.peek(); // 检测一下头部元素head是否null(也许等待队列根本就没有元素) if (head == null){ // 如果头部元素head为null,说明队列为null,直接return return; } // 解锁成功,就要把等待队列中的头部元素唤醒(unpark) // 这里有一点注意,即使队列的头元素head被唤醒了,也不一定就是这个头元素head获得锁(详见tryLock,新来的线程可能获得锁) // 如果这个头元素无法获得锁,就会park(while循环嘛)。并且一次park,可以多次unpark(已实践) LockSupport.unpark(head.thread); } } /** * 尝试获取共享锁(针对共享锁) * @param acquires * @return */ public boolean tryLockShared(int acquires){ // 判断写锁(独占锁)是否被别的线程持有(这个条件意味着:同一个线程可以同时持有读锁与写锁) // 该方法是为了进行 锁降级****** if (writeCount.get() == 0 || owner.get() == Thread.currentThread()){ // 如果写锁(独占锁)没有别的被线程持有,就可以继续尝试获取读锁(共享锁) // 通过循环实现自旋,从而实现加锁(避免加锁失败) while(true){ // 由于读锁(共享锁)是共享的,不存在独占行为,故直接在writeCount增加当前线程加锁行为的次数acquires int writeCountValue = writeCount.get(); // 通过CAS进行共享锁的次数的增加 if (writeCount.compareAndSet(writeCountValue, writeCountValue+acquires)){ break; } } } // 写锁已经被别的线程持有,共享锁获取失败 return false; } /** * 获取共享锁(针对共享锁) */ public void lockShared(){ // 设定waitNote中arg参数 int arg = 1; // 判断是否获取共享锁成功 if (!tryLockShared(arg)){ // 如果获取共享锁失败,就进入等待队列 // 与获取同步锁操作一样的,需要先对当前线程进行WaitNote的封装 WaitNode waitNode = new WaitNode(Thread.currentThread(),1,arg); // 将waitNote置入waiters(offer方法会在队列满时,直接返回false。put则是阻塞。add则是抛出异常) waiters.offer(waitNode); // 使用循环。一方面避免伪唤醒,另一方面便于二次尝试获取锁 while (true){ // 获取等待队列waiters的头元素head WaitNode head = waiters.peek(); // 校验head是否为null,并判断等待队列的头元素head是否为当前线程的封装(也许head时当前线程的封装,但并不意味着head就是刚刚放入waiters的元素) if (head != null && head.thread == Thread.currentThread()){ // 如果校验通过,并且等待队列的头元素head为当前线程的封装,就再次尝试获取锁 if (tryLockShared(head.arg)){ // 获取共享锁成功,就从当前队列中移除head元素(poll()方法移除队列头部元素) waiters.poll(); // 在此处就是与独占锁不同的地方了,独占锁意味着只可能有一个线程获得锁,而共享锁是可以有多个线程获得的 // 获得等待队列的新头元素newHead WaitNode newHead = waiters.peek(); // 校验该元素是否为null,并判断它的锁类型是否为共享锁 if (newHead != null && newHead.type == 1){ // 如果等待队列的新头元素是争取共享锁的,那么就唤醒它(这是一个类似迭代的过程,刚唤醒的线程会会做出同样的举动) //TODO_FINISHED 这里有一点,我有些疑惑,那么如果等待队列是这样的{共享锁,共享锁,独占锁,共享锁,共享锁},共享锁们被一个独占锁隔开了。是不是就不能唤醒后面的共享锁了。再看看后面的代码 // 这个实际源码,并不是这样的。老师表示现有代码是这样的,不用理解那么深入,后续有机会看看源码 LockSupport.unpark(newHead.thread); } } else { // 如果再次获取共享锁失败,就挂起 LockSupport.park(); } } else { // 如果校验未通过,或等待队列的头元素head不是当前线程的封装,就挂起当前线程 LockSupport.park(); } } } } /** * 尝试解锁(针对共享锁) * @param releases * @return */ public boolean tryUnlockShared(int releases){ // 通过CAS操作,减少共享锁的锁次数,即readCount的值(由于是共享锁,所以是可能多个线程同时减少该值的,故采用CAS) while (true){ // 获取读锁(共享锁)的值 int readCountValue = readCount.get(); int readCountNext = readCountValue - releases; // 只有成功修改值,才可以跳出 if (readCount.compareAndSet(readCountValue,readCountNext)){ // 用于表明共享锁完全解锁成功 return readCountNext == 0; } } // 由于读锁没有owner,所以不用进行有关owner的操作 } /** * 解锁(针对共享锁) */ public boolean unlockShared(){ // 设定tryUnlockShared的参数releases int arg = 1; // 判断是否尝试解锁成功 if (tryUnlockShared(arg)){ // 如果尝试解锁成功,就需要唤醒等待队列的头元素head的线程 WaitNode head = waiters.peek(); // 校验head是否为null,毕竟可能等待队列为null if (head != null){ // 唤醒等待队列的头元素head的线程 LockSupport.unpark(head.thread); } //TODO_FINISHED 尝试共享锁解锁成功后,就应当返回true(虽然有些不大理解作用) // 用于对应源码 return true; } //TODO_FINISHED 尝试共享锁解锁失败后,就应当返回false(虽然有些不大理解作用) // 用于对应源码 return false; } }
这里同样不进行相关解释了。因为需要的解释,在注释中都写的很明确了,包括我踩的一些坑。
如果依旧有一些看不懂的地方,或者错误的地方,欢迎@我,或者私信我。
其实,这两个demo有两个重要的方面。一方面是可以亲自感受,一个锁是怎么实现的,它的方案是怎样的。另一方面就是去思量,其中有关原子性,以及可见性的思量与设计。
你们可以尝试改动一些东西,然后去考虑,这样改动后,是否存在线程安全问题。这样的考虑对自己在线程安全方面的提升是巨大的。反正我当时那一周,就不断的改来改去。甚至有些改动,根本调试不出来问题,然后咨询了别人,才知道其中的一些坑。当然也有一些改动是可以的。
如果有问题,可以@我,或者私信我。
如果觉得这篇文章不错的话,请点击推荐。这对我,以及那些需要的人,很重要。
谢谢。