一 概述 二 案例 三 继承体系 四 属性 五 构造 六 内部类 七 锁操作 八 锁计数器 九 问题解答 十 总结 复制代码
ReentrantReadWriteLock从字面意思上就是可重入的读写锁。读写锁的特点就简单来说就是读读之间不互斥,读写或写写之间是互斥的。今天就通过它来解开读写锁的秘密。由于这个类名称太长,后面都简称它为RRWLock。
在解读RRWLock时,先简单看下源码中提供的一个使用场景案例(代码如下),这是一个缓存的实例,因为缓存的特点就是读大于写的,这也符合RRWLock的特性。
class CachedData { // 实际的缓存数据 Object data; // 缓存有效标识 volatile boolean cacheValid; // 读写锁 final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); // 处理缓存数据 void processCachedData() { // 先加上读锁 rwl.readLock().lock(); // 如果缓存无效,即不存在时,执行加载缓存操作 if (!cacheValid) { // 写缓存要获取写锁,但是获取写锁之前要释放读锁 rwl.readLock().unlock(); rwl.writeLock().lock(); try { // Recheck state because another thread might have // acquired write lock and changed state before we did. // 再获取到写锁后再次判断缓存是否已被其他线程加载了(因此在lock()可能会出现自旋或阻塞等待的情况,期间其他线程可能会进行加载缓存的操作)) if (!cacheValid) { // 加载缓存 data = ...; // 更新缓存有效标识 cacheValid = true; } // Downgrade by acquiring read lock before releasing write lock // 在释放写锁前先获取到读锁 rwl.readLock().lock(); } finally { // 释放写锁,此时仍持有读锁 rwl.writeLock().unlock(); // Unlock write, still hold read } } try { // 使用缓存数据 use(data); } finally { // 使用完后释放读锁 rwl.readLock().unlock(); } } } 复制代码
简单总结下这个缓存案例的加锁流程:
这里先抛出2个问题,在文末有答案分析:
可以看出,RRWLock继承自ReadWriteLock接口,它的定义如下:
public interface ReadWriteLock { // 返回用于读的锁 Lock readLock(); // 返回用于写的锁 Lock writeLock(); } 复制代码
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { private static final long serialVersionUID = -6992448646407690164L; // 读锁内部类 private final ReentrantReadWriteLock.ReadLock readerLock; // 写锁内部类 private final ReentrantReadWriteLock.WriteLock writerLock; // 执行所有同步机制的AQS final Sync sync; } 复制代码
全局的属性不多,主要包含了读写锁和同步器。
// 默认无参构造方法-默认非公平锁 public ReentrantReadWriteLock() { this(false); } // 提供了是否公平锁的参数构造 public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } 复制代码
构造方法就是先创建一个公用的AQS同步器,然后再分别初始化读锁和写锁。
从属性中可以看出,在RRWLock中有三个核心的内部类:ReadLock、WriteLock、Sync,这里先简单看下他们的内部构造和常用方法:
public static class ReadLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -5992448646407690164L; private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock(); public void lockInterruptibly() throws InterruptedException; public boolean tryLock(); public void unlock(); public Condition newCondition(); } 复制代码
public static class WriteLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -4992448646407690164L; private final Sync sync; protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock(); public void lockInterruptibly() throws InterruptedException; public boolean tryLock( ); public void unlock(); public Condition newCondition(); public boolean isHeldByCurrentThread(); public int getHoldCount(); } 复制代码
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 6317671515068378041L; /* 锁定状态在逻辑上分为两个无符号短路: 1.较低的一个表示独占(写)锁定保持计数, 2.共享(读)锁保持计数的上限。 */ static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } static final class HoldCounter { int count = 0; // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); } static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } private transient ThreadLocalHoldCounter readHolds; private transient HoldCounter cachedHoldCounter; private transient Thread firstReader = null; private transient int firstReaderHoldCount; Sync() { readHolds = new ThreadLocalHoldCounter(); setState(getState()); // ensures visibility of readHolds } } 复制代码
它的执行过程如下:
public void lock() { // 调用AQS中的请求共享锁方法 sync.acquireShared(1); } 复制代码
实际调用AQS中的acquireShared方法:
public final void acquireShared(int arg) { // 尝试获取共享锁,如果获取失败则 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } 复制代码
尝试获取共享锁方法,先简单单独取一次,成功则返回,失败则进行完整的获取:
protected final int tryAcquireShared(int unused) { // 获取当前线程 Thread current = Thread.currentThread(); // 获取锁状态 int c = getState(); // 如果独占锁数量不为0 并且 持有独占锁的线程不是当前线程,则直接失败返回 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 获取共享锁的数量 int r = sharedCount(c); // 检查是否需要因为队列策略而阻塞,若不需要则检查共享锁数量是否达到最大值,都没有则CAS更新锁状态 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 共享锁数量为0表示当前线程是第一个获取读锁的线程 if (r == 0) { // 更新第一个读线程变量和数量 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 如果首个获取读锁的线程重复获取读锁时,直接重入并将计数器累加 firstReaderHoldCount++; } else { // 获取当前线程的计数器 HoldCounter rh = cachedHoldCounter; // 如果计数器为空 或者 当前线程还没有创建计数器,则创建计数器并存放到readHolds中,即存放到ThreadLocal中 if (rh == null || rh.tid != getThreadId(current)) // 在ThreadLocal中创建 cachedHoldCounter = rh = readHolds.get(); // 如果当前线程的计数器已存在,且计数值为0,则将该计数器放到readHolds中 else if (rh.count == 0) readHolds.set(rh); // 锁重入次数累加 rh.count++; } return 1; } // 之前因为队列策略或更新锁失败后再通过下面方法进行完整地尝试获取锁 return fullTryAcquireShared(current); } 复制代码
完整地获取共享锁方法,作为tryAcquireShared方法因CAS获取锁失败后的处理。
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; // 循环获取读锁 for (;;) { int c = getState(); // 当前存在独占锁 if (exclusiveCount(c) != 0) { // 并且非当前线程持有该独占锁,则直接返回-1 if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // 确保我们没有重新获取读锁定 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } // 共享数量超过最大时抛出异常 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // CAS更新锁状态,以下逻辑与tryAcquireShared类似 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } 复制代码
如果存在独占锁,则需要执行将当前线程加入到AQS同步等待队列中去,自旋一段时间后仍未获取到则进行阻塞处理。
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 复制代码
读锁的释放流程相对而言简单一点,如下:
public void unlock() { sync.releaseShared(1); } 复制代码
AQS中释放共享锁方法
public final boolean releaseShared(int arg) { // 尝试一次释放锁,成功则唤醒同步队列中的写锁 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } 复制代码
尝试释放锁方法
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); // 首个读线程直接更新特有的计数器即可 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { // 非首读线程则需要更新它的重入次数减1 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } // CAS方式更新锁状态 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } } 复制代码
释放读锁后,需要同步唤醒在同步队列中等待的写锁
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } 复制代码
写锁获取其实跟ReentrantLock总体逻辑差不多,先尝试一次获取锁,失败则加入AQS队列中等待获取锁,不同的是它们各自实现了tryAquire()尝试获取锁的方法。具体的介绍如下:
WriteLock.lock()方法实现的Lock接口,内部还是调用AQS的acquire方法请求锁:
public void lock() { sync.acquire(1); } 复制代码
这个acquire()方法在之前介绍ReentrantLock中也讲过,tryAcquire()方法在AQS中没有实现,由每个锁内部去实现。如果第一次尝试获取锁失败则创建一个独占锁节点并加入到AQS队列中。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 复制代码
下面主要是RWRLock中的AQS实现:
protected final boolean tryAcquire(int acquires) { /* 逻辑: 1.如果读锁计数器不为0 或者 写锁计数器不为0 并且 当前线程不是持有锁的线程,则失败返回; 2.如果计数器达到最大值则失败; 3.第1和第2都不满足时,此时线程可以开始尝试获取锁并更新锁状态; */ Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 重入锁更新锁状态 setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; } 复制代码
写锁的释放基本原理也都差不多,只不过释放锁的时候更新了下锁的重入次数。只有当0锁真正释放(即独占锁次数为0)则返回true以继续唤醒等待获取锁的线程。
public void unlock() { sync.release(1); } 复制代码
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 复制代码
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; } 复制代码
这里存在两种计数器:
通过使用AQS中的锁状态-state属性来存放这两个计数值:
源码中也有详细的说明:
/* * Read vs write count extraction constants and functions. * Lock state is logically divided into two unsigned shorts: * The lower one representing the exclusive (writer) lock hold count, * and the upper the shared (reader) hold count. */ static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } 复制代码
首先要说明的是,在锁状态更新的计数器是共享锁持有的总次数,但是这里由于这里的锁是可重入的,而共享锁又是允许多线程同时持有的,因此需要为每个线程保存各自的重入次数计数器,在这里使用了ThreadLocal对象来保存该计数器。具体的设计如下:
真正计数的类是HoldCounter,它包含一个计数器和线程的独有ID,如下:
static final class HoldCounter { int count = 0; // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); } 复制代码
而使用的ThreadLocal的一个内部类并继承于ThreadLocal的ThreadLocalHoldCounter,它实现了inittalValue方法,内部存放的是一个HoldCounter对象,具体如下:
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } 复制代码
在tryAquireShared中对共享锁的计数器操作如下:
// 获取最后一个线程的缓存计数器 HoldCounter rh = cachedHoldCounter; // 如果为空 或者 当前线程不是最后一个线程 if (rh == null || rh.tid != getThreadId(current)) // 则为当前线程创建计数器,并放到缓存计数器中 cachedHoldCounter = rh = readHolds.get(); // 否则如果计数器值为0 else if (rh.count == 0) // 则重新初始化下以下值 readHolds.set(rh); // 执行到此,计数器已初始化完毕,此时执行加1操作 rh.count++; 复制代码
独占锁因不存在多线程的问题,因此直接使用锁状态作为重入次数计数器。获取独占锁后的更新操作为:
// acquire = 1 setState(c + acquires); 复制代码
可以看出是直接对锁状态做的加1操作,释放锁时也是直接减1
// releases = 1 int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; setState(nextc); return free; 复制代码
在开篇讲完读写锁的案例后提出了两个问题,源码解析后再来回答下这个问题。
解答问题之前,先简单介绍下读写锁的一些概念:
从源码来解答就是获取写锁前会先检查锁状态是否含有读锁,并且它没有区分该读锁持有者是否为自己,如果发现存在读锁则会加入AQS队列自旋或阻塞等待。如下所示:
Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; …… } 复制代码
其实这里讨论的就是锁升级的问题,从源码也可以看出,RRWLock是不支持锁升级的。因此,如果没有释放读锁而直接去获取写锁,会导致写锁阻塞。案例中如果当前线程持有读锁不释放直接获取写锁会发现死锁等待的问题。
这其实就是锁降级问题,首先说明的是,RRWLock是支持锁降级的。从获取读锁的过程中就可以看出:
Thread current = Thread.currentThread(); int c = getState(); // 如果独占锁不为空,并且持有独占锁的非当前线程才会直接失败返回 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; 复制代码
在这个场景中,如果不是采用锁降级的方式,而是直接先释放写锁再去获取读锁的话,会存在这样的问题:当修改完数据并释放写锁后,读锁参与竞争锁,但是另一个写锁请求先获取到了,那么当前的读锁线程就得等待,另一个写锁线程修改完数据后释放锁,当前读锁获取到的数据已经是被修改之后了,而不是当时修改的,此时产生了脏读的问题,因此此处使用锁降级的方案能避免此问题。
行文至此,RRWLock的原理也基本都讲完了,最后也简单总结下它的特点: