如下图所示是队列同步器的基本结构,通过一个int类型的state对象来表示同步状态,对锁的竞争就是通过修改这个同步状态和对这个状态的值来进行判断实现,使用一个FIFO双向队列来管理竞争同步状态的线程,把这些线程封装成一个队列节点加入到队列中。
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }复制代码
以上是这个方法的定义,直接抛出一个异常,这个方法是需要自己去实现的,看一下ReentrantLock中的公平锁是怎么实现它的。 这个方法的实现要先获取同步状态,进行判断是否是预期的状态,然后使用CAS设置同步状态。
protected final boolean tryAcquire(int acquires) { //1. 获取当前线程 final Thread current = Thread.currentThread(); //2. 获取当前的同步状态,这个状态是通过volatile修饰的。 int c = getState(); if (c == 0) { //3. 判断状态是否是0,为0表示没有线程获取到同步状态(锁),CAS设置同步状态 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //4.已经有线程获取了同步状态,判断是不是当前线程 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }复制代码
这个方法用于独占式获取同步状态,如果当前线程获取同步状态成功,那么返回,否则就进入同步队列进行等待,这个方法会调用重写的tryAcquire(int)。
public final void acquire(int arg) { /*1. 先尝试获取锁,如果成功了直接返回*/ if (!tryAcquire(arg) && /*2.如果失败了,那么把当前的节点加入同步队列*/ acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /** * 这个方法将当前线程构建成一个队列的节点,然后先试用CAS操作快速的插入到队列中, * 如果成功了就返回,如果不成功,那么久执行enq方法试用死循环的方式CAS设置节点,直到设置成功为止 */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } /** * 一个死循环的方式进行节点的加入操作,如果队列为空的,那么会先初始化队列,然后CAS操作将当前节点加入到队列尾部 * 这里的这种方式保证了插入到队列的尾部是串行的,通过所有的线程都自旋然后CAS加入来保证。 */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } /** * 这个方法主要作用就是当前的这个节点表示的线程进行自旋,然后判断是否能够获取到同步状态 */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //1. 获取当前的节点的前驱节点,然后判断前驱节点是否是head节点,如果是head节点,那么尝试去获取同步状态 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { //2. 获取同步状态成功,当前节点设置为队列的头结点,所以队列的头节点是获取到同步状态的节点 setHead(node); p.next = null; // help GC failed = false; return interrupted; } //3.获取锁失败,判断是否需要阻塞当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }复制代码
以上的源码就是独占式获取同步状态,总结下来如下:
流程图如下:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //唤醒后继节点 unparkSuccessor(h); return true; } return false; }复制代码
上面一段代码是独占式释放锁的代码,非常简单,先尝试释放锁,如果释放成功那么判断队列是否是空和等待状态是否不为0,然后唤醒head的后继节点。
共享式获取锁的时候如果一个线程获取了同步状态,那么其他共享式获取锁的线程也能够获取到同步状态,但是独占式获取将会被拒绝。
public final void acquireShared(int arg) { //1. 尝试共享式获取锁,如果获取到就返回 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } protected int tryAcquireShared(int arg) { //直接抛出了异常,需要同步组件自己去实现 throw new UnsupportedOperationException(); }复制代码
以CountDownLatch为例分析tryAcquireShared的实现,CountDownLatch
/** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; //1. 初始化的时候指定有多少个线程可以同时获取到同步状态 Sync(int count) { setState(count); } int getCount() { return getState(); } /** * 2. 判断如果同步状态的值为0的时候,表示获取成功,如果不为0,表示已经有线程获取到同步状态,获取失败 **/ protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } /** * 尝试释放同步状态,如果同步状态为0表示已经都释放了,返回false,如果不为0那么CAS释放同步状态,对于CountDouwnLatch而言, * 只有当所有线程都执行完成之后才能继续,所以只有当前state=0的时候,才算释放成功。 **/ protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } /*获取锁,如果获取不到那么等待,直到获取到为止,当state=0的时候获取到锁*/ public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /** * Acquires in shared uninterruptible mode. * @param arg the acquire argument */ private void doAcquireShared(int arg) { //1. 获取失败的时候创建节点加入队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { //2.获取前驱节点 final Node p = node.predecessor(); if (p == head) { //3. 如果是头节点的后继节点,那么尝试获取共享同步状态,如果返回值>0,那么获取成功, int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }复制代码
共享式获取锁和独占式获取锁的区别其实就在于尝试共享式获取锁的时候返回值是否是>1的,关键就在于子类如何去实现了,其他实现和独占式获取锁差不多,都是节点自旋获取锁。
共享式释放锁
/*先尝试释放锁,如果成功那么唤醒后继节点*/ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }复制代码
总结如下:
共享式获取锁的关键要看子类同步状态获取成功失败的定义,但是要遵循一个基本原则就是只要同步状态是>1的,那么就表示获取状态是成功,通过这种方式来支持多个线程同时获取到锁。
参考资料:
《java并发编程的艺术》