本篇基于 Java 基础–队列同步器(AQS) ),对重入锁(ReentrantLock)和读写锁(ReentrantReadWriteLock)进行解析。
重入锁(ReentrantLock)表示支持一个线程对资源的重复加锁,也就是说一个线程可以多次获取到同一个锁。重入锁实现了 Lock 接口,内部实现是基于队列同步器(AbstractQueuedSynchronizer):
public class ReentrantLockimplements Lock,java.io.Serializable{ abstract static class Syncextends AbstractQueuedSynchronizer{...} // 非公平 Sync static final class NonfairSyncextends Sync{...} // 公平 Sync static final class FairSyncextends Sync{...} }
重入锁还支持获取锁时的公平性与非公平性选择,可通过其构造方法进行设置,默认是非公平的。
// 构造方法 public ReentrantLock(){ sync = new NonfairSync(); } public ReentrantLock(boolean fair){ sync = fair ? new FairSync() : new NonfairSync(); }
关于公平与非公平:如果在绝对时间上,先获取锁的请求一定是先被满足的,那么这个锁就是公平的,反之就是非公平的。也就是说如果在同步队列中排队时间最长或者排在最前面的节点先获取到同步状态,那么就是公平的。
重入锁默认是非公平性的,所以先看看非公平性的实现:
static final class NonfairSyncextends Sync{ private static final long serialVersionUID = 7316153563782823691L; final void lock(){ // 先判断是否可以获取同步状态(锁) if (compareAndSetState(0, 1)) // 缓存当前线程,用来判断下次获取锁的是不是该线程 setExclusiveOwnerThread(Thread.currentThread()); else // 内部调用 tryAcquire() acquire(1); } protected final boolean tryAcquire(int acquires){ return nonfairTryAcquire(acquires); } }
NonfairSync 只有上面 10 来行代码,当获取同步状态失败后就通过 acquire(1)
去获取同步状态,所以看看 nonfairTryAcquire(int acquires)
方法的实现:
final boolean nonfairTryAcquire(int acquires){ final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 这部分和前面的一样 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 此时表示该锁已经被当前线程获取过了 // 记录被获取的次数 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 修改同步状态(加 1) setState(nextc); return true; } return false; }
nonfairTryAcquire(int acquires)
方法中也比较简单,首先判断同步状态是否被成功获取过,如果已经被成功获取过了就判断之前获取成功的线程和当前线程是否一样,如果一样就表示当前线程再次获取成功,并给同步状态加 1,此时的同步状态表示被成功获取的次数。
非公平性看完了就来看看公平性的实现:
static final class FairSyncextends Sync{ private static final long serialVersionUID = -3000897897090466540L; final void lock(){ // 内部会调用 tryAcquire() acquire(1); } protected final boolean tryAcquire(int acquires){ final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // hasQueuedPredecessors() 表示是否还有前驱节点 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 此时表示该锁已经被当前线程获取过了 // 记录被获取的次数 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 修改同步状态(加 1) setState(nextc); return true; } return false; } }
可以发现公平与非公平获取锁的区别主要就是 hasQueuedPredecessors()
这句代码,意思是当前节点是否还有前驱节点,因为公平性获取锁表示排在前面的节点一定是先获取到锁的,所以这里多了一个判断。
同样,它们都是通过改变同步状态(加 1)来表示被获取的次数,每获取成功一次就修改一次(加 1)。
在公平性与非公平性获取锁的时候,如果当前线程已经获取过锁了,那么就修改同步状态(加 1),所以当释放锁的时候就必然的需要减少同步状态。重入锁可以通过 unlock()
方法来释放锁:
public void unlock(){ // 释放一次锁 sync.release(1); } public final boolean release(int arg){ if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 唤醒处于阻塞的线程 unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases){ // 每释放一次锁就减 1 int c = getState() - releases; // 判断当前线程和拥有该锁的线程是否一样 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 当同步状态恢复到 0 才表示真正释放锁完成 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
通过上面可以看到 tryRelease()
方法才是核心,每次调用 unlock()
都会修改同步状态(减 1),当同步状态恢复到 0 时才算是真正的释放锁成功。
可以发现,重入锁是个排他锁,当前线程可以多次获取,但是同一时刻只有一个线程能获取成功。
读写锁维护了一对锁,一个读锁和一个写锁,在同一个时刻可以允许多个读线程访问,但是在写线程访问时,除了当前线程,其他所有线程的读/写操作均被阻塞。所以读锁是个共享锁,而写锁是个独占锁(排他锁),但它们都是重入锁。
读写锁和重入锁类似,都是用同步状态来表示获取锁的次数,而读写锁的表示更复杂些:用同步状态的高 16 位表示读锁获取次数(简称读次数),用低 16 位表示写锁获取次数(简称写次数):
static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 65536 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 65535 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 65535 // 读锁获取次数(简称读次数) static int sharedCount(int c){ return c >>> SHARED_SHIFT; } // 写锁获取次数(简称写次数) static int exclusiveCount(int c){ return c & EXCLUSIVE_MASK; }
读写锁实现了 ReadWriteLock 接口,ReadWriteLock 定义:
public interface ReadWriteLock{ LockreadLock(); LockwriteLock(); }
读写锁内部实现是基于队列同步器(AbstractQueuedSynchronizer),同时也支持获取锁时的公平性与非公平性选择,可通过其构造方法进行设置,默认是非公平的。
public class ReentrantReadWriteLockimplements ReadWriteLock,java.io.Serializable{ // 内部类读锁 private final ReentrantReadWriteLock.ReadLock readerLock; // 内部类写锁 private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; public ReentrantReadWriteLock(){ this(false); } public ReentrantReadWriteLock(boolean fair){ sync = fair ? new FairSync() : new NonfairSync(); // 创建读/写锁对象 readerLock = new ReadLock(this); writerLock = new WriteLock(this); } // 获取写锁对象 public ReentrantReadWriteLock.WriteLockwriteLock(){ return writerLock; } // 获取读锁对象 public ReentrantReadWriteLock.ReadLockreadLock(){ return readerLock; } abstract static class Syncextends AbstractQueuedSynchronizer{...} // 公平 Sync static final class FairSyncextends Sync{...} // 非公平 Sync static final class NonfairSyncextends Sync{...} }
构造方法中默认创建了读/写锁对象,所以调用 readLock()
和 writeLock()
就可以获取读锁和写锁。
读写锁默认也是非公平性的,所以先看看非公平性的实现:
static final class NonfairSyncextends Sync{ private static final long serialVersionUID = -8159625535654395037L; // 是否阻塞写线程 final boolean writerShouldBlock(){ return false; } // 是否阻塞读线程 final boolean readerShouldBlock(){ return apparentlyFirstQueuedIsExclusive(); } } // 判断队列的第一个(头节点的后继节点)节点是否独占式,即是否是写线程 final boolean apparentlyFirstQueuedIsExclusive(){ Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }
writerShouldBlock()
方法直接返回了 false,而 readerShouldBlock()
方法稍微做了下判断,按照重入锁的非公平性实现,此时应该直接返回 false,为什么要有这个判断呢?因为读锁是可以被多个线程获取的,如果某个线程又获取了写锁并更新了数据,那么这个更新对其他获取读锁的线程是不可见的。
再看看公平性的实现:
static final class FairSyncextends Sync{ private static final long serialVersionUID = -2274990926593161451L; // 是否阻塞写线程 final boolean writerShouldBlock(){ // 是否还有前驱节点 return hasQueuedPredecessors(); } // 是否阻塞读线程 final boolean readerShouldBlock(){ // 是否还有前驱节点 return hasQueuedPredecessors(); } }
这个就简单了,直接返回是否还有前驱节点,是我们想要的。
读写锁中用 WriteLock 来表示写锁,它是读写锁的内部类,定义如下:
public static class WriteLockimplements Lock,java.io.Serializable{...}
通过写锁的 lock()
方法可以获取写锁,下面先看看写锁的获取:
public void lock(){ sync.acquire(1); } public final void acquire(int arg){ if (!tryAcquire(arg) && // 构建独占式节点加入同步队列 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires){ Thread current = Thread.currentThread(); int c = getState(); // 写次数 int w = exclusiveCount(c); // 锁是否被获取过 if (c != 0) { // 同步状态不为 0 而写次数为 0,说明读锁被获取过 // 此时如果获取写锁的线程就是当前线程,那么仍然可以获取写锁,而如果是不同线程则不能获取写锁 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 说明获取锁的次数是有上限的 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 修改同步状态 setState(c + acquires); return true; } // 是否阻塞写线程需要根据当前的获取策略:公平与非公平 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; // 缓存当前的写线程 setExclusiveOwnerThread(current); return true; }
在 tryAcquire(int acquires)
方法中, writerShouldBlock()
需要分以下情况:
所以,假如读/写锁被获取过了( c != 0
),此时如果读锁还没有被获取过或者被当前线程获取过,那么此次获取成功,否则此次获取失败;假如读/写锁都还没有被获取过( c == 0
),此时如果是公平获取,当存在前驱节点时获取失败,否则获取成功,如果是非公平获取,那么将获取成功。
st=>start: Start cond1=>condition: 锁被获取过 cond2=>condition: 读锁被其他线程获取过 cond3=>condition: 公平获取 cond4=>condition: 存在前驱节点 oper1=>operation: 获取失败 oper2=>operation: 获取成功 oper3=>operation: 获取失败 oper4=>operation: 获取成功 e=>end: End st->cond1 cond1(yes)->cond2 cond1(no)->cond3 cond3(yes)->cond4 cond3(no)->oper4 cond4(yes)->oper3 cond4(no)->oper4 cond2(yes)->oper1 cond2(no)->oper2 oper1->e oper2->e oper3->e oper4->e
调用写锁的 unlock()
方法可以释放写锁:
public void unlock(){ sync.release(1); } public final boolean release(int arg){ if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases){ // 判断缓存线程是否是当前线程 // 因为获取写锁之后,当前线程仍可以获取读/写锁,而其他线程都被阻塞 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 每释放一次锁就减 1 int nextc = getState() - releases; // 当写次数为 0 时才表示真正释放成功 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
写锁的释放相对简单,每释放一次锁同步状态就减 1,当写锁的获取次数为 0 时表示真正释放成功。
读写锁中用 ReadLock 来表示读锁,它也是读写锁的内部类,定义如下:
public static class ReadLockimplements Lock,java.io.Serializable{...}
通过读锁的 lock()
方法可以获取读锁:
public void lock(){ sync.acquireShared(1); } public final void acquireShared(int arg){ if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } protected final int tryAcquireShared(int unused){ Thread current = Thread.currentThread(); int c = getState(); // 其他线程已经获取了写锁,此时不能获取读锁 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 读次数 int r = sharedCount(c); // 是否阻塞读线程需要分公平与非公平情况 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { // 第一次获取读锁 // 缓存第一个读线程及该线程获取过次数 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 当前线程重复获取读锁,说明读锁是可重入的 firstReaderHoldCount++; } else { // 其他读线程到来时,说明读锁是共享的 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) // 获取当前线程的 cachedHoldCounter cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
tryAcquireShared(int unused)
方法中, readerShouldBlock()
方法同样需要分情况:
这里出现了两个对象:cachedHoldCounter、readHolds,看看它们的定义:
static final class HoldCounter{ int count = 0; final long tid = getThreadId(Thread.currentThread()); } static final class ThreadLocalHoldCounterextends ThreadLocal<HoldCounter>{ public HoldCounter initialValue(){ return new HoldCounter(); } } Sync() { readHolds = new ThreadLocalHoldCounter(); setState(getState()); // ensures visibility of readHolds }
根据 ThreadLocal 的特性,所以此处每个线程都有一个自己的 HoldCounter,HoldCounter 里面记录的是当前线程获取锁的次数和当前线程的 ID。
回到 tryAcquireShared(int unused)
方法,什么时候会进入 fullTryAcquireShared(current)
方法呢?
final int fullTryAcquireShared(Thread current){ HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { // 其他线程已经获取了写锁,此时不能获取读锁 if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } // 读次数是否达到了 65535 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 修改同步状态 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { // 第一次获取读锁 // 缓存第一个读线程及该线程获取过次数 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 当前线程重复获取读锁 firstReaderHoldCount++; } else { // 其他读线程到来时 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) // 获取当前线程的 cachedHoldCounter rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
fullTryAcquireShared(Thread current)
主要处理读线程被阻塞和 CAS 失败的情况,基本上和 tryAcquireShared(int unused)
方法差不多。
所以,整个 tryAcquireShared(int unused)
方法可以分为三部分:
exclusiveCount(c) != 0
调用读锁的 unlock()
方法可以释放读锁:
public void unlock(){ sync.releaseShared(1); } public final boolean releaseShared(int arg){ if (tryReleaseShared(arg)) { // 修改等待状态,唤醒后继节点 doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int unused){ Thread current = Thread.currentThread(); // 当前线程是否是第一个获取读锁的线程 if (firstReader == current) { if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { // 修改其他线程的 readHolds HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); // 获取读锁成功时 +SHARED_UNIT,所以此处要 -SHARED_UNIT int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // 当读/锁都释放完后 nextc 才为 0 return nextc == 0; } }
每次释放读锁都会给相应的状态减 1,这个和写锁的释放一样,但是有一点不同的是读锁的释放需要判断写锁是否释放完成。
每次获取写锁的时候修改同步状态都是 +1,释放写锁的时候都是 -1;而获取读锁时是 +SHARED_UNIT,释放读锁是 -SHARED_UNIT。当一个线程获取了写锁后再获取了读锁,在释放锁时如果没有先释放写锁,那么 nextc == 0
将永远不会成立。
读写锁中还有个锁降级的概念,意思就是把写锁降级成为读锁。如果一个线程获取了写锁并释放后再去获取读锁,这个过程不能称为锁降级。锁降级是指在获取了写锁后,在释放写锁之前去获取读锁,然后才释放写锁。看个示例:
class CachedData{ final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); Object data; volatile boolean cacheValid; void processCachedData(){ rwl.readLock().lock(); if (!cacheValid) { // 必须先释放读锁 rwl.readLock().unlock(); // 锁降级从获取写锁开始 rwl.writeLock().lock(); try { if (!cacheValid) { data = ... cacheValid = true; } // 释放写锁之前获取读锁 rwl.readLock().lock(); } finally { // 释放写锁 rwl.writeLock().unlock(); } } try { use(data); } finally { // 释放读锁 rwl.readLock().unlock(); } } }
上述示例中,在释放写锁之前去获取读锁,然后才释放写锁。这个过程其实就是:一个线程获取写锁后仍然可以获取读锁,而其他线程的读/写都不能获取。
读写锁不支持锁升级,也就是读锁升级成为写锁。因为读锁是共享的,可以被多个线程获取,如果多其中某个线程又获取了写锁并更新了数据,这个更新对其他获取读锁的线程是不可知的。其实从上面的分析获取写锁的过程也能发现:如果其他线程获取了读锁,那么写锁将获取失败。
重入锁和读写锁内部的功能实现都是基于队列同步器(AbstractQueuedSynchronizer),它们都是可重入的,即同一线程可以多次获取。但重入锁是排他锁,即同一时刻只允许一个线程获取成功;在读写锁中,读锁是共享锁,允许多个线程获取,而写锁是排他锁,同一时刻只允许一个线程获取成功。此外,当一个线程获取了写锁后还可以继续获取读/写锁,但是其他线程获取读/写锁将被阻塞。