AQS支持两种锁一种是独占锁(独占模式),一种是共享锁(共享模式)
ps:共享锁跟独占锁可以同时存在,比如比如读写锁,读锁写锁分别对应共享锁和独占锁
先来介绍一下AQS的几个主要成员变量
//AQS等待队列的头结点,AQS的等待队列是基于一个双向链表来实现的,这个头结点并不包含具体的线程是一个空结点(注意不是null) private transient volatile Node head; //AQS等待队列的尾部结点 private transient volatile Node tail; //AQS同步器状态,也可以说是锁的状态,注意volatile修饰证明这个变量状态要对多线程可见 private volatile int state; 复制代码
static final class Node { //下面两个属性都是说明这个结点是共享模式还是独占模式,具体怎么表示下面会分析 static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; //下面这四个属性就是说明结点的状态 static final int CANCELLED = 1;//由于超时或中断,节点已被取消 static final int SIGNAL = -1;//表示下一个节点是通过park阻塞的,需要通过unpark唤醒 static final int CONDITION = -2;//表示线程在等待条件变量(先获取锁,加入 到条件等待队列,然后释放锁,等待条件变量满足条件;只有重新获取锁之 后才能返回) static final int PROPAGATE = -3;//表示后续结点会传播唤醒的操作,共享模式下起作用 volatile int waitStatus; //前驱结点(双链表) volatile Node prev; //后继结点(双链表) volatile Node next; // 结点所包装的线程 volatile Thread thread; //对于Condtion表示下一个等待条件变量的节点;其它情况下用于区分共享模式和独占模式 Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } //取得前驱结点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) //null的时候抛出异常 throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } } 复制代码
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 复制代码
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } 复制代码
private Node addWaiter(Node mode) { //首先把当前竞争锁的线程包装成一个节点 Node node = new Node(Thread.currentThread(), mode); //如果以前的尾结点不为null(为null表示当前结点就是等待队列的第一个结点), 就将当前结点设置为尾结点,并通过cas操作更新tail尾新入队的结点 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //cas更新tail失败就以自旋的方式继续尝试入队列 enq(node); return node; } 复制代码
private Node enq(final Node node) { //死循环进行自旋操作 for (;;) { Node t = tail; //这里利用了延迟加载,尾结点为空的时候生成tail结点,初始化 if (t == null) { // 队列为空的时候自然尾结点就等于头结点,所以通过cas操作设置tail = head if (compareAndSetHead(new Node())) tail = head; } else { //尾结点初始化成功后就一直自旋的更新尾结点为当前结点 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } 复制代码
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //获取结点的前驱结点 final Node p = node.predecessor(); /** 如果结点的前驱结点是head表示当前结点就是等待队列的第一个, 因为head结点并不指向一个实际的线程,所以这个时候就会执行下 tryAcquire函数尝试性的获取下锁。因为这个时候很有可能竞争成功 **/ if (p == head && tryAcquire(arg)) { /** 拿到锁之后就更新头结点为当前结点(这个结点的线程已经拿到锁了, 所以更新为头结点也不会继续参与锁竞争,再次提示头结点不会参加竞争) **/ setHead(node); // 设置以前的头结点不指向其他结点,帮助gc p.next = null; failed = false; return interrupted; } /** 上面前驱不是头结点或者获取锁失败就会执行shouldParkAfterFailedAcquire 函数判断是否应该挂起线程,注意只是判断并不会执行挂起线程的操作,挂起线程的 操作由后面的parkAndCheckInterrupt函数执行 **/ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 当出现异常的时候会执行cancelAcquire方法,来取消当前结点并从等待队列中清除出去 if (failed) cancelAcquire(node); } } 复制代码
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取下前驱结点的waitStatus,这个决定着是否要对后续结点进行挂起操作 int ws = pred.waitStatus; /** 如果ws的waitStatus=-1时,证明他的后继结点已经被park阻塞了后面到了竞争的时候会unpark唤醒后继结 点,所以如果结点的前驱结点waitStatus是-1,shouldParkAfterFailedAcquire就会判断需要park当前线程 所以返回true。 **/ if (ws == Node.SIGNAL) return true; //ws>0证明前驱结点已经被取消所以需要往前找到没有被取消的结点ws>0 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /** 前驱结点生成时,ws=0,所以如果是第一次执行shouldParkAfterFailedAcquire函数就会发现前驱结点 的ws = 0就会因为需要阻塞后面的结点设置为-1。 **/ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } /** 注意把前驱结点ws设置为-1之后会虽然返回false,不挂起当前线程,注意只是这一次循环不挂起,因为 acquireQueued函数是一个死循环,所以到下一个循环如果前驱结点不是head并且tryAcquire竞争锁失败还 是会执行shouldParkAfterFailedAcquire方法,这个时候前驱结点已经为-1,所以就会直接返回true **/ return false; } 复制代码
private final boolean parkAndCheckInterrupt() { //park挂起线程 LockSupport.park(this); /** 线程被unpark唤醒的时候会检查终端状态并返回,这个终端状态会在acquireQueued方法中最后返回, 所以acquireQueued函数并不响应中断而是返回中断状态由外层函数处理。 **/ return Thread.interrupted(); } 复制代码
private void cancelAcquire(Node node) { if (node == null) return; node.thread = null; Node pred = node.prev; //循环往前找到没有被取消的结点,直到找到正常状态的结点 while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; //因为要取消当前结点所以修改当前结点得ws为CANCELLED node.waitStatus = Node.CANCELLED; //如果node为尾结点就修改尾结点并将尾结点得next设为null if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { //如果不是尾结点 /** 满足下面三个条件,将pred的next指向node的下一节点 1.pred不是head节点:如果pred为头节点, 而node又被cancel,则node.next为等待队列中的第 一个节点,需要unpark唤醒 2.pred节点状态为SIGNAL或能更新为SIGNAL 3.pred的thread变量不能为null **/ int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; // if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { //如果pred为头节点,则唤醒node的后节点,注意unparkSuccessor方法为唤醒当前结点得下一个结点 unparkSuccessor(node); } node.next = node; // help GC } } 复制代码
cancleAcquire函数主要是取消当前结点,将当前结点从等待队列中移出
同时遍历前面的结点将被取消的结点从队列中清除出去
unparkSuccessor 方法
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; //如果下一个结点为null或者ws为取消状态就未开始遍历找到正常状态的结点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 通过LockSupport.unpark()方法唤醒阻塞的线程,注意被阻塞的线程从哪开始继续执行 LockSupport.unpark(s.thread); } 复制代码
public final boolean release(int arg) { //首先是执行tryRelease()方法,主要如何去释放获取到的锁,这个方法需要子类自己去实现 if (tryRelease(arg)) { //释放成功以后如果发现等待队列还有在等待获取锁的Node就用unparkSuccessor唤醒头结点的下一个结点 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } //tryRelease方法调用失败会返回false return false; } 复制代码
//注意方法会忽略中断,没有selfInterrupt这个方法来响应中断 public final void acquireShared(int arg) { /** 这个tryAcquireShared方法对应独占模式的tryAcquire方法,也是需要子类自己去实现的。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。<0就表示获取失败就进doAcquireShared方法来开始进入等待队列等待获取资源 **/ if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } 复制代码
private void doAcquireShared(int arg) { // 构造一个共享模式得结点加入到等待队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { //如果当前结点是除了head得队列中的第一个结点那么就尝试获取资源 int r = tryAcquireShared(arg); //tryAcquireShared返回值>=0资源获取成功,就开始进行更新结点操作 if (r >= 0) { //这里注意下独占模式调用的是setHead方法,但是共享模式调用的是setHeadAndPropagate方法 setHeadAndPropagate(node, r); p.next = null; // help GC //这里注意下,这里响应中断,跟独占模式不同 if (interrupted) selfInterrupt(); failed = false; return; } } // 获取资源失败就开始继续判断是否需要挂起线程,与独占模式得相同 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 出现异常就取消结点 if (failed) cancelAcquire(node); } } 复制代码
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; //首先更新head结点 setHead(node); /** 注意propagate表示的上次执行tryAcquireShared 方法后的返回值。>0表示还有剩余资源,既然有剩余资 源就继续唤醒后面等待获取资源的并且是共享模式得 Node 或者h == null 或者当前获取到资源的得结点<0,signal需要唤醒后续结点 **/ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) //唤醒后续共享模式得Node结点 doReleaseShared(); } } 复制代码
setHeadAndPropagate方法主要干了两件事
看下doReleaseShared方法
private void doReleaseShared() { for (;;) { Node h = head; //如果头结点不为空,并且不是tail,队列还有结点 if (h != null && h != tail) { int ws = h.waitStatus; //如果head结点ws为signal就更新为0并唤醒后续结点 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } } 复制代码
从上面的分析可以知道,独占模式和共享模式的最大区别在于独占模式只允许一个线程持有资源,而共享模式下,当调用doAcquireShared时,会看后续的节点是否是共享模式,如果是,会通过unpark唤醒后续节点; 从前面的分析可以知道,被唤醒的节点是被堵塞在doAcquireShared的parkAndCheckInterrupt方法,因此唤醒之后,会再次调用setHeadAndPropagate,从而将等待共享锁的线程都唤醒,也就是说会将唤醒传播下去;
加入同步队列并阻塞的节点,它的前驱节点只会是SIGNAL,表示前驱节点释放锁时,后继节点会被唤醒。shouldParkAfterFailedAcquire()方法保证了这点,如果前驱节点不是SIGNAL,它会把它修改成SIGNAL。 造成前驱节点是PROPAGATE的情况是前驱节点获得锁时,会唤醒一次后继节点,但这时候后继节点还没有加入到同步队列,所以暂时把节点状态设置为PROPAGATE,当后继节点加入同步队列后,会把PROPAGATE设置为SIGNAL,这样前驱节点释放锁时会再次doReleaseShared,这时候它的状态已经是SIGNAL了,就可以唤醒后续节点了。(补充下,想一下如果不考虑,没有后继结点的时候直接讲ws置为signal,那么每次doReleaseShared执行的之后就直接unparkSuccessor唤醒后继结点那么就没意义,因为没有后继结点。所以在没有后继节点的时候ws = 0,那么就先ws置为PROPAGATE,反正后继结点加入的时候shouldParkAfterFailedAcquire会将前面的结点的ws置为signal)
举例说明:例如读写锁,写读操作和写写操作互斥,读读之间不互斥;当调用acquireShared获取读锁时,会检查后续节点是否是获取读锁,如果是,则同样释放;