写这篇文章之前,还是先安利一本书:《java并发编程的艺术》。这本书对锁的实现的很多细节都解释的还是很清楚的,加上自己配合源码进行理解,读懂ReentrantLock这个类的实现应该不是那么困难。本文只对 独占模式 进行分析。
直接步入正题,先贴一段代码看看如何使用ReentrantLock:
public class ReentrantLockTest { public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(true); //1 lock.lock(); //2 try { //do something } finally { lock.unlock(); //3 } } } 复制代码
上面代码的步骤1是调用ReentrantLock构造方法进行初始化,这里ReentrantLock给我们提供了两种锁的实现,一个是公平锁,一个是非公平锁。这两种锁顾名思义,一个排队干活,一个抢着干~~
//默认构造函数,得到的是非公平锁的实现 public ReentrantLock() { sync = new NonfairSync(); } //传入true得到公平锁的实现,传入false则得到公平锁的实现 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } 复制代码
ReentrantLock锁的使用的入口在lock方法,下面咱们针对 公平锁 lock方法的实现进行分析一波(能看懂这个相信对非公平锁的lock的实现的理解也就不会有什么难度了)。
这里我把所有的方法都放在一起,方便大家阅读:
//这里在并发情况下会有竞争 final void lock() { acquire(1); } //来至于父类AQS中 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //公平锁自身提供的实现方法,来保证锁的获取是按照FIFO原则.也就是队列模型,先入先出。 protected final boolean tryAcquire(int acquires) { //获取当前线程 final Thread current = Thread.currentThread(); //拿到锁标记的状态值,为0则代表这把锁没人占用 int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //将干活的人的身份标记一下 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //这里是重入锁的关键代码,只要是获取锁的线程再次去拿这把锁,则可以直接获取成功, //并将state的值+1后重新设置,供后面释放锁的时候进行多次释放使用。 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); //这里有个优雅的小细节:咱们发现设置状态时并没有使用compareAndSetState这种方法, //而是直接设置。那是因为在这种条件下不会有竞争,只可能是获取锁的线程才能去改变这个值。 setState(nextc); return true; } return false; } //用来判断是否在它之前已经有人排在队列当中了,如果有,则返回true public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; //这里返回时的判断条件可能有点难理解。假设当前是A线程。 //1.第一种情况发生在有一个B线程进度比A快,已经准备开始排队了。可以看下面addWaiter方法 //的调用,在进行compareAndSetTail交换后,有可能还没来得及将pred.next指向这个新节点node, //这个时候说明已经有人在A线程前面去排队拿锁了。 //2.第二种情况简单明了。A线程不是排在队列的第一个的,也证明了有人排在他前面了。 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } //用来添加新的节点到队列的尾部。 private Node addWaiter(Node mode) { //根据传进来的参数mode=Node.EXCLUSIVE,表示将要构造一个独占锁。 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; //tail为空的情况下直接调用enq方法去进行head和tail的初始化。 if (pred != null) { //tail不为空的情况下,将新构造节点的前驱设置为原尾部节点。 node.prev = pred; //使用CAS进行交换,如果成功,则将原尾部节点的后继节点设置为新节点,做双向列表关联; //(这里要注意一点,交换成功的同时有其他线程读取该列表,有可能读取不到新节点。例如A线程 //执行完下方步骤1后,还未执行步骤2,遍历的时候将会获取不到新节点,这也是 //hasQueuedPredecessors方法中的第一种情况) //如果不成功,则代表有竞争,有其他线程修改了尾部,则去调用下方enq方法 if (compareAndSetTail(pred, node)) { //1 pred.next = node; //2 return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize //初始化head和tail,初始化完成后,会继续执行外面的死循环,进行compareAndSetTail将 //新节点设置到尾部,和上述执行流程一样,这里就不详述了。 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } //再进行一次尝试和进入堵塞 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //获取当前node的前驱 final Node p = node.predecessor(); //如果前驱是head的话就再进行一次尝试,这种设计会节约很多的资源。 //这里尝试成功后该线程就不会有后续的park和unpark之说了。 if (p == head && tryAcquire(arg)) { //如果获取成功就将head设置成当前node,并将存储的thread和prev都清空 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //来判断进行尝试获取失败后是否进行park private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //node的waitStatus初始化都是0 int ws = pred.waitStatus; if (ws == Node.SIGNAL) //第一次进来肯定不是-1状态的,需要compareAndSetWaitStatus方法进行设置后才会是-1 return true; if (ws > 0) { //这里的作用是用来剔除被cancel后的节点,只要是cancel后的节点waitStatus 都会被标记成1。 //用该状态来过滤掉这些节点。 //由于节点的唤醒是由它的prev节点来进行唤醒的,我们必须要保证它的prev是处于活着的状态 //所以这里一直遍历往上找,总会找到一个正常的prev来帮助其unpark。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //设置prev为-1状态,(该状态下能够唤醒它的下一个去干活)。 //这里结束后会跳到acquireQueued的死循环再次循环一次。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //要执行这个方法的前提是shouldParkAfterFailedAcquire这个方法必须返回true private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } //阻塞线程的方法 public static void park(Object blocker) { Thread t = Thread.currentThread(); // 设置Blocker,设置为当前lock。 setBlocker(t, blocker); // 等待获取许可,这里会进行堵塞,直到有人帮忙调用该线程的unpark方法才会获取到许可, //并继续走下面的流程。 UNSAFE.park(false, 0L); // 设置Blocker,将该线程的parkBlocker字段设置为null,这个是在线程被唤醒后执行的。 setBlocker(t, null); } 复制代码
//调用该方法进行解锁 public void unlock() { sync.release(1); } //改变state的值并唤醒队列中的下一个线程来干活 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; //这里会判断头部是不是null,并看其waitStatus 状态是否有唤醒它的后继节点的资格。 //这里的头部其实也就是当前线程所代表的节点。 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } //尝试着释放锁 protected final boolean tryRelease(int releases) { //将锁标记state的值-1 int c = getState() - releases; //如果干活的人和自己的身份不一致,则抛异常出去 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //这里判断状态-1后是不是等于0。 //如果不是,则代表重入了很多次,锁暂时不释放。 //如果是,则将free置为true,释放锁,将身份标记置为null。 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } //去唤醒后继节点中的thread来干活 private void unparkSuccessor(Node node) { int ws = node.waitStatus; //如果head中的waitStatus<0,则置为0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //这里会检查head的下一个节点是不是null以及是否是cancel状态 Node s = node.next; if (s == null || s.waitStatus > 0) { //如果next是cancel状态,则将s置为空,并重队列尾部进行往前遍历,直到找到最后 //一个waitStatus <=0的node来做为next节点去唤醒 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //去唤醒s指向的next节点,调用这里可以让UNSAFE.park(false, 0L);处的线程获取到许可。 //到这里解锁的功能就执行完毕了~ LockSupport.unpark(s.thread); } 复制代码