AbstractQueuedSynchronizer
抽象同步队列简称 AQS
,它是实现同步器的基础组件,并发包中锁的底层就是使用AQS实现的。AQS是一个 FIFO的双向队列
,其内部通过节点 head
和 tail
记录队首和队尾元素,队列元素类型为 Node
。AQS采用 模板方法模式
,父类抽象出通用模板, 将方法延迟到子类加载
。
当一个线程调用 acquire(int arg)
获取独占资源时,会首先使用 tryAcquire
方法尝试获取资源,具体是设置 state
值,成功则直接返回,失败则将当前线程封装为类型为 Note.EXCLUSIVE的Node
节点后插入到AQS阻塞队列的尾部,并调用 LockSupport.park(this)
方法挂起自己。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 复制代码
当一个线程 release(int arg)
方法时会尝试使用 tryRelease
操作释放资源,这里是设置状态变量 state
的值,然后调用 LockSupport.unpark(thread)
方法激活AQS队列里面被阻塞的一个线程(thread)。被激活的线程则使用 tryAcquire
尝试,看当前状态变量state的值是否能满足自己的需要,满足则该线程被激活,然后继续执行,否则还是会被放入AQS队列并被挂起。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 复制代码
当线程调用 acquireShared(int arg)
获取共享资源时,会首先使用 tryAcquireShared
尝试获取资源,具体是设置状态变量 state
的值,成功则直接返回,失败则将当前线程封装为类型为 Note.SHARED
的Node节点后插入到AQS阻塞队列的尾部,并使用LockSupport.park(this)方法挂起自己。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } 复制代码
当一个线程调用 releaseShared(int arg)
时会尝试使用 tryReleaseShared
操作释放资源,这里是设置状态变量 state
的值,然后使用 LockSupport.unpark(thread)
激活AQS队列里面被阻塞的一个线程(thread)。被激活的线程则使用tryReleaseShare查看当前状态变量state的值是否能满足自己的需要,满足则该线程被激活,然后继续向下执行,否则还是会被放入AQS队列并被挂起。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } 复制代码
入队时会先判断,是否 需要初始化
,如果队尾指针指向null,则进行初始化,创建哨兵节点,首尾都指向哨兵节点。如果已经存在了,则将node的前置节点指向t,然后tail节点指向新node元素,前尾结点t的后置节点指向node,最后返回。
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } 复制代码
notify
和 wait
是配合 synchronized
内置锁实现线程间同步的基础设施一样,条件变量的 signal
和 await
方法也是用来配合锁(使用AQS实现的锁)实现线程间同步的基础设施。如我们经常用到的 LinkedBlockingQueue
,采用了 ReentrantLock
和条件队列,当进行写入的时候如果队列没有满,则唤醒在 notFull条件队列
中等待的线程继续写入;当进行取出的时候如果队列中任有数据可以消费,则唤醒 notEmpty条件队列
中等待的线程继续消费。
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition(); 复制代码
当线程调用条件变量 await()
方法时(必须先调用 lock()
方法获取锁),在内部会构造一个类型为 Node.CONDITION
的 node
节点,然后将该节点插入条件队列末尾,之后当前线程会释放获取的锁(也就是会操作锁对应的state变量的值),并被阻塞挂起。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } 复制代码
AQS
是并发包下锁的最底层实现,典型的 ReentrantLock
、 ReentrantReadWriteLock
以及 CountDownLatch
、 CyclicBarrier
等,都是基于AQS实现的, AQS
核心主要包括 state
、同步队列以及 模板方法模式
对操作队列的算法进行了封装,留下了独占/共享锁的获取/释放由子类去实现,基本思路是子类通过state来完成获取/释放锁的操作。以前觉得并发、锁这些东西很深奥,不可理解,AQS离我也很遥远,但是有幸学习了AQS之后再反观锁的源码,醍醐灌顶,因为写blog文笔较烂,偏向于源码,所以有什么不清楚的可以评论一起讨论学习。关于并发包下的 Lock和synchronized的区别
后面专门梳理一篇blog进行分析和讲解。