观察这些类的源码,其实可以总结出一个应用 aqs
的 模式 。 下面用自定义实现一个 MutexLock
来演示这个模式。
public class MutexLock{ /* 1. 定义一个继承自aqs的静态内部类Sync 该类去覆盖实现tryAcquire/tryRelease 或者tryAcquireShared/tryReleasedShared 这里是独占式的互斥锁,因此只用实现前者 若是Semaphore或者CountdownLatch则只用实现后者 */ private static class Sync extends AbstractQueuedSynchronizer{ @Override protected boolean tryAcquire(int arg) { if(arg != 1) throw new IllegalArgumentException; if(compareAndSetState(0, 1)){ setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int arg) { if(arg != 1) throw new IllegalArgumentException; if(getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true; } @Override protected boolean isHeldExclusively() { return getState() == 1; } final ConditionObject newCondition(){ return new ConditionObject(); } } /*2. 类内部持有一个Sync对象*/ private final Sync sync; public MutexLock() { this.sync = new Sync(); } /* 3. 根据类的每个方法的具体功能分别去调用Sync的acquire/release/try */ @Override public void lock() { sync.acquire(1); } @Override public void unlock() { sync.release(1); } } 复制代码
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
多线程计算,最后主线程将所有计算结果汇总,再计算。也就是主线程一定要保证上一步的计算中,每个线程都计算完毕,都执行完毕。有点类似join,不过是等待所有线程都结束,而不是某个特定的线程。
count--
CountdownLatch维护的Sync对像有了新的构造函数。state可以为int范围内的任意数。
Sync(int count) { setState(count); } 复制代码
CountdownLatch维护的Sync对象重写了aqs的共享式尝试获取同步状态的两个方法: tryAcquireShared
tryReleaseShared
//只有当state也就是countDownLatch中的count减少到0时 //才会返回正数 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } 复制代码
具体到 await方法
和 countDown
方法:
//选择使用可中断的acquire方法 //与aqs的acquire类似 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //每次只释放一个permit,也就是state只会减少1 public void countDown() { sync.releaseShared(1); } 复制代码
A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each { acquire } blocks if necessary until a permit is available, and then takes it. Each { release } adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the { Semaphore } just keeps a count of the number available and acts accordingly.
控制并发访问的线程个数,相当于操作系统里的信号量.
维护了两个 aqs
子类对象, FairSync
, NonFairSyn
, 它们都继承自 Sync
。他们共享父类的 tryReleaseShared
方法:
protected final boolean tryReleaseShared(int releases) { //经典的CAS死循环,类比原子类的getAndIncrement() //保证多个线程同时释放对象的线程安全 for (;;) { int current = getState(); //归还releases数量的permits到state中 int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } 复制代码
公平与非公平主要是判断对锁 新的申请 , 是不是要排队 。所以他们都各自重写了 aqs
的 tryAcquireShared
方法。
protected int tryAcquireShared(int acquires) { for (;;) { /* 如果当前aqs维护的CLH队列中有在等待的节点 并且这个节点的下一个节点的线程不是当前线程 那么会直接失败,失败后会怎样呢? 参考acquireShared的源码 尝试失败后会直接加入队列的尾部(addWaiter()) */ if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } 复制代码
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } //父类Sync中的方法 final int nonfairTryAcquireShared(int acquires) { //还是经典的CAS循环 for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } 复制代码
NonFair版本中调用了父类中的 nonfairTryAcquireShared
,这个方法和 Fair 版本的 唯一区别 就是少了对 hasQueuedPredecessors()
的判断。 也就是说它会不断的通过CAS操作去设置 state
,一旦设置成功, remaining >= 0
时,它会直接忽略CLH队列中等待的节点, 优先 的进入临界区。 设置失败后( remaining < 0
),那么还是和 Fair 版本的一样,也得 乖乖的去排队 。
那么具体到 Semaphore
的核心方法上:
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public void release() { sync.releaseShared(1); } 复制代码
该锁对比文章开头自定义的MutexLok, 增加了了公平与非公平锁的功能,还实现了 可重入性 。 关于公平与非公平,上面的 Semaphore
已经提到过,只是 tryAcquire
方法中有没有对 hasQueuedPredecessors
加以判断。 那么我们来重点关注一下它的 可重入性 。
可重入性,也就是同一个线程在已经获取锁的情况下,可以多次的再次获取锁。 不可重入的锁,在递归调用的方法上加锁,就会出现死锁。 因此出现了可重入锁。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //如果当前没有任何线程获取该锁 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //该锁已经被某个线程获得 //判断这个线程是不是当前线程,如果是,那么可以再次的获取锁 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } 复制代码
读写分离的、 可重入 的锁。 使用它可以提高并发的 吞吐量 ,提高了 读 取操作的效率。
将aqs中的 int
型变量 state
划分成了两个十六位的 unsigned short
。
同时写了两个快捷获取读锁/写锁状态的函数: sharedCount(int c)
和 exclusiveCount(int c)
。
static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } 复制代码
先来看看 读锁的获取 。
//传入的int型参数,没有被这个函数用过 //虽然unused, 但为了维持这个接口还是不改变函数签名 protected final int tryAcquireShared(int unused) { /* * 1. 如果其他线程持有写锁,失败(返回小于0的值) * 2. 否则,这个线程是一个可以去锁定写锁状态的线程 * 那么去看看它由于排队的策略 * 以及避免饥饿的启发式算法,它是否应该阻塞。 * 重入性请求被延迟到full version处理。 * 3. 如果第二步失败,执行full version */ Thread current = Thread.currentThread(); int c = getState(); //其他线程持有写锁,直接返回-1 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); /* 去往full version 的三个条件: 1. 根据启发式的避免饥饿算法,当前读者应当阻塞 2. 当前读者数量超过上限 3. CAS设置失败 */ if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { //ThreadLocal线程封闭,让每个线程可以获取当前获得锁的数量 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); } 复制代码
读锁的释放相对简单一些。
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //HoldCount相关操作 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //释放锁的核心操作CAS循环 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) /* 读锁的释放不会对读者产生影响 但它也许会唤醒写者 */ return nextc == 0; } } 复制代码
写锁的获取。
protected final boolean tryAcquire(int acquires) { /* 1. 读锁不为空,或者写锁不为空并且持有线程不是当前线程,fail 2. 写锁饱和,fail.(发生原因:重入次数过多,超过1 << 16 - 1.) 3. 否则,它有机会去获取写锁 */ Thread current = Thread.currentThread(); int c = getState(); //w -> 写锁数量 int w = exclusiveCount(c); //state != 0 -> 必定存在写锁/或者读锁被某个线程持有 if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } //state == 0 -> 没有任何线程持有读锁/写锁 //如果非公平锁,writerShouldBlock()始终返回false //公平锁,writerShouldBlock()会调用hasQueuedPredecessors()作为返回值 //如果当前写者不用阻塞那么就尝试去CAS设置state if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; } 复制代码
protected final boolean tryRelease(int releases) { //如果当前线程不是独占的获取了锁 //就没有资格释放锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; } 复制代码
关于 公平性 ,还有 写者饥饿 的问题要具体看NonFairSync和FairSync对读者/写者应否阻塞的控制。 NonFair版本: 写者优先 。
static final class NonfairSync extends Sync { final boolean writerShouldBlock() { return false; // 写者总是插队 } final boolean readerShouldBlock() { /* 启发式的避免饥饿的方法 */ return apparentlyFirstQueuedIsExclusive(); } } //父类aqs中的方法 final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; /* 返回为假的四种情况 1. 队列中首节点为空 -> 就是没有等待的节点,那么不需要阻塞读者线程 2. 首节点的下一个节点为空 -> 只有一个等待的节点,那么也不需要阻塞 3. 首节点的下一个节点是共享模式 -> 队列中还有未出队的读者线程 4. 下一个节点中没有线程或者被中断 */ return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; } 复制代码
Fair版本: 读者和写者公平排队。
static final class FairSync extends Sync { final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } } 复制代码
在《Java 并发编程的艺术》一书中说道:
如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。
并且书中是用数据可见性来解释锁降级的必要性的。 而得到这个结论的依据是 “如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程 T)获取了写锁并修改了数据,那么当前线程无法感知线程 T 的数据更新”。 那么就有一个问题: 如果另一个线程获取了写锁(并修改了数据),那么这个锁就被独占了,没有任何其他线程可以读到数据,谈什么 “感知数据更新”?
我的理解是,锁降级只是一种 特殊的重入机制 。相比释放写锁后再获取读锁这样的 手动降级 ,有两个最明显的优点。
Condition其实是一个 接口 。
aqs中的 内部类ConditionObject 实现了这个接口。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //fullyRelease(node)将所有的state释放掉 //相当于隐式的释放掉所有的锁 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //当线程已经转移到Sync队列上以后正常的去尝试获取锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } 复制代码
唤醒线程:其实把线程从 condition queue 转移到了 sync queue 。
/** * 将节点从condition queue 转移到 sync queue中。 * @return true if successfully transferred (else the node was * cancelled before signal) */ final boolean transferForSignal(Node node); 复制代码
并且 Lock
接口定义了一个 newCondition()
方法,规定所有的锁要提供一个获取与这个锁对应的Condition对象。 ReentrantLock的实现中,在同步器Sync中增加了一个 newCondition
方法, 该方法新建一个ConditionObject对象返回。 注意,这个ConditionObject是aqs的一个 内部类 ,通过这个ConditionObject可以访问aqs的 任何成员以及方法 。 这也是使用 内部类 一个最大的 优点 。
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.
和CountdownLatch比较类似,但CyclicBarrier更加注重的是集合内的线程同步,线程组内的所有线程都必须等待组内其他线程运行到一个 barrier point
,才能继续执行。能够处理更加 复杂的场景 。并且CyclicBarrier内有一个Generation对象,可以 一代一代的重用 下去。
CyclicBarrier维护了一个ReentrantLock对象 lock
,以及一个由该对象生成的Condition对象 trip
。 通过这两个对象可以保证对 count
操作的线程安全,可以控制 线程之间 的 阻塞与唤醒 。
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } 复制代码
该方法直接调用了核心代码dowait()方法,并且指定false, 也就是不支持超时返回,时间为0L(unused)。
/** * 主要的barrier代码,包含了各种策略 */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //lock保证对CyclicBarrier对象的线程安全 lock.lock(); //这个try块内容非常多 //包括count--操作以及循环阻塞的过程 try { //中断或者barrier损坏,抛出对应异常 final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //没有损坏则将count-- //因为加了锁,所以不用担心count数据线程不安全 //index -> 可以推断到达barrier的先后次序 int index = --count; //index == 0 -> 这是最后到达barrier的线程 // -> tripped if (index == 0) { // tripped //根据初始化的参数决定是否执行barrierCommand boolean ranAction = false; try { final Runnable command = barrierCommand; //command != null //初始化时已经设置了barrierCommand,需要在此处执行 if (command != null) command.run(); ranAction = true; /* //这一个generation结束,开始新的一轮计数同步 //并且生产一个新的Generation对象 private void nextGeneration() { trip.signalAll(); count = parties; generation = new Generation(); } */ nextGeneration(); //dowait()返回 //意味着所有线程可以接着执行下面的代码了 return 0; } finally { //try块中的ranAction没有得到执行 //就自动将这个barrier中断,可能是因为 //command.run() 被中断或者异常 if (!ranAction) breakBarrier(); } } //循环直到所有线程到达barrier //或者barrier被中断,或者超时 for (;;) { try { //如果没有时间限制 //无限等待 //trip是一个Condition接口的对象 if (!timed) trip.await(); //有时间限制,超时等待 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } //相应中断相关操作 catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); //在执行过nextGeneration后 //这个全局的generation对象被赋予了新的值 //当前线程接收到nextGeneration中的signalAll()方法后 //返回当初记录它来到barrier的次序 if (g != generation) return index; //超时,中断barrier,抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } 复制代码
看完这段代码,似乎还是有一个 不解之问 。就是 dowait()
方法的代码都是在 临界区 里执行的,包括 for
循环。 而我们的功能要求我们应该有多个线程并发的执行 dowait
方法,但是,当第一个线程 dowait
后,就 上锁 了,并且直到中断或者tripped才 释放锁 。 那么其他的线程根本就访问不了CyclicBarrier这个对象了,更别提一起到达 barrier
,将 count
减少到0了。
其实在for循环开始时,执行 trip.await()
时,会去执行ConditionObject里的 await()
方法,这个方法会首先执行一个 fullyRelease(Node node)
函数, 将 state
尝试置为0。就相当于是 隐式 的 释放 掉了当前 所有 的锁了。所以后续的线程调用 dowait()
方法,不会被锁在 临界区 外。因为一旦有线程 trip.wait()
, 锁的状态就会清零 。