Java并发包(JUC:java.util.concurrent)中提供了很多并发工具,这其中,很多我们耳熟能详的并发工具,ReentrantLock、Semaphore,它们的实现都用到了一个共同的基类--AbstractQueuedSynchronizer,简称AQS。AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。
AQS负责管理同步器类中的状态,他管理了一个整数状态信息,可以通过getState、setState、compareAndSetState等类型方法来进行操作。ReentrantLock表示所有线程已经重复获取所的次数,Semaphore表示剩余许可数量。
ReentrantLock是一个可重入的互斥锁,又被称为“独占锁”。ReentrantLock 类实现了 Lock ,它拥有与 synchronized 相同的并发性和内存语义,但是添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。(换句话说,当许多线程都想访问共享资源时,JVM 可以花更少的时候来调度线程,把更多时间用在执行线程上。)
ReentrantLock锁在同一个时间点只能被一个线程锁持有。
可重入:ReentrantLock锁,可以被单个线程多次获取。
ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到“锁”时,其它线程就必须等待);ReentraantLock是通过一个FIFO(先进先出)的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会尝试获取锁而不需要马上排队( 获取不到锁再排队 )。
通常我们会将ReentrantLock和synchronized做比较,两者孰优孰劣,其实在java官方对synchronized进行了诸多优化之后( 偏向锁、轻量锁、自适应自旋、锁消除、锁粗化... ),两者的性能差距并不大,只是在某些方面存在差异而已,在此用一张表格做一个简单的对比:
// **************************Synchronized的使用方式************************** // 1.用于代码块 synchronized (this) {} // 2.用于对象 synchronized (object) {} // 3.用于方法 public synchronized void test () {} // 4.可重入 for (int i = 0; i < 100; i++) { synchronized (this) {} } // **************************ReentrantLock的使用方式************************** public void test () throw Exception { // 1.初始化选择公平锁、非公平锁 ReentrantLock lock = new ReentrantLock(true); // 2.可用于代码块 lock.lock(); try { try { // 3.支持多种加锁方式,比较灵活; 具有可重入特性 if(lock.tryLock(100, TimeUnit.MILLISECONDS)){ } } finally { // 4.手动释放锁 lock.unlock() } } finally { lock.unlock(); } }
上面我们对ReentrantLock有了一个大概的了解,接下来可以来看看ReentrantLock和AQS之间到底是什么关系呢,其实ReentrantLock的底层就是使用的AQS实现的,我们一起来看看源码:
/**ReentrantLock 实现了Lock类 其中有一个字段,Sync **/ public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; private final Sync sync; /** * Sync就是继承了AQS的抽象类,此锁的同步控制基础。 * 下面将Sync细分为公平锁和非公平锁。 * 使用AQS的state来表示锁的重入次数。 */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * 抽象的lock方法,分别给公平锁和非公平锁具体实现。 */ abstract void lock(); ... } /** * 非公平锁 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; ... } /** * 公平锁 */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } ... } /** * ReentrantLock的默认构造方法,默认创建非公平锁 */ public ReentrantLock() { sync = new NonfairSync(); } /** * 通过传入boolean参数来决定创建公平锁还是非公平锁。 */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
ReentrantLock中的公平锁和非公平锁实现方式差别不大,差别在于公平锁判断是否直接进入队列,先看看非公平锁的加锁流程源码:
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /**加锁方法**/ final void lock() { //若通过CAS设置变量State(同步状态)成功,也就是获取锁成功,则将当前线程设置为独占线程。 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //若通过CAS设置变量State(同步状态)失败,也就是获取锁失败,则进入Acquire方法进行后续处理。 acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
再看下公平锁源码中获锁的方式:
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } ... }
通过查看公平锁和非公平锁获取锁的源码得知,都会通过lock()方法区上锁,但是lock()方法到底是怎么上锁的呢?带着这样的疑问,我们继续跟踪一下会发现,这个方法属于FairSync和NonfairSync的父类 AQS (AbstractQueuedSynchronizer)的核心方法。
在了解AQS之前,我们带着几个疑问:
下面我们会对AQS以及ReentrantLock和AQS的关联做详细介绍。
了解AQS之前,先看看AQS的整体框架:
AQS维护一个用 Volatile的int类型state的共享资源 ,通过内置的FIFO来完成获取资源线程的排队工作。( 这个内置的同步队列称为"CLH"队列 )。该队列由一个一个的Node结点组成,每个Node结点维护一个prev引用和next引用,分别指向自己的前驱和后继结点。AQS维护两个指针,分别指向队列头部head和尾部tail。
其实就是个变体 双端双向链表 。
当线程获取资源失败(比如tryAcquire时试图设置state状态失败),会被构造成一个结点加入CLH队列中,同时当前线程会被阻塞在队列中(通过LockSupport.park实现,其实是等待态)。当持有同步状态的线程释放同步状态时,会唤醒后继结点,然后此结点线程继续加入到对同步状态的争夺中。
static final class Node { /** waitStatus值,表示线程已被取消(等待超时或者被中断)*/ static final int CANCELLED = 1; /** waitStatus值,表示后继线程需要被唤醒(unpaking)*/ static final int SIGNAL = -1; /**waitStatus值,表示结点线程等待在condition上,当被signal后,会从等待队列转移到同步到队列中 */ static final int CONDITION = -2; /** waitStatus值,表示下一次共享式同步状态会被无条件地传播下去 static final int PROPAGATE = -3; /** 等待状态,初始为0 */ volatile int waitStatus; /**当前结点的前一个结点 */ volatile Node prev; /** 当前结点的后一个结点 */ volatile Node next; /** 与当前结点关联的排队中的线程 */ volatile Thread thread; /** ...... */ }
/** * The synchronization state. */ private volatile int state; protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
通过AQS中state的源码可以看出,关于state的方法都是final修饰的,说明子类中无法重写它们。我们可以通过修改State字段表示的同步状态来实现多线程的 独占模式和共享模式(加锁过程) 。
一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。ReentrantLock是独占锁,所以实现了tryAcquire-tryRelease。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
final boolean nonfairTryAcquire(int acquires) { //获取当前线程 final Thread current = Thread.currentThread(); //获取state的值 int c = getState(); //如果state为0,说明当前没有其他线程占用共享资源,可以尝试获取锁 if (c == 0) { //用CAS修改state的值 if (compareAndSetState(0, acquires)) { //修改成功,获取锁成功,处理当前线程 setExclusiveOwnerThread(current); //返回获取锁成功 return true; } } //如果state不为0,判断占用锁的线程是否是当前线程 else if (current == getExclusiveOwnerThread()) { //如果是当前线程,对state做增量递增 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //设置state setState(nextc); //返回获取锁成功(可重入的原理) return true; } return false; }
protected final boolean tryAcquire(int acquires) { //获取当前线程; final Thread current = Thread.currentThread(); //获取state的值; int c = getState(); //如果state为0,说明当前没有其他线程占用共享资源,可以尝试获取锁 if (c == 0) { //判断当前等待队列中是否存在有效节点的方法, //如果返回False,说明当前线程可以争取共享资源; //如果返回True,说明队列中存在有效节点,当前线程必须加入到等待队列中。 //用CAS修改state的值;修改成功,获取锁成功,处理当前线程; if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果state不为0,判断占用锁的线程是否是当前线程; else if (current == getExclusiveOwnerThread()) { //如果是当前线程,对state做增量递增; int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); //返回获取锁成功(可重入的原理)。 setState(nextc); return true; } return false; }
/**判断当前等待队列中是否存在有效节点的方法**/ public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized 其正确性取决于是否初始化头结点 // before tail and on head.next being accurate if the current 如果当前节点在头结点和尾结点之间 // thread is first in queue. 线程在队列中第一个 // 获取当前尾结点 Node t = tail; // Read fields in reverse initialization order // 获取当前尾结点 Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。 真正的第一个有数据的节点,是在第二个节点开始的。 当h != t时: 1. 如果(s = h.next)==null,等待队列正在有线程进行初始化,但只是进行到了Tail指向Head,没有将Head指向Tail, 此时队列中有元素,需要返回True(这块具体见下边addWaiter()已经enq()代码分析)。 2. 如果(s = h.next) != null,说明此时队列中至少有一个有效节点, 3. 如果此时s.thread ==Thread.currentThread(),说明等待队列的第一个有效节点中的线程与当前线程相同, 那么当前线程是可以获取资源的; 4. 如果s.thread != Thread.currentThread(),说明等待队列的第一个有效节点线程与当前线程不同,当前线程必须加入进等待队列。
当尝试获取锁成功,则直接结束并返回;
当尝试获取锁失败,则进入下一步:线程加入等待队列 addWaiter()
private Node addWaiter(Node mode) { //用当前线程和传入的锁模式新建一个Node Node node = new Node(Thread.currentThread(), mode); //先获取到当前的尾结点,将尾结点指向到Node pred Node pred = tail; //如果获取到的尾结点不为空 if (pred != null) { //将新建节点的前置节点设置为上面获取到的tail(tail节点的后直接点此时为null) node.prev = pred; //利用CAS修改尾结点的值(将新建的node节点设置为tail尾结点) if (compareAndSetTail(pred, node)) { //如果设置成功了,将之前获取到的pred(修改之前的尾结点)的后置节点修改为当前新建的node节点 pred.next = node; //返回修改成功的新的节点(此时此新建的这个node节点为新的tail节点) return node; } } //如果获取到的tail节点为null,(说明等待队列中没有元素), //或者当前Pred指针和Tail指向的位置不同(说明被别的线程已经修改), //enq()中处理了以上两种情况 enq(node); return node; }
如果没有被初始化,需要进行初始化一个头结点出来。但请注意, 初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点 。如果经历了初始化或者并发导致队列中有元素,则与之前的方法相同。其实,addWaiter就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数的头结点。
private Node enq(final Node node) { for (;;) { //获取当前的尾结点指向到t Node t = tail; //如果此时尾结点为空 if (t == null) { // Must initialize 必须要初始化一个头结点 //利用CAS初始化一个头部(调用无参构造,头肩点内部都为null) if (compareAndSetHead(new Node())) //将尾结点设置为头节点(此时就一个节点头结点和尾结点是一个) tail = head; } else { //如果尾结点不为null,将新建的节点node的前置节点设置为t node.prev = t; //通过CAS修改尾结点,将尾结点指向为新的node, //这个方法主要是对tailOffset和Expect进行比较,如果tailOffset的Node和Expect的Node地址是相同的, //那么设置Tail的值为Update的值。 if (compareAndSetTail(t, node)) { //修改成功,将之前的尾结点(现在的倒数第二个节点)的后置节点设置为新的尾结点(node) //返回倒数第二个节点 t.next = node; return t; } } } }
通过上面的addWaiter()方法我们可以知道,通过将一个新建的Node节点放入到队列里等候获取锁的资格,那么进入队列之后,怎么才能获取到锁呢?接下来我们来看看线程怎么获取锁。
总的来说,一个线程获取锁失败了,被放入等待队列,acquireQueued会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。
final boolean acquireQueued(final Node node, int arg) { // 标记是否成功拿到资源 boolean failed = true; try { //标记是否被中断过 boolean interrupted = false; //开始自旋 for (;;) { //获取当前节点的前置节点 final Node p = node.predecessor(); //如果前置节点是头节点,说明当前节点是第一个有效节点(头节点是虚节点),开始尝试获取锁 if (p == head && tryAcquire(arg)) { //获取到锁,头指针移动到当前node //setHead方法是把当前节点置为虚节点,但并没有修改waitStatus,因为它是一直需要用的数据。 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 说明是p不为头结点 或者 p为头节点且当前没有获取到锁(可能是非公平锁被抢占了), //这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
/**判断当前node是否要被阻塞**/ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取前置节点的阻塞状态 int ws = pred.waitStatus; //如果 Node.SIGNAL -1 状态,说明前置节点的状态为唤醒状态 if (ws == Node.SIGNAL) //可以直接park(直接阻塞) return true; //waitStatus>0是取消状态 如果前置节点是取消状态 if (ws > 0) { // 循环向前查找取消节点,把取消节点从队列中剔除 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 设置前置节点等待状态为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private void cancelAcquire(Node node) { // 将无效节点过滤 if (node == null) return; // 设置该节点不关联任何线程,也就是虚节点 node.thread = null; Node pred = node.prev; // 通过前驱节点,跳过取消状态的node while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 获取过滤后的前驱节点的后继节点 Node predNext = pred.next; // 把当前node的状态设置为CANCELLED node.waitStatus = Node.CANCELLED; // 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点 // 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; // 如果当前节点不是head的后继节点,1:判断当前节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否成功 // 如果1和2中有一个为true,再判断当前节点的线程是否为null // 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点 unparkSuccessor(node); } node.next = node; // help GC } }
以上从ReentrantLock的获取锁,上锁来对AQS的获取锁过程做了介绍,后续会继续将AQS释放锁的原理做介绍,未完待续...