ReentrantLock
是排它锁,它在同一时刻只允许一个线程进行访问。在很多场景中,读服务远多于写服务,而读服务之间不存在数据竞争问题,在一个线程读数据时禁止其他读线程访问,会导致性能降低。
所以就有了读写锁,它在同一时刻可以允许多个读线程访问,但在写线程访问时,则所有的读线程和其他写线程都会被阻塞。读写锁内部维护了一个读锁和一个写锁,如此将读写锁分离,可以很大地提升并发性和吞吐量。
ReadWriteLock 接口定义了读锁和写锁的两个方法:
public interface ReadWriteLock { Lock readLock(); Lock writeLock(); } 复制代码
其中 readLock()
方法用于返回读操作的锁, writeLock()
用于返回写操作的锁。
ReentrantReadWriteLock
实现了 ReadWriteLock
接口,它的几个重要属性如下:
// 内部类 ReadLock,读锁 private final ReentrantReadWriteLock.ReadLock readerLock; // 内部类 WriteLock 写锁 private final ReentrantReadWriteLock.WriteLock writerLock; // 同步器,读写和写锁依赖于它 final Sync sync; 复制代码
其中有两个构造方法,主要如下:
public ReentrantReadWriteLock() { this(false); } // 指定公平性 public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } 复制代码
public static class ReadLock implements Lock, java.io.Serializable { private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } ··· } public static class WriteLock implements Lock, java.io.Serializable { private final Sync sync; protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } ··· } 复制代码
可以看到, ReentrantReadWriteLock
锁的主体依然是 Sync
,读锁和写锁都依赖与 Sync
来实现,它们使用的是同一个锁,只是在获取锁和释放锁的方式不同。
在 ReentrantLock
中使用一个 int
型变量 state
来表示同步状态,该值表示锁被一个线程重复获取的次数,而读写锁中需要一个 int
型变量上维护多个读线程和一个写线程的状态。
所以它将该变量分为两部分,高 16
位表示读,低 16
位表示写。分割之后通过位运算来计算读锁和写锁的状态。
static final int SHARED_SHIFT = 16; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 读锁状态 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 写锁状态 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } 复制代码
另外, ReentrantReadWriteLock
还提供了返回内部工作状态的方法。
方法名 | 描述 |
---|---|
getReadLockCount | 返回读锁被获取的次数(锁重入次数也会加 1) |
isWriteLocked | 返回写锁是否被获取 |
getWriteHoldCount | 返回当前线程获取写锁的次数 |
getReadHoldCount | 返回当前线程获取读锁的次数 |
前面三个方法都比较简单:
final int getReadLockCount() { return sharedCount(getState()); // c >>> SHARED_SHIFT } final boolean isWriteLocked() { return exclusiveCount(getState()) != 0; } // 由于写锁只会被一个线程获取 // 所以,如果是当前线程,则通过 c & EXCLUSIVE_MASK 直接计算即可 final int getWriteHoldCount() { return isHeldExclusively() ? exclusiveCount(getState()) : 0; } 复制代码
最后一个方法,首先来看一下 Sync
类的几个属性:
// 当前线程持有的读锁数量 private transient ThreadLocalHoldCounter readHolds; // HoldCounter 的一个缓存,减少 ThreadLocal.get 的次数 private transient HoldCounter cachedHoldCounter; // 第一个获取到读锁的读线程 private transient Thread firstReader = null; // 第一个读线程持有的读锁数量 private transient int firstReaderHoldCount; // 上面三个都是为了提高效率,如果读锁仅有一个或有缓存了,就不用去 ThreadLocalHoldCounter 获取 // 读线程持有锁的计数器,需要与线程绑定 static final class HoldCounter { int count = 0; // 持有线程 id,在释放锁时,判断 cacheHoldCounter 缓存的是否是当前线程的读锁数量 final long tid = getThreadId(Thread.currentThread()); } // 通过 ThreadLocal 将 HoldCounter 绑定到线程上 static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } Sync() { readHolds = new ThreadLocalHoldCounter(); setState(getState()); // ensures visibility of readHolds } 复制代码
getReadHoldCount()
方法用于获取当前线程获取读锁的次数。
final int getReadHoldCount() { // 如果读锁被获取的次数为 0,那么当前线程获取读锁的次数肯定也为 0 if (getReadLockCount() == 0) return 0; Thread current = Thread.currentThread(); // 如果当前线程是第一个获取读锁的线程,则直接返回 firstReaderHoldCount if (firstReader == current) return firstReaderHoldCount; // 缓存的 HoldCounter 绑定的线程是否是当前线程,如果是则直接返回读锁数量 HoldCounter rh = cachedHoldCounter; if (rh != null && rh.tid == getThreadId(current)) return rh.count; // 否则从 ThreadLocalHoldCounter 中获取 HoldCounter,再获取读锁数量 int count = readHolds.get().count; if (count == 0) readHolds.remove(); // 防止内存泄露 return count; } 复制代码
写锁是一个支持可重入的排它锁。
WriteLock
的 lock()
方法如下,可以看到,这里调用的是 AQS
的独占式获取锁方法。
public void lock() { sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 复制代码
在获取写锁时,调用 AQS
的 acquire
方法,其中又调用了 Sync
自定义组件实现的 tryAcquire
方法:
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); // 写锁个数 if (c != 0) { // c != 0 && w == 0 表示有线程获取了读锁 // 或者当前线程不是持有锁的线程,则失败 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 如果写锁会超过范围,抛出异常 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 当前线程获取写锁,可重入 setState(c + acquires); return true; } // 如果没有任何线程获取读锁和写锁,当前线程尝试获取写锁 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; } 复制代码
这里如果有线程获取了读锁,则当前线程不能再获取写锁。因为读写锁需要确保获取写锁的线程的操作对于读锁的线程是可见的,如果存在读锁时再允许获取写锁,则获取读锁的线程可能无法得知当前获取写锁的线程的操作。
判断获取写锁的线程是否应该被阻塞,公平锁和非公平中实现不同。
static final class NonfairSync extends Sync { // 对于非公平锁,直接返回 false final boolean writerShouldBlock() { return false; } } static final class FairSync extends Sync { // 对于公平锁,则需要判断是否有前驱节点 final boolean writerShouldBlock() { return hasQueuedPredecessors(); } } public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } 复制代码
unlock()
方法如下,其中调用了 AQS
的 release
方法:
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 唤醒后继节点 unparkSuccessor(h); return true; } return false; } 复制代码
release()
方法首先调用 Sync
中的 tryRelease()
方法,然后唤醒后继节点:
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; } 复制代码
该方法首先减少写状态值,如果写状态为 0
,则表示写锁已经被释放,将持有锁的线程设置为 null
,并更改同步状态值。
读锁是一个支持可重入的共享锁,它能被多个线程同时获取。
ReadLock
的 lock()
方法如下,其中调用了 AQS
的共享式获取锁方法:
public void lock() { sync.acquireShared(1); } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } 复制代码
在 acquireShared
方法中,又调用了 Sync
的 tryAcquireShared
方法:
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // 存在写锁,并且写锁被其他线程持有,则失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); // 获取读锁的线程是否需要阻塞 // 读锁小于 MAX_COUNT(1 << 16) // 使用 CAS 更新状态为 c + 1 << 16 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { // 没有读锁 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 读锁仅有一个 firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) //更新缓存 cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) // 将 HoldCounter 设置到 ThreadLocal 中 readHolds.set(rh); // 读锁数量加 1 rh.count++; } return 1; } return fullTryAcquireShared(current); } 复制代码
该方法中,如果满足上述三个条件,则获取读锁成功,会对 firstReaderHoldCount
等值进行设置,稍后详细介绍。如果不满足时,会调用 fullTryAcquireShared
方法:
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); // 如果写锁不为 0 if (exclusiveCount(c) != 0) { // 当前线程不是持有写锁的线程,返回 if (getExclusiveOwnerThread() != current) return -1; // 读锁是否需要被阻塞 } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } // 读锁超出最大范围 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 使用 CAS 更新状态值,尝试获取锁 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } 复制代码
判断读锁是否应该被阻塞,公平锁和非公平锁实现不同,
static final class NonfairSync extends Sync { // 对于非公平锁,需要判断同步队列中第一个结点是否是独占式(写锁) final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } } final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; } static final class FairSync extends Sync { // 对于公平锁,需要判断是否有前驱节点 final boolean readerShouldBlock() { return hasQueuedPredecessors(); } } 复制代码
ReadLock
的 unlock
方法如下,其中调用的是 AQS
的共享式释放锁方法:
public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } 复制代码
releaseShared
方法中又调用了 Sync
的 tryReleaseShared
方法:
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); // 如果当前线程是第一个获取读锁的线程 if (firstReader == current) { if (firstReaderHoldCount == 1) // 仅获取了一次,将 firstReader 置为 null firstReader = null; else // 否则将 firstReadHoldCount 减 1 firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) // 缓存如果有效,直接使用;否则重新获取 rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } // 循环使用 CAS 更新状态值 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } } 复制代码
ReentrantReadWriteLock
允许锁降级,也就是写锁降级为读锁。它是指先获取写锁,再获取到读锁,最后释放写锁的过程。但锁升级是不允许的,也就是先获取读锁,再获取写锁,最后释放读锁的过程。
在获取读锁的 tryAcquireShared
方法中:
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // 存在写锁,并且写锁被其他线程持有,则失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); ··· } 复制代码
如果存在写锁,并且写锁被其他线程持有时,才会失败。说明如果当前线程持有了写锁,也可以再获取读锁。最后释放写锁,这称为锁降级。
为何要这样做呢?试想如果一个线程获取了写锁,这个时候其他任何线程都是无法再获取读锁或写锁的,然后该线程再去获取读锁,也就不会产生任何的竞争。通过这种锁降级机制,就不会有释放写锁后,再去竞争获取读锁的情况,避免了锁的竞争和线程的上下文切换,也就提高了效率。