上一篇 Java锁之ReentrantLock(二) 分析了ReentrantLock实现利器AQS同步器,通过AQS源码分析,我们知道了同步器通过sate状态进行锁的获取与释放,同时构造了双向FIFO双向链表进行线程节点的等待,线程节点通过waitStatus来判断自己需要挂起还是唤醒去获取锁。那么接下来我们继续分析ReentrantLock的读写锁,ReentrantReadWriteLock锁。
ReentrantReadWriteLock锁 实际也是继承了AQS类来实现锁的功能的,上一篇 Java锁之ReentrantLock(二) 已经详细解析过AQS的实现,如果已经掌握了AQS的原理,相信接下来的读写锁的解析也非常容易。
类 | 作用 |
---|---|
Sync, | 继承AQS,锁功能的主要实现者 |
FairSync | 继承Sync,主要实现公平锁 |
NofairSync | 继承Sync,主要实现非公平锁 |
ReadLock | 读锁,通过sync代理实现锁功能 |
WriteLock | 写锁,通过sync代理实现锁功能 |
我们先分析读写锁中的这4个int 常量,其实这4个常量的作用就是区分一个int整数的高16位和低16位的,ReentrantReadWriteLock锁还是依托于state变量作为获取锁的标准,那么一个state变量如何区分读锁和写锁呢?答案是通过位运算,高16位表示读锁,低16位表示写锁。如果对位运算不太熟悉或者不了解的同学可以看看这篇文章 《位运算》 。既然是分析读写锁,那么我们先从读锁和写锁的源码获取入手分析。
这里先提前补充一个概念:
写锁和读锁是互斥的(这里的互斥是指线程间的互斥,当前线程可以获取到写锁又获取到读锁,但是获取到了读锁不能继续获取写锁),这是因为读写锁要保持写操作的可见性,如果允许读锁在被获取的情况下对写锁的获取,那么正在运行的其他读线程无法感知到当前写线程的操作。因此,只有等待其他线程都释放了读锁,写锁才能被当前线程获取,而一旦写锁被获取,其他读写线程的后续访问都会被阻塞。
我们根据内部类WriteLock的调用关系找到源码如下,发现最终写锁调用的是 tryWriteLock()
(以非阻塞获取锁方法为例)
public boolean tryLock( ) { return sync.tryWriteLock(); } final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); if (c != 0) {//状态不等于0,说明已经锁已经被获取过了 int w = exclusiveCount(c);//这里是判断是否获取到了写锁,后面会详细分析这段代码 // 这里就是判断是否是锁重入:2种情况 // 1.c!=0说明是有锁被获取的,那么w==0, // 说明写锁是没有被获取,也就是说读锁被获取了,由于写锁和读锁的互斥,为了保证数据的可见性 // 所以return false. //2. w!=0,写锁被获取了,但是current != getExclusiveOwnerThread() , // 说明是被别的线程获取了,return false; if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w == MAX_COUNT)//判断是否溢出 throw new Error("Maximum lock count exceeded"); } // 尝试获取锁 if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); return true; }
tryReadLock()
final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; //写锁被其他线程获取了,直接返回false int r = sharedCount(c); //获取读锁的状态 if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { //尝试获取读锁 if (r == 0) { //说明第一个获取到了读锁 firstReader = current; //标记下当前线程是第一个获取的 firstReaderHoldCount = 1; //重入次数 } else if (firstReader == current) { firstReaderHoldCount++; //次数+1 } else { //cachedHoldCounter 为缓存最后一个获取锁的线程 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); //缓存最后一个获取锁的线程 else if (rh.count == 0)// 当前线程获取到了锁,但是重入次数为0,那么把当前线程存入进去 readHolds.set(rh); rh.count++; } return true; } } }
写锁的释放比较简单,基本逻辑和读锁的释放是一样的,考虑到篇幅,这次主要分析读锁的释放过程:
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1)//如果是首次获取读锁,那么第一次获取读锁释放后就为空了 firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { //表示全部释放完毕 readHolds.remove(); //释放完毕,那么久把保存的记录次数remove掉 if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); // nextc 是 state 高 16 位减 1 后的值 int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) //CAS设置状态 return nextc == 0; //这个判断如果高 16 位减 1 后的值==0,那么就是读状态和写状态都释放了 } }
上面就是读写锁的获取和释放过程源码,暂时先分析简单的非阻塞获取锁方法,根据源码我们可以知道,写锁和读锁的是否获取也是判断状态是否不为0,写锁的状态获取方法是 exclusiveCount(c)
,读锁的状态获取方法是 sharedCount(c)
。那么我们接下来分析下这两个方法是如何对统一个变量位运算获取各自的状态的,在分析之前我们先小结下前面的内容。
a. 读写锁依托于AQS的State变量的位运算来区分读锁和写锁,高16位表示读锁,低16位表示写锁。
b. 为了保证线程间内容的可见性,读锁和写锁是互斥的,这里的互斥是指线程间的互斥,当前线程可以获取到写锁又获取到读锁,但是获取到了读锁不能继续获取写锁。
我们再看看位运算的相关代码(我假设你已经知道了位运算的相关基本知识,如果不具备,请阅读 《位运算》 )
static final int SHARED_SHIFT = 16; //实际是65536 static final int SHARED_UNIT = (1 << SHARED_SHIFT); //最大值 65535 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 同样是65535 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** 获取读的状态 */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** 获取写锁的获取状态 */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
我们按照图示内容的数据进行运算,图示的32位二进制数据为: 00000000000000100000000000000011
00000000000000100000000000000011 >>> 16
,无符号右移16位,结果如下: 00000000000000000000000000000010
,换算成10进制数等于2,说明读状态为: 2
00000000000000100000000000000011 & 65535
,转换成2进制运算为 00000000000000100000000000000011 & 00000000000000001111111111111111
最后与运算结果为: 00000000000000100000000000000011
,换算成10进制为3
不得不佩服作者的思想,这种设计在不修改AQS的代码前提下,仅仅通过原来的State变量就满足了读锁和写锁的分离。
锁降级是指写锁降级为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(之前拥有的写锁的过程)源码示例(来自于《java并发编程的艺术》):
public void processData(){ readLock.lock(); if(!update){ //必须先释放读锁 readLock.unlock(); //锁降级从写锁获取到开始 writeLock.lock(); try{ if(!update){ update =true; } readlock.lock(); }finally{ writeLock.unlock(); }//锁降级完成,写锁降级为读锁 } try{ //略 }finally{ readLock.unlock(); } }
上述示例就是一个锁降级的过程,需要注意的是update变量是一个volatie修饰的变量,所以,线程之间是可见的。该代码就是获取到写锁后修改变量,然后获取读锁,获取成功后释放写锁,完成了锁的降级。注意:ReentrantReadWriteLock不支持锁升级,这是因为如果多个线程获取到了读锁,其中任何一个线程获取到了写锁,修改了数据,其他的线程感知不到数据的更新,这样就无法保证数据的可见性。
cachedHoldCounter
, firstReader
firstReaderHoldCount
等属性,这些属性并没有对理解原理有多少影响,主要是提升性能的作用,所以本文没有讨论。