通过对锁原理的分析,重点分析ReentrantLock和ReentrantReadWriteLock的源码,通过锁的实现更深入的理解AQS。 已同步至我的个人博客: siyuanwang.github.io/2019/12/12/…
在本文的开篇,先提出问题,我们所有的研究,都为了解决这些问题的。
在分析源码之前,先讲解下锁相关的基础。
通常情况下解决多线程共享资源逻辑一致性问题有两种方式:
对于这两种方式没有优劣之分,只有是否适合当前的场景。
但是如果竞争非常激烈的时候,使用自旋锁就会产生一些额外的问题:
解决这些问题其中的一种办法就是使用队列锁,简单来讲就是让这些线程排队获取,下面章节会介绍 CLH锁 ,它是队列锁的一种优秀实现。
无论是简单的非公平自旋锁还是公平的基于排队的自旋锁,由于执行线程均在同一个共享变量上自旋,申请和释放锁的时候必须对该共享变量进行修改,这将导致所有参与排队自旋锁操作的处理器的缓存变得无效。如果排队自旋锁竞争比较激烈的话,频繁的缓存同步操作会导致繁重的系统总线和内存的流量,从而大大降低了系统整体的性能。
所以,需要有一种办法能够让执行线程不再在同一个共享变量上自旋,避免过高频率的缓存同步操作。于是 MCS 和 CLH 锁应运而生,由于MCS和CLH很类似,Java中主要采用了CLH的思想,所以CMS在此就不再研究。
CLH锁的名称都来源于发明人的名字首字母:
Craig、Landin、Hagersten三个人发明了CLH锁。 其核心思想是:通过一定手段将所有线程对某一共享变量的轮询竞争转化为一个线程队列,且队列中的线程各自轮询自己的本地变量 。
上文提到的自旋锁和互斥锁,是天然的非公平锁。在竞争激烈的时候,可能导致一些线程始终无法获取锁(争抢的时候必然是当前活跃线程获得锁的几率大),也就是 饥饿现象 。为了解决饥饿现象所以需要一种公平的方式,让每一个线程都有机会获取到锁。
**公平锁(FairSync)**的定义是:所有竞争资源的线程遵循排队原则,逐一获得锁,通俗一点来讲,就是“先来后到”。非公平锁(NonfairSync)允许“插队行为“。
在现实生活中,大家都对插队行为嗤之以鼻,但是大家完全遵循排队原则,效率(在计算机世界里,效率就是用最少的资源最大化利用CPU的计算能力)就真的高嘛?举一个简单的例子,有10个线程用公平排队的方式等待资源,但是由于线程A有大量的I/O操作,一直占用锁不释放,CPU空置,后面的线程为了遵循公平原则只能干瞪眼,此刻CPU资源不能最大化的利用。
那如果执行较快的线程B在到的时候,资源恰好是无竞争状态,那它可以直接“插队“进来优先被执行,但是由于线程B执行的速度很快,所以线程A也没有蒙受多大的‘’损失‘’‘。 所以非公平锁可以尽量利用系统的计算资源,省去了线程的唤醒,用户态到内核态切换等等消耗资源的操作,从大局上提高系统的效率 。
但是,所有的事物都要辩证的去看。举一个例子,如果线程A是一个很重要的线程,但是由于锁竞争很激烈,线程A始终拿不到锁,那么就会导致一些关键性功能受到影响,这就是 锁饥饿 。 所以非公平锁在锁竞争很激烈的情况下,会存在锁饥饿现象 。
公平和非公平各有利弊,但是大多数的锁的实现,默认实现的是非公平锁,如果没有特殊的场景,非公平锁就能满足你的大多数需求。
先说场景来理解可重入性。如下代码会最终打印100。调用test的时候,主线程获得了锁,但是在主线程还未释放锁的时候,由于递归调用又再次获取了锁。 一个线程可以多次获取同一把锁的行为,叫做锁的可重入 。如果不可重入,如下代码就会造成 死锁 。
public class TestClass { public static void main(String[] args) { AtomicInteger i = new AtomicInteger(0); test(i); System.out.println(i); } public static synchronized void test(AtomicInteger i) { if (i.get() < 100) { i.incrementAndGet(); test(i); } } } ##结果 100 复制代码
乐观锁对应于生活中乐观的人总是想着事情往好的方向发展,悲观锁对应于生活中悲观的人总是想着事情往坏的方向发展。这两种人各有优缺点,不能不以场景而说一种人好于另外一种人。
Java中synchronized和ReentrantLock就是悲观锁思想实现的,数据库中的表锁和行锁,也都是悲观锁的实现。
Java中 java.util.concurrent.atomic
包下面的各种原子类,就是使用了乐观锁的一种实现方式CAS实现。
从上面对两种锁的介绍,我们知道两种锁各有优缺点,不可认为一种好于另一种。 乐观锁适用于写比较少的情况下(多读场景),即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量 。但如果是多写的情况,一般会经常产生冲突,这就会导致上层应用会不断的进行retry,这样反倒是降低了性能,所以一般 多写的场景下用悲观锁 就比较合适。
乐观锁的实现,一般会通过 版本号机制 或者 CAS算法 实现。
一般在数据中,增加一个 version
字段,表示字段被修改的次数,经典的实现有数据库的乐观锁设计,ElasticSearch的_version机制等。用如下伪代码的形式,解释下版本号的思想。
var user = select user.age,user.id,user.version from t_user where id = 1001; user.age++; int oldVersion = user.version; user.version++; int result = update t_user set user.age=#{user.age},version=#{user.version} where id = 1001 and version=#{oldVersion}; if(result == 1) { return 200; } else { while(true){ //在执行一次如上逻辑 if(result == 1) { break; } } } 复制代码
具体的CAS问题可以参考此文,作者写的也是比较详细,ABA的例子也来自他的博文。
CAS即 compare and swap(比较与交换) ,是一种有名的无锁算法。CAS算法涉及到三个操作数
当且仅当V的值等于A时,CAS通过原子方式用新值B来更新V的值,否则不会执行任何操作( 比较和替换是原子操作 ),通过自旋不断地重试。
但是CAS机制会存在 ABA问题 ,举一个生活中取钱的例子。
在多线程场景下 CAS
会出现 ABA
问题,关于ABA问题这里简单科普下,例如有2个线程同时对同一个值(初始值为A)进行CAS操作,这三个线程如下
线程 1
抢先获得CPU时间片,而线程 2
因为其他原因阻塞了,线程 1
取值与期望的A值比较,发现相等然后将值更新为B,然后这个时候 出现了线程 3
,期望值为B,欲更新的值为A ,线程3取值与期望的值B比较,发现相等则将值更新为A,此时线程 2
从阻塞中恢复,并且获得了CPU时间片,这时候线程 2
取值与期望的值A比较,发现相等则将值更新为B,虽然线程 2
也完成了操作,但是线程 2
并不知道值已经经过了 A->B->A
的变化过程。
ABA
问题带来的危害 : 小明在提款机,提取了50元,因为提款机问题,有两个线程,同时把余额从100变为50 线程1(提款机):获取当前值100,期望更新为50, 线程2(提款机):获取当前值100,期望更新为50, 线程1成功执行,线程2某种原因block了,这时,某人给小明汇款50 线程3(默认):获取当前值50,期望更新为100, 这时候线程3成功执行,余额变为100, 线程2从Block中恢复,获取到的也是100,compare之后,继续更新余额为50!!! 此时可以看到,实际余额应该为100(100-50+50),但是实际上变为了50(100-50+50-50)这就是ABA问题带来的成功提交。
解决方法: 在变量前面加上版本号,每次变量更新的时候变量的 版本号都 +1
,即 A->B->A
就变成了 1A->2B->3A
。
synchronized是Java中的内置锁,具体的实现在JVM中。synchronized可以给修饰代码块,修饰方法等,由于它的用法过于基础,不在本文中赘述。
synchronized的特点是使用简单,不需要手动指定锁的获取和锁的释放。被大家所诟病的性能问题,在JDK1.6之后不复存在。在JDK1.6中,引入了两种新型锁机制: 偏向锁 和 轻量级锁 ,它们的引入是为了解决在没有多线程竞争或基本没有竞争的场景下因使用传统锁机制带来的性能开销问题。
关于重量级锁、轻量级锁、偏向锁,可以看此文,作者写的很用心。
总之在使用synchronized的时候,偏向锁、轻量级锁、重量级锁,分别对应锁只被一个线程持有,不同线程持交替持有,多线程竞争锁三种情况。当条件不满足时,锁会按偏向锁->轻量级锁->重量级锁的顺序 升级 。JVM的锁也是能降级的,不过条件十分苛刻。
针对问题:ReentrantLock与Syncronized有何不同?回答是:
在JDK1.6之前,由于还没有引入偏向锁和轻量级锁,JVM对synchronized的实现比较重,导致它的性能存在一定的问题,ReentrantLock的性能要完胜synchronized的。但是随着JVM对synchronized的不断优化,在jdk1.6之后,两者的差距越来越小,synchronized的底层实现主要依靠 Lock-Free 的队列,基本思路是 自旋后阻塞 , 竞争切换后继续竞争锁 , 稍微牺牲了公平性,但获得了高吞吐量 ,性能差距可以忽略不计。所以在一般情况下,性能不是两者最大的差距。真正的差距是在于两者的使用上,synchronized是很死板的,只能对一个代码块或者一个方法,提供非公平独占锁的实现。跟synchronized相比,ReentrantLock是非常灵活的,既可以公平锁也可以非公平锁,可以 newCondition()
,来实现锁的阻塞队列,但是灵活性带来的就是使用的门槛较高,使用不规范就很容易造成死锁!不要小看死锁,解决死锁的方式是进程重启!但是在使用synchronized时,是不用考虑死锁问题的。
void lock()
获取锁
void lockInterruptibly()
获取锁除非线程被中断
InterruptedException
boolean tryLock()
仅当其他线程在此刻没有持有该锁时,才会获取该锁,立即返回并返回 true
,设置标志位为1。如果当前线程已经持有该锁,立即返回,标志位递增并返回 true
。如果该锁被其他线程持有,将立即返回 false
。
boolean tryLock(long timeout, TimeUnit unit)
具体逻辑跟tryLock相同但是多了一个等待的时间,如果在等待期间,线程被中断,还会抛出 InterruptedException
void unlock()
试图释放锁,当前线程拥有该锁,则计数器减1,直到计数器为0,则释放锁。如果当前线程不持有该锁,则抛出 IllegalMonitorStateException
异常
Condition newCondition()
返回锁的Condition实例。Condition实例的作用与 Object.wait()
、 Object.notify()
、 Object.notifyAll()
相同。如果lock没有被任何线程持有,调用 Condition.awart()
或者 Condition.signal
,会抛出 IllegalMonitorStateException
。如果线程被中断,则waiting将会终止,并抛出 InterruptedException
,线程的中断标识位将被清除。
int getHoldCount()
,锁CLH队列的数量
boolean isHeldByCurrentThread()
锁是否被当前线程持有
boolean isLocked()
锁是否被线程持有
boolean isFair()
是否是公平锁
Thread getOwner()
返回当前持有锁的线程,如果锁没有被任何线程持有则返回 null
如下代码只保留了关键性代码。 Sync
通过继承 AbstractQueuedSynchronizer
来获得基本的同步控制能力, NofairSync
和 FairSync
通过实现AQS模板方法,来指定获取锁的资格算法。
abstract static class Sync extends AbstractQueuedSynchronizer { ... //如果线程可以获取锁,则返回true,否则立即返回false final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //如果状态为零,则设置ownerThread,返回true if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果状态不为零,判断当前线程是不是等于ownerThread,如果是则递增state,并立即返回true else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //其他状态均返回false return false; } ... } static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; final void lock() { //比较如果status=0则status=1,并设置ownerThread,通过这种方式使nonfairTryAcquire方法返回true if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //公平锁的实现中,比非公平锁多了一句hasQueuedPredecessors(),只有没有后继者并且status=0才可以有获取锁的资格,是非常之公平 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } 复制代码
在上一章节公平锁&非公平锁中,可以看到 current == getExclusiveOwnerThread()
,status会加上acquires值,来做递增并返回true,所以同一个线程进来之后,不会加入到CLH同步队列中。
在公平模式&非公平模式的章节中,我们会发现无论是公平锁还是非公平锁,他们最终都会调用 acquire
方法进入到同步队列中。 acquire
方法包括如下流程
tryAcquire
尝试获取锁,如果能获取锁,则直接返回。 tryAcquire
是一个模板方法,由实现AQS的类实现。在ReentrantLock中,分别有公平锁和非公平锁两种实现,具体实现可以参考对应章节。 addWaiter
方法,把 currentThread
包装成 Node
,并加入到同步队列中。这里有一个细节 addWaiter(Node.EXCLUSIVE)
,ReentrantLock是独占锁的思想,所以需要指定模式为 独占模式 。 acquireQueued
方法,采用自旋+阻塞的方式,控制同步队列。只有在waitStatus=SIGNAL状态下,才会阻塞线程,每次自旋都会检测当前节点node的前置节点,是否为 head
节点,如果是head节点并且能获取到锁,则跳出自旋。 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } //独占模式的acquire方法 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //自旋+park for (;;) { final Node p = node.predecessor(); //如果前置节点时head,注意此处还会再尝试一下能否获得到锁 if (p == head && tryAcquire(arg)) { //这直接把node设置为head setHead(node); p.next = null; // help GC failed = false; return interrupted; } //如果前置节点不是head或者tryAcquuire返回false,则需要park节点 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //pred为前置节点,node为当前节点 //当获取锁失败时,检查和更新node.waitStatus,如果线程应该被阻塞,则返回true. //这个方法是自旋锁中最重要的控制手段 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取到前置节点的waitStatus int ws = pred.waitStatus; //如果前置节点的状态为SIGNAL,则返回true,意味着当前节点node应该被park if (ws == Node.SIGNAL) return true; //如果前置节点被CANCEL的,则移除 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //原子的设置前置节点为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //阻塞线程并返回线程是否中断 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } protected final boolean tryRelease(int releases) { //重置标识位为0,这个很关键 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } //释放锁 public final boolean release(int arg) { //判定currentThread是否等于ownerThread,如果不相等则抛出IllegalMonitorStateException //然后,判断status属性是否等于0,如果等于0则返回true,同时清空ownerThread。代表资源已经被释放掉 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //unpark线程 unparkSuccessor(h); return true; } return false; } 复制代码
可能看完源码还是一头雾水,我本人是看了很多次,才明白了AQS具体的执行流程。整理了一个流程图,希望大家仔细的按照图的流程梳理一遍整个流程,才能真正的理解AQS。
如图所示:
ReentrantLock
实例,此刻head=null,tail=null,status=0 lock.lock()
,由于线程A是最先获取锁的,status=1,根据 acquire
方法的逻辑,不设置任何Node到同步队列,所以head和tail依旧为null。 lock.lock()
,由于此刻status=1,所以 tryAcquire
返回false。触发了 addWariter
逻辑,增加了Node和NodeB两个节点,head和tail指针也指向对应的链表头和链表尾。 acquireQueued
开始自旋操作,第一次自旋由于 Node.waitStatus != SIGNAL
,所以设置前置节点waitStatus=SIGNAL后,继续自旋,再次判定 shouldParkAfterFailedAcquire
方法,发现 Node.waitStatus == SIGNAL
,这时会阻塞ThreadB。 lock.lock()
,执行逻辑同上。最终会形成Node<->NodeB<->NodeC这种的CLH同步队列。 lock.unlock()
,会最终调用 release
方法,把status=0,并唤醒head的后继节点NodeB,NodeB唤醒之后继续自旋,最终NodeB成为head节点,跳出自旋,执行ThreadB的逻辑。 lock.lock()
,最终会唤醒NodeC,最终全部线程执行完毕。 通过如下代码给当前的Lock对象创建一个Condition对象。它的作用等同于 Object.wait()
和 Object.notify()
,条件队列是一个FIFO队列,可以通过 signalAll
解锁全部的线程,也可以通过 signal
单独解锁线程。
ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); 复制代码
首先是 await
的逻辑:
然后是 signal
的逻辑:
firstWaiter
,如果first.nextWaiter == null 则置空 lastWaiter
//等待Conditon public final void await() throws InterruptedException { //第一步,判断中断则抛出InterruptedException if (Thread.interrupted()) throw new InterruptedException(); //第二步,在FIFO条件队列(condition queue)增加节点,节点状态为CONDITION,在增加节点的同时清除掉状态为CANCELED的Node Node node = addConditionWaiter(); //第三步,把该节点从同步队列(sync queue)中去除 int savedState = fullyRelease(node); int interruptMode = 0; //第四步,判定节点是否在同步队列中,只有不在同步队列,才阻塞线程 while (!isOnSyncQueue(node)) { //阻塞node LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //第五步,当node被signal之后,把node加入到同步队列中,准备获取锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; //第六步,移除掉所有状态为CANCELED的节点 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); //第七步,报告中断,如果在signal之前线程被中断则抛出中断异常,如果在signal之后线程被中断,则再次中断,有调用代码自行处理中断标识 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } //新的waiter节点加入条件队列 private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } //轮询整个条件队列(condition queue),去掉节点状态不是CONDITION的节点 private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; //如果t的状态不是CONDITION,则把t节点从链表中摘除 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } } //从同步队列中移除node final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } //判定node是否在同步队列中,条件队列的next和prev都一定是null,条件队列是nextWaiter的单向链表 final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //如果node.next != null,说明存在后继节点,说明它一定在同步队列中 if (node.next != null) // If has successor, it must be on queue return true; //从tail轮询同步队列,判定node是否在同步队列中 return findNodeFromTail(node); } //检测中断,返回THROW_IE如果中断发生在signalled之前,返回REINTERRUPT如果中断发生在中断之后,返回0表示没有发生中断 private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } //wait结束后报告中断 private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } //释放Condition public final void signal() { //模板方法,由实现AQS的实现类实现,在ReentrantLock中,判定current线程是否等于ownerThread if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } //转换node从 条件队列-> 同步队列 final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //解除节点阻塞 LockSupport.unpark(node.thread); return true; } 复制代码
在使用Condition的时候,一定要注意对中断的处理。 Condition.await()
跟 Object.wait()
相比,后者只支持中断抛出 InterruptedException
异常,而前者即抛出 InterruptedException
异常,也会再次重置中断标识位,Condition处理中断的逻辑是 如果在signal之前线程被中断则抛出中断异常,如果在signal之后线程被中断,则再次中断,有调用代码自行处理中断标识 。如下示例代码所示。
定义三个线程 t1
, t2
, t3
,在 t2
执行 signal()
之前, t3
线程执行了 t1.interrupt()
,则t1线程中捕获中断异常。如果不执行 t3
线程并且在 t2
执行 signal()
之后再调用 t1.interrupt()
,则t1线程不会捕获中断异常,但是 Thread.currentThread().isInterrupted() == true
。
/** 在退出wait之后再次中断 */ private static final int REINTERRUPT = 1; /** 在退出wait之后抛出InterruptedException异常 */ private static final int THROW_IE = -1; public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); Thread t1 = new Thread(() -> { try { lock.lock(); condition.await(); System.out.println("aaa=" + Thread.currentThread().isInterrupted()); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }); Thread t2 = new Thread(() -> { try { Thread.sleep(2000); lock.lock(); condition.signal(); //t1.interrupt(); } catch (InterruptedException e) { e.printStackTrace(); } catch (IllegalMonitorStateException e) { e.printStackTrace(); } finally { lock.unlock(); } }); Thread t3 = new Thread(() -> { try { Thread.sleep(1000); t1.interrupt(); } catch (InterruptedException e) { e.printStackTrace(); } }); t1.start(); t2.start(); t3.start(); } 复制代码
由于ReentrantLock是AQS的最基础实现,理解ReentrantLock十分重要。通过对ReentrantLock的源码分析,可以总结为如下结论:
status
字段,同步队列节点为内部类 Node
,Node的状态通过 waitStatus
控制,共有 SIGNAL
, CANCELED
, CONDITION
, PROPAGATE
四种状态, nextWaiter
是条件队列的单向链表。 通过学习ReentrantLock,我们掌握了ReentrantLock和AQS的关系,知道了是如何通过AQS来实现一个独占锁。但是很多情况下,独占锁也是一种很重的操作,比如缓存这种读多写少的情况,多并发读取缓存值是很频繁的事情,如果采用独占锁,会大大降低缓存的吞吐。而读操作本身就不存在共享变量同步,如果资源能够同时被多个读线程访问而且能保证同步,则缓存的吞吐能力将大大提升。
那么 ReentrantReadWriteLock
的出现,就是要解决以上问题的利器。读写锁的定义为: 一个资源能够被多个读线程访问,或者被一个写线程访问,但是不能同时存在读写线程 。
ReadLock 和 WriteLock 中的方法都是通过 Sync 这个类来实现的。Sync 是 AQS 的子类,然后再派生了公平模式和不公平模式。
从它们调用的 Sync 方法,我们可以看到: ReadLock 使用了共享模式,WriteLock 使用了独占模式 。
AQS 的精髓在于内部的属性 state:对于独占模式来说,通常就是 0 代表可获取锁,1 代表锁被别人获取了,重入例外而共享模式下,每个线程都可以对 state 进行加减操作也就是说,独占模式和共享模式对于 state 的操作完全不一样,那读写锁 ReentrantReadWriteLock 中是怎么使用 state 的呢?答案是将 state 这个 32 位的 int 值分为高 16 位和低 16位,分别用于共享模式和独占模式。
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { /** 内部类 读锁 */ private final ReentrantReadWriteLock.ReadLock readerLock; /** 内部类 写锁 */ private final ReentrantReadWriteLock.WriteLock writerLock; /** AQS 读写锁实现类 */ final Sync sync; /** * 默认创建非公平锁 */ public ReentrantReadWriteLock() { this(false); } /** * 1、构建同步器:公平的&非公平的 * 2、构建读锁和死锁实例 */ public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } abstract static class Sync extends AbstractQueuedSynchronizer {} static final class NonfairSync extends Sync{} static final class FairSync extends Sync{} public static class ReadLock implements Lock, java.io.Serializable{} public static class WriteLock implements Lock, java.io.Serializable{} } 复制代码
在 AQS 中,如果 tryAcquireShared(arg) 方法返回值小于 0 代表没有获取到共享锁(读锁),大于 0 代表获取到。
AQS共享模式:tryAcquireShared 方法不仅仅在 acquireShared 的最开始被使用,这里是 try,也就可能会失败,如果失败的话,执行后面的 doAcquireShared,进入到阻塞队列,然后等待前驱节点唤醒。唤醒以后,还是会调用 tryAcquireShared 进行获取共享锁的。当然,唤醒以后再 try 是很容易获得锁的,因为其他节点都还出于阻塞状态,此刻参与竞争的,只有被唤醒的节点或者其他新的线程。
所以,你在看下面这段代码的时候,要想象到两种获取读锁的场景,一种是新来的,一种是排队排到它的。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } //返回-1代表不能获取读锁 //返回1代表可以获取读锁 protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); //如果独占锁的数量不等于零,说明有写锁,而且当前线程不等于ownerThread,说明不是重入线程,所以返回-1 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //获得共享锁数量 int r = sharedCount(c); //如果读锁不应该阻塞且CAS更新成功 //readerShouldBlock在公平模式和非公平模式有不同的实现 //在公平模式,只要同步队列中有节点在排队,则新的节点就乖乖的去排队 //在非公平模式,只有同步队列的head是写锁,才会去排队,否则不需要阻塞 //这段代码中对firstReader、firstReaderHoldCount、readHolds等设定,都是出于性能考量,可以直接返回1 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //首次获取锁 if (r == 0) { //出于性能考虑,可以 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { //出于性能考虑 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); } 复制代码
回顾下这段代码,这个判定条件是重点 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARD_UNIT)
,共有三个判定条件:
readerShouldBlock()
在 FairSync
中,如果同步队列中有其他元素在等待锁,你就需要乖乖的去排队。
final boolean readerShouldBlock() { return hasQueuedPredecessors(); } public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } 复制代码
在 NonFairSync
中,如果head的后继节点是写锁线程节点,则阻塞读锁的获取,先让写锁执行。如果head.next不是写锁,非公平模式下是允许竞争的。
final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; } 复制代码
r < MAX_COUNT
, static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
,MAX_COUNT一般情况下是不会达到的,最大值是65535。这个条件可以忽略不计。
compareAndSetState(c, c + SHARD_UNIT)
,CAS竞争失败,可以另一个读锁竞争,也可能是写锁操作竞争。
/** * 这个方法是为了解决: * 1、CAS竞争失败,因为经过readerShouldBlock方法的验证,线程不需要阻塞,如果仅仅是因为CAS竞争失败而导致进 * 入同步队列,就太可惜了。 * 2、处理重入 * */ final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; //自旋 for (;;) { //获取状态 int c = getState(); //获取写锁数量,如果此刻有线程获取到了写锁,则宣告读线程进入队列排队 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; //再次判定reader是否需要进入队列 } else if (readerShouldBlock()) { if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { //为了确保读锁重入操作能成功,而不是被塞到同步队列中等待 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } //判定共享锁的数量,最大为65535 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //再次参加CAS竞争,如果还没有竞争到则继续自旋 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } 复制代码
读锁释放的过程还是比较简单的,主要就是将 hold count 减 1,如果减到 0 的话,还要将 ThreadLocal 中的 remove 掉。
然后是在 for 循环中将 state 的高 16 位减 1,如果发现读锁和写锁都释放光了,那么唤醒后继的获取写锁的线程。
//共享模式release public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } } 复制代码
先说重点
1、写锁是独占锁 2、如果读锁被占用,写锁获取时是要进入阻塞队列中等待的
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); //获取status值和写锁数量 int c = getState(); int w = exclusiveCount(c); if (c != 0) { //如果存在读锁,则写锁进入队列阻塞 // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } //判定写锁是否应该阻塞,或者CAS静态失败,都会导致写锁阻塞 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //设置独占锁ownerThread setExclusiveOwnerThread(current); return true; } static final class NonfairSync extends Sync { //非公平模式,多个写锁之间就靠CAS去抢锁了,抢不到进入队列中排队 final boolean writerShouldBlock() { return false; } } static final class FairSync extends Sync { //公平模式如果队列存在节点则进入队列排队 final boolean writerShouldBlock() { return hasQueuedPredecessors(); } } 复制代码
写锁释放就很简单了,独占锁的释放是线程安全的,不需要CAS,就是把state - 1。
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; } 复制代码
Doug Lea 没有说写锁更 高级 ,如果有线程持有读锁,那么写锁获取也需要等待。
不过从源码中也可以看出,确实会给写锁一些特殊照顾。如非公平模式下,为了提高吞吐量,write.lock的时候不需要管同步队列中是否存在等待节点,直接CAS去竞争。read.lock的时候,如果发现 head.next 是获取写锁的线程,就会建议阻塞读锁。
Doug Lea 将持有写锁的线程,去获取读锁的过程称为 锁降级(Lock downgrading) 。这样,此线程就既持有写锁又持有读锁。
但是, 锁升级 是不可以的。线程持有读锁的话,在没释放的情况下不能去获取写锁,因为会发生 死锁 。
回去看下写锁获取的源码:
//锁不能升级的实现 protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // 看下这里返回 false 的情况: // c != 0 && w == 0: 写锁可用,但是有线程持有读锁(也可能是自己持有) // c != 0 && w !=0 && current != getExclusiveOwnerThread(): 其他线程持有写锁 // 也就是说,只要有读锁或写锁被占用,这次就不能获取到写锁 if (w == 0 || current != getExclusiveOwnerThread()) return false; ... } ... } //锁可以降级的实现 protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); //重点!看这里,如果写锁数量不为0,代表有线程持有写锁。 //但是!如果current == ownerThread ,则跳出这个判定,有机会获取读锁,这就是锁降级。 //如果current != ownerThread,则读锁线程进入队列阻塞 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { ... } ... } 复制代码
如果线程 a 先获取了读锁,然后获取写锁,那么线程 a 就到阻塞队列休眠了,自己把自己弄休眠了,而且可能之后就没人去唤醒它,造成 死锁 。
如果线程 a 先获取了写锁,然后获取读锁,那么线程 a 就不会被阻塞,继续有机会获取读锁。
读写锁虽然适合读多写少的场景,提高了吞吐能力。但是其使用难度比 ReentrantLock
更高,必须要细心谨慎的操作锁,一旦出现死锁情况,只能靠重启进程来解决。设想一下刚重启完进程2分钟又进入死锁,线程池被占满又要重启的恐怖场景!所以,使用锁是一把双刃剑,要 仔细检查,反复测试 。
Case1: 锁升级造成死锁。一个线程要获取读锁之后,想要获取写锁前一定要先unlock读锁。
public static void main(String[] args) { try { System.out.println("获取读锁"); lock.readLock().lock(); System.out.println("获取写锁"); lock.writeLock().lock(); lock.writeLock().unlock(); System.out.println("释放写锁"); lock.readLock().unlock(); } catch (Exception e) { e.printStackTrace(); } finally { } } 复制代码
Case2: lock两次, unlock一次,由于粗心大意。一定要多次检查lock和unlock是不是成对出现。
public static void main(String[] args) { try { Random random = new Random(); lock.writeLock().lock(); lock.writeLock().lock(); System.out.println("获取写锁"); lock.readLock().lock(); System.out.println("获取读锁"); lock.readLock().unlock(); System.out.println("释放读锁"); lock.writeLock().unlock(); new Thread(() -> { lock.readLock().lock(); System.out.println("另外线程获取读锁"); lock.readLock().unlock(); }).start(); System.out.println("释放写锁" + lock.writeLock().isHeldByCurrentThread()); } catch (Exception e) { e.printStackTrace(); } finally { } } 复制代码
1、读写锁分为了 读锁 和 写锁 ,可以多个线程共享读锁,但是只有一个线程能获取写锁。在获取写锁之后,读锁都会在同步队列中等待写锁释放。这样做最大化的提高了吞吐性能的同时,保证了数据一致性。
2、持有写锁的线程获取读锁的过程叫做 锁降级 ,持有读锁的线程获取写锁的过程叫做 锁升级 。读写锁只能锁降级,锁升级会造成死锁。
3、读锁通过AQS共享模式实现,写锁通过AQS独占模式实现。AQS的 status
, 分为高 16 位和低 16位,分别用于共享模式和独占模式。
4、读锁和写锁最大是65535,超过这个值会抛出 Error
。当然一般是不会超过的。
相信大家在仔细研读可重入锁和读写锁的实现之后,对AQS已经不再陌生了。
AQS作为JUC中最基础的框架,光说它就可以写一篇论文了。本文不打算AQS长篇阔论,而是把上文中讲述的AQS知识再总结提炼一下。能看懂ReentrantLock和ReentrantReadWriteLock,就对AQS的精髓有一定掌握了。
/** * * AQS是Node节点构成的同步队列 * +------+ prev +-----+ +-----+ * head | | <---- | | <---- | | tail * +------+ +-----+ +-----+ * */ public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { /** 标识为共享模式 */ static final Node SHARED = new Node(); /** 表示为独占模式 */ static final Node EXCLUSIVE = null; private transient volatile Node head; private transient volatile Node tail; /** * 同步位,可重入,state标识重入次数。为零时代表资源可以被获取 * 读写锁时,把32位的int类型拆分出高16位和低16位,来区分读锁和写锁的状态 */ private volatile int state; static final class Node { /** 标识为共享模式,赋值给nextWaiter */ static final Node SHARED = new Node(); /** 标识为独占模式,赋值给nextWaiter */ static final Node EXCLUSIVE = null; /** waitStatus状态>0是取消状态,是不可逆的状态 */ static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; /** * Node状态字段,包括如下值: * SIGNAL: 该节点的后继节点是(或即将是)阻塞状态(通过LockSupport.park方法), * 当前节点释放锁或者被取消时,需要调用unpark函数来激活它的后继节点。 为了避免锁竞争,acquire方法必须明确指出一个信号。 * CANCELLED: 节点因为中断或者超时变更为取消状态. * canceled是最终态,不可逆变为其他状态. 被取消的节点不会继续阻塞. * CONDITION: 节点当前在条件队列中(condition queue).这个节点不会被当做同步队列节点 直到它从条件队列中释放,这时waitStatus的值被设置为0. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. * PROPAGATE: releaseShared应该传播到其他节点。 在doReleaseShared中对此进行了设置(仅适用于头节点), 以确保传播继续进行,即使此后进行了其他操作也是如此。 * 0: 除以上四种情况外 */ volatile int waitStatus; volatile Node prev; volatile Node next; /** * 等待锁的线程 */ volatile Thread thread; /** * condition的阻塞单向链表,阻塞队列只有在独占模式下才有(AQS还有共享模式) */ Node nextWaiter; } } 复制代码