今天看Jraft的时候发现了很多地方都用到了读写锁,所以心血来潮想要分析以下读写锁是怎么实现的。
先上一个doc里面的例子:
class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { //加上一个读锁 rwl.readLock().lock(); if (!cacheValid) { // Must release read lock before acquiring write lock //必须在加写锁之前释放读锁 rwl.readLock().unlock(); rwl.writeLock().lock(); try { // Recheck state because another thread might have // acquired write lock and changed state before we did. //双重检查 if (!cacheValid) { //设置值 data = ... cacheValid = true; } // Downgrade by acquiring read lock before releasing write lock //锁降级,反之则不行 rwl.readLock().lock(); } finally { //释放写锁,但是仍然持有写锁 rwl.writeLock().unlock(); // Unlock write, still hold read } } try { use(data); } finally { //释放读锁 rwl.readLock().unlock(); } } }}
我们一般实例化一个ReentrantReadWriteLock,一般是调用空的构造器创建,所以默认使用的是非公平锁
public ReentrantReadWriteLock() { this(false); } public ReentrantReadWriteLock(boolean fair) { //默认使用的是NonfairSync sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } //分别调用writeLock和readLock会返回读写锁实例 public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 6317671515068378041L; //位移量 //在读写锁中,state是一个32位的int,所以用state的高16位表示读锁,用低16位表示写锁 static final int SHARED_SHIFT = 16; //因为读锁是高16位,所以用1向左移动16位表示读锁每次锁状态变化的量 static final int SHARED_UNIT = (1 << SHARED_SHIFT); //最大的可重入次数 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //用来计算低16位的写锁状态 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //获取高16位读锁state次数,重入次数 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } //获取低16位写锁state次数,重入次数 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } //用来记录每个线程持有的读锁数量 static final class HoldCounter { int count = 0; // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); } static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } private transient ThreadLocalHoldCounter readHolds; // 用于缓存,记录"最后一个获取读锁的线程"的读锁重入次数 private transient HoldCounter cachedHoldCounter; // 第一个获取读锁的线程(并且其未释放读锁),以及它持有的读锁数量 private transient Thread firstReader = null; private transient int firstReaderHoldCount; Sync() { // 初始化 readHolds 这个 ThreadLocal 属性 readHolds = new ThreadLocalHoldCounter(); setState(getState()); // ensures visibility of readHolds } .... }
//readLock#lock public void lock() { //这里会调用父类AQS的acquireShared,尝试获取锁 sync.acquireShared(1); } //AQS#acquireShared public final void acquireShared(int arg) { //返回值小于 0 代表没有获取到共享锁 if (tryAcquireShared(arg) < 0) //进入到阻塞队列,然后等待前驱节点唤醒 doAcquireShared(arg); }
这里的tryAcquireShared是调用ReentrantReadWriteLock的内部类Sync的tryAcquireShared的方法
protected final int tryAcquireShared(int unused) { //获取当前线程 Thread current = Thread.currentThread(); //获取AQS中的state属性值 int c = getState(); //exclusiveCount方法是用来获取写锁状态,不等于0代表有写锁 if (exclusiveCount(c) != 0 && //如果不是当前线程获取的写锁,那么直接返回-1 getExclusiveOwnerThread() != current) return -1; //获取读锁的锁定次数 int r = sharedCount(c); // 读锁获取是否需要被阻塞 if (!readerShouldBlock() && r < MAX_COUNT && //因为高16位代表共享锁,所以CAS需要加上一个SHARED_UNIT compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { //记录一下首次读线程 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { //firstReader 重入获取读锁 firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; // 如果 cachedHoldCounter 缓存的不是当前线程,设置为缓存当前线程的 HoldCounter if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } // return 大于 0 的数,代表获取到了共享锁 return 1; } return fullTryAcquireShared(current); }
//NonfairSync#readerShouldBlock final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } //AQS final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }
在非公平模式中readerShouldBlock会调用AQS的方法,判断当前头节点的下一个节点,如果不是共享节点,那么readerShouldBlock就返回true,读锁就会阻塞。
//FairSync#readerShouldBlock final boolean readerShouldBlock() { return hasQueuedPredecessors(); } //AQS public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
在公平模式中会去看看队列里有没有其他元素在队列里等待获取锁,如果有那么读锁就进行阻塞
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); //检查是否写锁被占用 if (exclusiveCount(c) != 0) { //被占用,但是占用读锁线程不是当前线程,返回阻塞 if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. //检查读锁是否应该被阻塞 } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly //首次读线程是当前线程,下面直接CAS if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { //设置最后一次读线程 rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) //如果发现 count == 0,也就是说,纯属上一行代码初始化的,那么执行 remove readHolds.remove(); } } //如果最后读取线程次数为0,那么阻塞 if (rh.count == 0) return -1; } } //如果读锁重入次数达到上限,抛异常 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //尝试CAS读锁重入次数加1 if (compareAndSetState(c, c + SHARED_UNIT)) { // 这里 CAS 成功,那么就意味着成功获取读锁了 // 下面需要做的是设置 firstReader 或 cachedHoldCounter if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { // 下面这几行,就是将 cachedHoldCounter 设置为当前线程 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } // 返回大于 0 的数,代表获取到了读锁 return 1; } } }
这个方法主要是用来处理重入锁操作的。首先校验一下写锁是否被占用,如果没有被占用则判断当前线程是否是第一次读线程,如果不是则判断最后一次读线程是不是当前线程,如果不是则从readHolds获取,并判断HoldCounter实例中获取读锁次数如果为0,那么就不是重入。
如果可以判断当前线程是重入的,那么则对state高16进行加1操作,操作成功,则对firstReader或cachedHoldCounter进行设置,并返回1,表示获取到锁。
到这里我们看完了tryAcquireShared方法,我再把acquireShared方法贴出来:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
下面看doAcquireShared方法:
private void doAcquireShared(int arg) { //实例化一个共享节点入队 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { //获取当前节点的上一个前置节点 final Node p = node.predecessor(); //前置节点如果是头节点,那么代表队列里没有别的节点,先调用tryAcquireShared尝试获取锁 if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { //醒队列中其他共享节点 setHeadAndPropagate(node, r); p.next = null; // help GC //响应中断 if (interrupted) selfInterrupt(); failed = false; return; } } //设置前置节点waitStatus状态 if (shouldParkAfterFailedAcquire(p, node) && //阻塞当前线程 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
doAcquireShared方法中会实例化一个共享节点并入队。如果当前节点的前置节点是头节点,那么直接调用tryAcquireShared先获取一次锁,如果返回大于0,那么表示可以获取锁,调用setHeadAndPropagate唤醒队列中其他的线程;如果没有返回则会调用shouldParkAfterFailedAcquire方法将前置节点的waitStatus设值成SIGNAL,然后调用parkAndCheckInterrupt方法阻塞
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below //把node节点设值为头节点 setHead(node); //因为是propagate大于零才进这个方法,所以这个必进这个if if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { //获取node的下一个节点 Node s = node.next; //判断下一个节点是否为空,或是共享节点 if (s == null || s.isShared()) //往下看 doReleaseShared(); } }
这个方法主要是替换头节点为当前节点,然后调用doReleaseShared进行唤醒节点的操作
private void doReleaseShared() { for (;;) { Node h = head; // 1. h == null: 说明阻塞队列为空 // 2. h == tail: 说明头结点可能是刚刚初始化的头节点, // 或者是普通线程节点,但是此节点既然是头节点了,那么代表已经被唤醒了,阻塞队列没有其他节点了 // 所以这两种情况不需要进行唤醒后继节点 if (h != null && h != tail) { int ws = h.waitStatus; //后面的节点会把前置节点设置为Node.SIGNAL if (ws == Node.SIGNAL) { //1 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒 head 的后继节点,也就是阻塞队列中的第一个节点 unparkSuccessor(h); } else if (ws == 0 && //2 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //3 如果被唤醒的节点已经占领head了,那么继续循环,否则跳出循环 if (h == head) // loop if head changed break; } }
private void unparkSuccessor(Node node) { //如果当前节点小于零,那么作为头节点要被清除一下状态 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待 // 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒线程 LockSupport.unpark(s.thread); }
到这里加读锁的代码就讲解完毕了
//ReadLock public void unlock() { sync.releaseShared(1); } // Sync public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
我们先看tryReleaseShared
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //如果当前是firstReader,那么需要进行重置或重入减一 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { // 判断 cachedHoldCounter 是否缓存的是当前线程,不是的话要到 ThreadLocal 中取 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { // 这一步将 ThreadLocal remove 掉,防止内存泄漏。因为已经不再持有读锁了 readHolds.remove(); //unlock了几次的话会抛异常 if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); // nextc 是 state 高 16 位减 1 后的值 int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // 如果 nextc == 0,那就是 state 全部 32 位都为 0,也就是读锁和写锁都空了 // 此时这里返回 true 的话,其实是帮助唤醒后继节点中的获取写锁的线程 return nextc == 0; } }
这个读锁的释放,主要就是将 hold count 减 1,如果减到 0 的话,还要将 ThreadLocal 中的 remove 掉。然后是在 for 循环中将 state 的高 16 位减 1,如果发现读锁和写锁都释放光了,那么唤醒后继的获取写锁的线程,因为只有读锁是不会被阻塞的,所以等待的线程只可能是写锁的线程。
//WriteLock public void lock() { sync.acquire(1); } //sync public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //AQS protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); //获取state的低16位 int w = exclusiveCount(c); //不为零说明读锁或写锁被持有了 if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) // 看下这里返回 false 的情况: // c != 0 && w == 0: 写锁可用,但是有线程持有读锁(也可能是自己持有) // c != 0 && w !=0 && current != getExclusiveOwnerThread(): 其他线程持有写锁 // 也就是说,只要有读锁或写锁被占用,这次就不能获取到写锁 if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire // 这里不需要 CAS,仔细看就知道了,能到这里的,只可能是写锁重入,不然在上面的 if 就拦截了 setState(c + acquires); return true; } //检查写锁是否需要block if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //走到这里说明写锁不需要block,并且CAS成功了 setExclusiveOwnerThread(current); return true; }
我们来看看writerShouldBlock
//NonfairSync final boolean writerShouldBlock() { return false; // writers can always barge } //FairSync final boolean writerShouldBlock() { return hasQueuedPredecessors(); }
如果是非公平模式,那么 lock 的时候就可以直接用 CAS 去抢锁,抢不到再排队
如果是公平模式,那么如果阻塞队列有线程等待的话,就乖乖去排队
public void unlock() { sync.release(1); } //sync public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; //如果独占锁释放"完全",唤醒后继节点 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } //Sync protected final boolean tryRelease(int releases) { //检查一下持有所的线程是不是当前线程 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //将state减1 int nextc = getState() - releases; //查看低16位是否为0 boolean free = exclusiveCount(nextc) == 0; if (free) //如果为0,那么说明写锁释放 setExclusiveOwnerThread(null); //设置状态 setState(nextc); return free; }