ReentrantReadWriteLock即可重入读写锁,同样也依赖于AQS来实现。在介绍ReentrantLock我们知道其依托AQS的同步状态来判断锁是否占有,而ReentrantReadWriteLock既有读锁又有写锁,是如何依靠一个状态来维持的?
ReentrantReadWriteLock读写锁,与ReentrantLock一样默认非公平,内部定义了读锁ReadLock()和写锁WriteLock(),在 同一时间允许被多个读线程访问,但在写线程访问时,所有读线程和写线程都会被阻塞 。读写锁主要特性: 公平性、可重入性、锁降级
写锁是一个支持重进入的排它锁,其获取的核心方法:
protected final boolean tryAcquire(int acquires) { // 获取当前线程 Thread current = Thread.currentThread(); // 获取ReentrantReadWriteLock锁整体同步状态 int c = getState(); // 获取写锁同步状态 int w = exclusiveCount(c); // 存在读锁或写锁 if (c != 0) { // c != 0 && w == 0 即若存在读锁或写锁持有线程不是当前线程,获取写锁失败 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 最多65535次重入,若超过报错 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 可重入,设置同步状态 setState(c + acquires); return true; } // 公平与非公平,同步队列是否有节点,同时cas设置状态 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; // 设置获取锁的线程为当前线程 setExclusiveOwnerThread(current); return true; } 复制代码
从源码中我们可以发现getState()获取的是 读锁与写锁总同步状态 ,再通过exclusiveCount()方法单独获取写锁同步状态
static final int SHARED_SHIFT = 16; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } 复制代码
ReentrantReadWriteLock通过按位切割state变量, 同步状态的低16位表示写锁获取次数,高16位表示读锁获取次数 ,如图示意
写锁获取整体思路: 当读锁已经被读线程获取或者写锁已经被其他写线程获取,则写锁获取失败;否则,获取成功并可重入,增加写锁同步状态
protected final boolean tryRelease(int releases) { // 若释放的线程不为锁的持有者 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 重新设置同步状态 int nextc = getState() - releases; // 若新的写锁持有线程数为0,则将锁的持有线程置为null boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); // 更新同步状态 setState(nextc); return free; } 复制代码
写锁的释放与ReentrantLock的释放过程基本类似,每次释放均减少写状态,当写状态为0 时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见
读锁相对于写锁(独占锁或排他锁),读锁是一个支持重进入的 共享锁 ,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取,而所做的也只是(线程安全的)增加读状态
protected final int tryAcquireShared(int unused) { // 获取当前线程 Thread current = Thread.currentThread(); int c = getState(); // 若写锁已被占有,且写锁占有线程不是当前线程,读锁获取失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 读锁状态 int r = sharedCount(c); // 判断读锁是否需要公平,读锁持有线程数是否小于极值,CAS设置读锁状态 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 若读锁未被线程占有,则更新firstReader和firstReaderHoldCount if (r == 0) { firstReader = current; firstReaderHoldCount = 1; // 如果获取读锁的线程为第一次获取读锁的线程,则firstReaderHoldCount重入数 + 1 } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); } 复制代码
读锁获取整体思路:
①.判断写锁是否被占有,写锁占有线程是否不是当前线程,若成立则读锁获取失败
②.判断读锁是否需要公平,读锁持有线程数是否小于极值,CAS设置读锁状态成功,若条件不满足,会调用fullTryAcquireShared()方法自旋再次尝试获取读锁;若条件满足修改当前线程HoldCounter的值
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); // 若写锁已被占有,且写锁占有线程不是当前线程 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // 公平性 } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } // 读锁占有线程达到极值 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // cas设置成功 if (compareAndSetState(c, c + SHARED_UNIT)) { // 若读锁未被线程占有,则更新firstReader和firstReaderHoldCount if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; // 如果获取读锁的线程为第一次获取读锁的线程,则firstReaderHoldCount重入数 + 1 } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } 复制代码
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); // 若当前线程为第一个获取读锁的线程 if (firstReader == current) { // 若只有获取一次,将firstReader置为null if (firstReaderHoldCount == 1) firstReader = null; // 若多次,firstReaderHoldCount-1 else firstReaderHoldCount--; } else { // 更新当前线程获取锁次数 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } // 自旋CAS更新读锁同步状态 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } } 复制代码
HoldCounter在读锁中起到了很重要的作用,用来计算每个线程的读锁重入次数,并使用ThreadLocal类型的HoldCounter,可以记录每个线程的锁的重入次数。 cachedHoldCounter记录了最后1个获取读锁的线程的重入次数。 firstReader指向了第一个获取读锁的线程,firstReaderHoldCounter记录了第一个获取读锁的线程的重入次数
static final class HoldCounter { int count = 0; final long tid = getThreadId(Thread.currentThread()); }
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } private transient HoldCounter cachedHoldCounter; private transient int firstReaderHoldCount; private transient Thread firstReader = null; 复制代码
复制代码
复制代码
锁降级指的是写锁降级成为读锁,即 先获取写锁、获取读锁在释放写锁的过程 ,目的为了保证数据的可见性。假设有两个线程A、B,若线程A获取到写锁,不获取读锁而是直接释放写锁,这时线程B获取了写锁并修改了数据,那么线程A无法知道线程B的数据更新。如果线程A获取读锁,即遵循锁降级的步骤,则线程B将会被阻塞,直到线程A使用数据并释放读锁之后,线程B才能获取写锁进行数据更新。