早些时候(jdk 1.5之前),并发环境下做同步控制,你的选择不多,多半是使用 synchronized
关键字。不管是同步方法还是同步块,总之遇到这个关键字,未获取锁线程就会乖乖等候,直到已获取锁的线程释放掉锁。
而jdk 1.5推出 ReenntrantLock
之后,此工具一度很风靡,当时人们更喜欢用Lock而不是synchronized,主要是因为它用起来灵活吧。(本人到现在为止,用synchronized的场景还是Lock的时候多)直到后来,越来越多的文章,从性能、是否公平、实现原理各个方面对二者比较,大家才对他们有了更直观的认识。
本文旨在分析ReenntrantLock的主要实现逻辑,并初步窥探AQS结构。如果不犯懒的话,希望后续能将AQS做成系列,真正理解Doug Lea大神的这个经典实现。
研究工具的原理之前,要先会使用工具。
public class ReentrantLockTest { Lock lock = new ReentrantLock(); //创建锁 public void doSomething(){ //### 1-尝试获取锁,成功 if(lock.tryLock()){ System.out.println(String.format("%s线程,获取到锁了",Thread.currentThread().getName())); try { //模拟逻辑执行 TimeUnit.MILLISECONDS.sleep(1100L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("%s线程,业务执行完毕",Thread.currentThread().getName())); lock.unlock(); //### 1.1-逻辑执行完,释放锁 } //### 2-尝试获取锁,失败 else { System.out.println(String.format("%s线程,获取锁失败",Thread.currentThread().getName())); } } public static void main(String[] args) throws InterruptedException { ReentrantLockTest test = new ReentrantLockTest(); int total = 3; while (total>0){ Thread t = new Thread(()->{ test.doSomething(); },"T-"+total); t.start(); total--; TimeUnit.MILLISECONDS.sleep(1000L); } } }
tryLock()
方法会 尝试获取锁,如果获取不到,直接 return false
(不会阻断);如果获取到锁, return true
。
上面的例子,执行结果为:
T-3线程,获取到锁了 T-2线程,获取锁失败 T-3线程,业务执行完毕 T-1线程,获取到锁了 T-1线程,业务执行完毕
修改下上例中的加锁方式:
Lock lock = new ReentrantLock(); public void doSomething2(){ lock.lock(); System.out.println(String.format("%s线程,获取到锁了",Thread.currentThread().getName())); try { TimeUnit.MILLISECONDS.sleep(1000L); //模拟业务逻辑 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("%s线程,业务执行完毕",Thread.currentThread().getName())); lock.unlock(); } public static void main(String[] args) throws InterruptedException { ReentrantLockTest test = new ReentrantLockTest(); int total = 3; while (total>0){ Thread t = new Thread(()->{ test.doSomething2(); },"T-"+total); t.start(); total--; } }
与tryLock()不通, lock()
方式尝试获取锁,如果获取不到会持续等待 。
执行结果会变为:
T-3线程,获取到锁了 T-3线程,业务执行完毕 T-2线程,获取到锁了 T-2线程,业务执行完毕 T-1线程,获取到锁了 T-1线程,业务执行完毕
ReenntrantLock 加 / 解锁的使用方式就这些,而它是靠编码实现的。下图给出了ReenntrantLock类部分结构:
ReenntrantLock 默认 实现的是 非公平锁 (本文也只分析非公平实现)。
final Sync sync; public ReentrantLock() { sync = new NonfairSync(); //成员变量sync,赋值成NonfairSync的对象 }
先从实现较简单的 tryLock()
研究:
## ReentrantLock类 public boolean tryLock() { return sync.nonfairTryAcquire(1); } ↓↓↓↓↓ ↓↓↓↓↓ ## ReentrantLock.Sync类 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 1- 获取AQS类中的state状态值 if (c == 0) { // 2- 如果state是0(默认值),将state原子形修改成1 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); // 2.1- 原子修改成功,标记AOS中的exclusiveOwnerThread为当前线程 return true; } } // 3- 此时state不是1,当前线程 == AOS中的exclusiveOwnerThread,将state修改为1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
tryLock()方法, 核心逻辑就是原子修改AQS中的 state
值 , volatile
+ CAS
(jdk9 VarHandle实现)。
具体一些:
实现过程中,只在 首次修改 state
值,即将其从0改成1的时候,采用了原子的 CAS
方式 。
之后 只判断当前线程和 owner线程
(AOS中的exclusiveOwnerThread) 是否一致 , 如果一致state++;不一致,直接 return false
。
unLock()实现同样简单
## ReentrantLock类 public void unlock() { sync.release(1); } ↓↓↓↓↓ ↓↓↓↓↓ ## ReentrantLock.Sync类 public final boolean release(int arg) { ... tryRelease(arg) //尝试释放 ... } ↓↓↓↓↓ ↓↓↓↓↓ ## ReentrantLock.Sync类 protected final boolean tryRelease(int releases) { int c = getState() - releases; // state-- // 1-验证线程 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //2-如果state==0时,将结果赋值为true,清空owner线程 free = true; setExclusiveOwnerThread(null); } setState(c); //state赋值 return free; }
如果 操作线程是 owner
线程 (首次tryLock()时会记录owner):
tryLock()
每次调用, state++
; unLock()
每次调用, state--
(state=0时,清空owner线程)。
Tip: 注释1处,如果当前线程非owner线程,会直接抛出异常!
对于 tryLock() 而言,它在实现上,完全没用到AQS的精华。既然叫Abstract Queued Synchronizer——抽象队列同步器, 队列 、 同步 什么的才是重点。别急, lock()
方法会用到这些。
public void lock() { sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); //中断interrupt }
对于默认的非公平锁实现, acquire(int arg)
完全可 替换
成如下写法:
public final void acquire(int arg) { ##### tryAcquire(arg) 改成了 tryLock(arg) if (!tryLock(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); //线程 interrupt }
如此替换后,逻辑就很好理解了:在用 tryLock()
获取锁失败的情况下,会调用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
而 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 显然也分成了两个方法 addWaiter
和 acquireQueued
addWaiter(Node.EXCLUSIVE)
部分 : private Node addWaiter(Node mode) { Node node = new Node(mode); //创建node,创建的同时绑定线程 for (;;) { Node oldTail = tail; if (oldTail != null) { //循环2-将node节点和首次循环中初始化的队列关联 node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { initializeSyncQueue(); //循环1-初始化同步队列 } } }
这里需关注 AQS.Node 类 的一些 关键属性 (已文字标明各属性用途):
## 表示Node节点的状态,有CANCELLED(待取消)、SIGNAL(待唤醒)、CONDITION或默认的0几个状态 volatile int waitStatus; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; volatile Node prev; //prev指向前节点 volatile Node next; //next指向后节点 ## 节点绑定线程 volatile Thread thread;
通过下图,可更清楚的看出 addWaiter方法 的执行过程(此时 线程T-3
在执行中):
addWaiter`会创建队列,并返回尾节点,即图中的`Node2`
acquireQueued(final Node node, int arg)
方法 : final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) { final Node p = node.predecessor(); //获取pre节点,就是Node1 if (p == head && tryAcquire(arg)) { //### 注释1-再次尝试获取锁 setHead(node); //获取到锁了,去掉Node1,Node2变成新的head节点 p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } ... } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //2次循环,将waitStatus==Node.SIGNAL,renturn true return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { pred.compareAndSetWaitStatus(ws, Node.SIGNAL); //首次循环,将pre节点Node1的waitStatus修改成SIGNAL } return false; }
这里依照上图,详细解释下:
acquireQueued
方法的 入参 前面提到了,就是 addWaiter方法 新增的尾节点 ,即入参 node
= Node2,那么node.predecessor()自然是Node1了—— p
= Node1。
注释1位置,先判断 p
是不是 头结点 :
如果 p
是头节点 (上图中,p就是头结点), tryAcquire(arg)
会再次尝试获取锁。此时也有两种情况:
线程T-3
已经 执行完 并释放了锁,那么当前 线程T-2
可以获取到锁;之后去掉当前头结点Node1,将Node2设置成头结点。 线程T-3
未执行完 ,那么当前 线程T-2
无法获取锁,之后会执行shouldParkAfterFailedAcquire(Node pred, Node node)方法 p
不是头结点 ,同样会执行shouldParkAfterFailedAcquire(Node pred, Node node)方法 而由于 shouldParkAfterFailedAcquire(Node pred, Node node) 方法在循环中,可能会执行两次:
waitStatus
修改成 SIGNAL
( 注意,由于循环的原故,还会再次执行到 注释1
处,也就会再次尝试获取锁 ——上次线程T-3未结束,这次就有可能结束了); waitStatus
已经是 SIGNAL
,直接 return true
。后面的parkAndCheckInterrupt()方法会将当前 线程T-2
阻塞 。 给出 线程T-2
未获取锁情况下的队列情况:
列出 线程T-1
也参与其中的完整队列图。可看到 尾节点之前的节点,绑定的线程都是 阻塞
状态 (park) ,而 waitStatus
都是 待唤醒
状态 (waitStatus = SIGNAL = -1):
总结以上内容,作为 结论2:
acquireQueued`方法,如果当前线程是第1个获取锁失败的线程(例子中“线程T-3”正在执行,“线程T-2”就是第一个获取锁失败的线程),会再尝试2次获取锁; 获取锁失败 或 当前线程非第1个获取锁失败的线程(例子中T-1就不是第一个获取锁失败的线程),将前置节点状态修改成待唤醒,并阻塞关联线程。
为了便于理解,画出整个 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
方法的逻辑图:
阻塞并非终点,还要再次看下 unlock()
时做了什么。
## ReentrantLock类 public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { //尝试释放,前面的已经分析过了 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // ### 重点看unparkSuccessor(h)方法,入参是`头节点` return true; } return false; } ## AQS类 private void unparkSuccessor(Node node) { // 获取Node节点的waitStatus,如果<0(比如带唤醒SIGNA = -1),原子形还原成0 int ws = node.waitStatus; if (ws < 0) node.compareAndSetWaitStatus(ws, 0); // 获取头结点的下一个节点,如果是空(CANCELLED可能产生空),链表尾部遍历,取最前面一个waitStatus<0的节点 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node p = tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) s = p; } if (s != null) LockSupport.unpark(s.thread); // 唤醒 }
先不考虑CANCELLED情况,那么 第二个节点对应的线程 会被唤醒。第二个节点是什么来路?前面已经分析了, 第1个获取锁失败的线程会和第二个节点绑定 (例子中的Node2,对应的线程自然是T-2,下图):
线程T-2
被唤醒后,会做什么?
final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) { //循环 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); //### 线程T-2原本被阻塞于此 } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; } }
很显然,如果 线程T-2
被唤醒后,由于循环的原故,会再次进入如下逻辑:
final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); //head易主 p.next = null; return interrupted; } private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
tryAcquire(arg)
再次尝试获取锁,显然此时 线程T-3
已经执行完了(不然也不会执行unlock),那么 线程T-2
很可能会获取到锁——
那么, head易主 ,队列发生如下变化:
最后给出 加 / 解锁过程中的队列变化 ,帮助理解。
以上,终于分析完了 ReentrantLock的主要方法的实现。(有点细碎哈)
本系列的下一篇文章会继续探索 ReentrantLock
的公平锁实现 ,敬请期待!