AQS(AbstractQueuedSynchronizer),队列同步器,Java中很多同步类都是基于AQS实现的,比如:ReentrantLock、Semaphore、CountDownLatch等。
AQS提供了一个实现阻塞锁和相关同步组件的框架,这个框架实现依赖于FIFO(first-in-first-out,先进先出)的等待队列来完成获取资源线程的排队工作。AQS是绝大多数同步器的基础,它的内部使用一个 int 型的变量表示同步状态(资源状态),AQS并不维护这个状态的值,只是提供了一系列的原子更新方法,getState、setState、compareAndSetState,而由继承AQS的子类去重写特定定的方法实现对共享资源的获取和释放,而其他的比如线程排队、线程挂起、线程唤醒出队等已经在AQS中实现好了(典型的设计模式中模板方法模式的使用)。
AQS定义了两种资源共享的方式:
上面提到过AQS需要继承它的子类去重新特定的方法,而不同方式(共享和独占)需要重写的方法也不一样,下面来看看AQS中定义的可以重写的方法:
看看tryAcquire(int arg),发现AQS中并没有把其定义为抽象方法,而是抛出UnsupportedOperationException异常,像上面所说的,不同的共享方式覆盖特定的方法,而不用实现其所不需要的方法,提供了灵活性。
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
在具体分析之前,先看看AQS是如何实现FIFO的等待队列的,AQS的等待队列是”CLH” (Craig, Landin, and Hagersten) 队列的一种变体,关于”CLH”锁可以参照—— 自旋锁&CLH锁&MCS锁学习记录 。在每个节点中保存了前后节点的引用,节点中的“waitStatus”字段用于表示线程的状态。节点的前驱节点在释放资源的时候发出信号,通知节点可以竞争资源。如果线程是队列中的第一个线程,则可能尝试获取资源,但是并不保证一定成功,队列中的第一个线程只是具有了竞争资源的权利。
AQS注释中给出的CLH队列结构如下:
+------+ prev +-----+ +-----+ head | | <---- | | <---- | | tail +------+ +-----+ +-----+
下面是AQS中等待队列实现的部分代码:
static final class Node { //用于共享模式的节点声明 static final Node SHARED = new Node(); //用于独占模式的节点声明 static final Node EXCLUSIVE = null; /**一下是waitStatus的值的状态*/ //表示线程被取消 static final int CANCELLED = 1; //等待触发 static final int SIGNAL = -1; //线程等待条件 static final int CONDITION = -2; //状态需要向后传播 static final int PROPAGATE = -3; //线程状态,具有上面4个状态 volatile int waitStatus; //前驱节点 volatile Node prev; //后继节点 volatile Node next; //当前线程 volatile Thread thread; Node nextWaiter; Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
waitStatus表示线程的状态:
在AQS类注释中,有一段独占锁代码的实现,以下面的例子来分析AQS的源码:
package io.github.brightloong.concurrent.aqs; import java.io.IOException; import java.io.ObjectInputStream; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * Created by BrightLoong on 2018/6/19. */ public class Mutex implements Lock, java.io.Serializable { // Our internal helper class private static class Sync extends AbstractQueuedSynchronizer { // Reports whether in locked state protected boolean isHeldExclusively() { return getState() == 1; } // Acquires the lock if state is zero public boolean tryAcquire(int acquires) { assert acquires == 1; // Otherwise unused if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // Releases the lock by setting state to zero protected boolean tryRelease(int releases) { assert releases == 1; // Otherwise unused if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true; } // Provides a Condition Condition newCondition() { return new ConditionObject(); } // Deserializes properly private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } // The sync object does all the hard work. We just forward to it. private final Sync sync = new Sync(); public void lock() { sync.acquire(1); } public boolean tryLock() { return sync.tryAcquire(1); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
看上面的代码,加锁lock()函数,调用的是sync.acquire(1),从acquire(int arg)函数入手,依次分析加锁过程中涉及到的函数代码。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
用于将当前线程加入到等待队列中,并返回当前节点。
private Node addWaiter(Node mode) { //按照给定的方式构造队列,上面提到的EXCLUSIVE(独占模式)和Share(共享模式) Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //快速入队,尝试将节点放到队尾,获取前驱节点,如果获取到了表示有线程占用了资源。 Node pred = tail; if (pred != null) { //将新建节点的前驱节点设置为获取到的前驱节点 node.prev = pred; //原子更新tail,将tail更新为当前节点,可能更新失败,因为在执行step1前可能有其他线程更新了tail if (compareAndSetTail(pred, node)) {//step1 pred.next = node; return node; } } //如果上面的操作执行失败,执行enq(node) enq(node); return node; }
将当前线程对应的节点添加到等待队列中,不断循环直到添加成功。
private Node enq(final Node node) { //不断循环,直到节点成功添加到队列中。 for (;;) { Node t = tail; //tail等于null,表示当前资源没有被占用 if (t == null) { //原子操作,初始化head节点,操作可能失败,因为可能有其他线程在这个时候已经初始化成功了。 if (compareAndSetHead(new Node())) //成功后将tail指向head tail = head; } else { //如果tail不等于null,将当前节点的前驱节点设置为tail。 node.prev = t; //将tail原子更新成当前节点,可能失败,因为tail可能被其他线程更新。 if (compareAndSetTail(t, node)) { //构建为双向队列 t.next = node; //添加成功,放回当前节点。 return t; } } } }
在队列中尝试获取资源,获取失败后判断是否真正需要进入阻塞状态,如果是将阻塞线程,直到被唤醒,并返回中断状态。不断循环,直到获取到资源或者进入阻塞状态等待被唤醒。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //获取前驱节点 final Node p = node.predecessor(); //如果前驱节点为head节点,表示具有竞争资源的机会,使用tryAcquire(arg)获取同步状态 //如果成功,表示获取到资源,将head设置成当前节点(所以可以认为head其实是当前获取到资源的线程节点,最后始终要执行到这里),返回中断状态为false。 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //如果获取资源过程中执行失败,停止获取资源 if (failed) cancelAcquire(node); } }
获取资源失败后判断线程是否需要真正进入阻塞,只有在前驱节点waitStatus值为SIGNAL,当前节点的线程才需要进入阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取前驱节点的waitStatus值 int ws = pred.waitStatus; //如果状态为SIGNAL,表示当前节点可以进入等待的状态,返回true。 if (ws == Node.SIGNAL) return true; if (ws > 0) { //4个状态中大于0的状态是CANCELLED,如果线程已经放弃了,那就是所谓的占着厕所不拉屎(话糙理不糙,哈哈) //这个时候就往前找,一直找到,直到找到状态正常的那个节点,并让自己排在它的后面。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //如果状态不是CANCELLED,也就是说状态正常,将前驱节点的状态设置为SIGNAL,有可能失败, //前驱状态有可能发生了改变 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
终于,在这里线程进入了阻塞,调用 LockSupport.park(this)阻塞线程
private final boolean parkAndCheckInterrupt() { //调用LockSupport.park()使线程进入waiting状态。 LockSupport.park(this); //当线程被唤醒,返回中断状态() return Thread.interrupted(); }
获取资源的流程如下所示,可以看到流程中有两个循环。
从release(int arg)函数开始,一步步分析独占方式锁的释放。
public final boolean release(int arg) { //调用使用者重写的tryRelease方法,若成功,唤醒其后继结点,失败则返回false if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //唤醒后继节点 unparkSuccessor(h); return true; } return false; }
唤醒后继节点
private void unparkSuccessor(Node node) { int ws = node.waitStatus; //修改当前节点的状态为0,允许失败 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //一般来说需要唤醒的就是下一个节点,但是下一个节点可能是null //或者其状态是取消状态,所以从tail开始先前查找,一直找到状态正常的节点。 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //唤醒线程 LockSupport.unpark(s.thread); }
AQS类注释中同样提供了一个简单的实现:
package io.github.brightloong.lab.concurrent.cas; import java.util.concurrent.locks.AbstractQueuedSynchronizer; /** * BooleanLatch class * * @author BrightLoong * @date 2018/6/21 */ public class BooleanLatch { private static class Sync extends AbstractQueuedSynchronizer { boolean isSignalled() { return getState() != 0; } protected int tryAcquireShared(int ignore) { return isSignalled() ? 1 : -1; } protected boolean tryReleaseShared(int ignore) { setState(1); return true; } } private final Sync sync = new Sync(); public boolean isSignalled() { return sync.isSignalled(); } public void signal() { sync.releaseShared(1); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } }
和CountDownLatch很类似可以多个线程await(),但是只需要调用一次signal() 就可以启动阻塞的线程。
共享模式从 acquireShared(int arg)入手来进行分析,与独占模式不同的是,共享模式下同一时刻可以有多个线程获取到资源执行。
获取同步状态。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
使用tryAcquireShared(arg) 获取资源状态
获取失败执行doAcquireShared(arg)
private void doAcquireShared(int arg) { //添加共享模式的节点到等待队列中,添加成功后返回当前节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { //获取前驱节点 final Node p = node.predecessor(); //如果前驱节点是head,表示可能具有竞争资源的机会,可能head释放资源后来唤醒自己 if (p == head) { //尝试获取资源,获取同步状态。 int r = tryAcquireShared(arg); //大于等于0表示资源获取成功 if (r >= 0) { //更新头节点,如果还有资源可用,向后传播,唤醒后继节点。 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) //中断补偿 selfInterrupt(); failed = false; return; } } //获取资源失败后判断线程是否真正需要挂起,和独占方式相同 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below //将当前节点设置为head setHead(node); //同步状态大于0,表示资源还可以被获取,唤醒后继节点 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
因为获取资源成功,将当前节点设置为head,并唤醒后继节点
相对的分析一下 acquireShared(int arg)
private void doReleaseShared() { //因为在共享模式下,获取同步状态和释放同步状态可能同时进行,用CAS保证原子性 for (;;) { //获取head Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //节点状态为SIGNAL,可以唤醒下一个节点 if (ws == Node.SIGNAL) { //设置waitStatus为初始状态0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //唤醒后继节点 unparkSuccessor(h); } //设置为PROPAGATE,表示可以向后传播 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //head变化了继续循环,共享模式下每唤醒一个后继节点,head//就会指向他,这样就可以保证唤醒所有的能获取到资源的后继节点 if (h == head) // loop if head changed break; } }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //唤醒后继节点,并且检查是否可以向后传播 doReleaseShared(); return true; } return false; }