转载

AQS系列一:源码分析非公平ReentrantLock

加 / 解锁史

早些时候(jdk 1.5之前),并发环境下做同步控制,你的选择不多,多半是使用 synchronized 关键字。不管是同步方法还是同步块,总之遇到这个关键字,未获取锁线程就会乖乖等候,直到已获取锁的线程释放掉锁。

而jdk 1.5推出 ReenntrantLock 之后,此工具一度很风靡,当时人们更喜欢用Lock而不是synchronized,主要是因为它用起来灵活吧。(本人到现在为止,用synchronized的场景还是Lock的时候多)直到后来,越来越多的文章,从性能、是否公平、实现原理各个方面对二者比较,大家才对他们有了更直观的认识。

本文旨在分析ReenntrantLock的主要实现逻辑,并初步窥探AQS结构。如果不犯懒的话,希望后续能将AQS做成系列,真正理解Doug Lea大神的这个经典实现。

ReenntrantLock使用

研究工具的原理之前,要先会使用工具。

tryLock()

public class ReentrantLockTest {

    Lock lock = new ReentrantLock();    //创建锁

    public void doSomething(){
        //### 1-尝试获取锁,成功
        if(lock.tryLock()){    
            System.out.println(String.format("%s线程,获取到锁了",Thread.currentThread().getName()));
            try {
                //模拟逻辑执行
                TimeUnit.MILLISECONDS.sleep(1100L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(String.format("%s线程,业务执行完毕",Thread.currentThread().getName()));
            lock.unlock();    //### 1.1-逻辑执行完,释放锁
        }
        
        //### 2-尝试获取锁,失败
        else {    
            System.out.println(String.format("%s线程,获取锁失败",Thread.currentThread().getName()));
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        ReentrantLockTest test = new ReentrantLockTest();
        int total = 3;
        while (total>0){
            Thread t = new Thread(()->{
                test.doSomething();
            },"T-"+total);
            t.start();
            total--;
            TimeUnit.MILLISECONDS.sleep(1000L);
        }
    }
}

tryLock() 方法会 尝试获取锁,如果获取不到,直接 return false (不会阻断);如果获取到锁, return true

上面的例子,执行结果为:

T-3线程,获取到锁了
T-2线程,获取锁失败
T-3线程,业务执行完毕
T-1线程,获取到锁了
T-1线程,业务执行完毕

lock()

修改下上例中的加锁方式:

Lock lock = new ReentrantLock();

public void doSomething2(){
    lock.lock();
    System.out.println(String.format("%s线程,获取到锁了",Thread.currentThread().getName()));
    try {
        TimeUnit.MILLISECONDS.sleep(1000L);    //模拟业务逻辑
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(String.format("%s线程,业务执行完毕",Thread.currentThread().getName()));
    lock.unlock();
}

public static void main(String[] args) throws InterruptedException {
    ReentrantLockTest test = new ReentrantLockTest();
    int total = 3;
    while (total>0){
        Thread t = new Thread(()->{
            test.doSomething2();
        },"T-"+total);
        t.start();
        total--;
    }
}

与tryLock()不通, lock() 方式尝试获取锁,如果获取不到会持续等待

执行结果会变为:

T-3线程,获取到锁了
T-3线程,业务执行完毕
T-2线程,获取到锁了
T-2线程,业务执行完毕
T-1线程,获取到锁了
T-1线程,业务执行完毕

ReenntrantLock分析

ReenntrantLock 加 / 解锁的使用方式就这些,而它是靠编码实现的。下图给出了ReenntrantLock类部分结构:

AQS系列一:源码分析非公平ReentrantLock

ReenntrantLock 默认 实现的是 非公平锁 (本文也只分析非公平实现)。

final Sync sync;

public ReentrantLock() {
    sync = new NonfairSync();    //成员变量sync,赋值成NonfairSync的对象
}

tryLock()实现

先从实现较简单的 tryLock() 研究:

## ReentrantLock类
public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

    ↓↓↓↓↓
    ↓↓↓↓↓

## ReentrantLock.Sync类
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();    // 1- 获取AQS类中的state状态值
    if (c == 0) {    
        // 2- 如果state是0(默认值),将state原子形修改成1
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);    // 2.1- 原子修改成功,标记AOS中的exclusiveOwnerThread为当前线程
            return true;
        }
    }
    // 3- 此时state不是1,当前线程 == AOS中的exclusiveOwnerThread,将state修改为1
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

tryLock()方法, 核心逻辑就是原子修改AQS中的 statevolatile + CAS (jdk9 VarHandle实现)。

具体一些:

实现过程中,只在 首次修改 state 值,即将其从0改成1的时候,采用了原子的 CAS 方式

之后 只判断当前线程和 owner线程 (AOS中的exclusiveOwnerThread) 是否一致 如果一致state++;不一致,直接 return false

unLock()实现

unLock()实现同样简单

## ReentrantLock类
public void unlock() {
    sync.release(1);
}

    ↓↓↓↓↓
    ↓↓↓↓↓
    
## ReentrantLock.Sync类
public final boolean release(int arg) {
    ...
    tryRelease(arg)    //尝试释放
    ...
}

    ↓↓↓↓↓
    ↓↓↓↓↓
    
## ReentrantLock.Sync类    
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;    // state--
    
    // 1-验证线程
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    
    boolean free = false;
    if (c == 0) {    //2-如果state==0时,将结果赋值为true,清空owner线程
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);    //state赋值
    return free;
}

如果 操作线程是 owner 线程 (首次tryLock()时会记录owner):

tryLock() 每次调用, state++unLock() 每次调用, state-- (state=0时,清空owner线程)。

Tip: 注释1处,如果当前线程非owner线程,会直接抛出异常!

lock()实现

对于 tryLock() 而言,它在实现上,完全没用到AQS的精华。既然叫Abstract Queued Synchronizer——抽象队列同步器, 队列同步 什么的才是重点。别急, lock() 方法会用到这些。

public void lock() {
    sync.acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();    //中断interrupt
}

对于默认的非公平锁实现, acquire(int arg) 完全可 替换 成如下写法:

public final void acquire(int arg) {
    ##### tryAcquire(arg) 改成了 tryLock(arg)
    if (!tryLock(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();    //线程 interrupt
}

如此替换后,逻辑就很好理解了:在用 tryLock() 获取锁失败的情况下,会调用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 显然也分成了两个方法 addWaiteracquireQueued

  • 先看 addWaiter(Node.EXCLUSIVE) 部分
private Node addWaiter(Node mode) {
    Node node = new Node(mode);    //创建node,创建的同时绑定线程

    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {    //循环2-将node节点和首次循环中初始化的队列关联
            node.setPrevRelaxed(oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();    //循环1-初始化同步队列
        }
    }
}

这里需关注 AQS.Node 类 的一些 关键属性 (已文字标明各属性用途):

## 表示Node节点的状态,有CANCELLED(待取消)、SIGNAL(待唤醒)、CONDITION或默认的0几个状态
volatile int waitStatus;
static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;

volatile Node prev;    //prev指向前节点
volatile Node next;    //next指向后节点

## 节点绑定线程
volatile Thread thread;

通过下图,可更清楚的看出 addWaiter方法 的执行过程(此时 线程T-3 在执行中):

AQS系列一:源码分析非公平ReentrantLock

结论1:

addWaiter`会创建队列,并返回尾节点,即图中的`Node2`
  • 再看 acquireQueued(final Node node, int arg) 方法
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();    //获取pre节点,就是Node1
            if (p == head && tryAcquire(arg)) {    //### 注释1-再次尝试获取锁
                setHead(node);    //获取到锁了,去掉Node1,Node2变成新的head节点
                p.next = null; // help GC
                return interrupted;
            }
            
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    }
    ...
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)    //2次循环,将waitStatus==Node.SIGNAL,renturn true
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);    //首次循环,将pre节点Node1的waitStatus修改成SIGNAL
    }
    return false;
}

这里依照上图,详细解释下:

acquireQueued 方法的 入参 前面提到了,就是 addWaiter方法 新增的尾节点 ,即入参 node = Node2,那么node.predecessor()自然是Node1了—— p = Node1。

注释1位置,先判断 p 是不是 头结点

  • 如果 p 是头节点 (上图中,p就是头结点), tryAcquire(arg) 会再次尝试获取锁。此时也有两种情况:

    • 线程T-3 已经 执行完 并释放了锁,那么当前 线程T-2 可以获取到锁;之后去掉当前头结点Node1,将Node2设置成头结点。
    • 线程T-3 未执行完 ,那么当前 线程T-2 无法获取锁,之后会执行shouldParkAfterFailedAcquire(Node pred, Node node)方法
  • p 不是头结点 ,同样会执行shouldParkAfterFailedAcquire(Node pred, Node node)方法

而由于 shouldParkAfterFailedAcquire(Node pred, Node node) 方法在循环中,可能会执行两次:

  • 首次循环 ,将 pre节点 Node1waitStatus 修改成 SIGNAL 注意,由于循环的原故,还会再次执行到 注释1 处,也就会再次尝试获取锁 ——上次线程T-3未结束,这次就有可能结束了);
  • 有幸进入二次循环 时, pre节点Node1waitStatus 已经是 SIGNAL ,直接 return true 。后面的parkAndCheckInterrupt()方法会将当前 线程T-2 阻塞

给出 线程T-2 未获取锁情况下的队列情况:

AQS系列一:源码分析非公平ReentrantLock

列出 线程T-1 也参与其中的完整队列图。可看到 尾节点之前的节点,绑定的线程都是 阻塞 状态 (park) ,而 waitStatus 都是 待唤醒 状态 (waitStatus = SIGNAL = -1):

AQS系列一:源码分析非公平ReentrantLock

总结以上内容,作为 结论2:

acquireQueued`方法,如果当前线程是第1个获取锁失败的线程(例子中“线程T-3”正在执行,“线程T-2”就是第一个获取锁失败的线程),会再尝试2次获取锁;
获取锁失败 或 当前线程非第1个获取锁失败的线程(例子中T-1就不是第一个获取锁失败的线程),将前置节点状态修改成待唤醒,并阻塞关联线程。

为了便于理解,画出整个 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 方法的逻辑图:

AQS系列一:源码分析非公平ReentrantLock

阻塞并非终点,还要再次看下 unlock() 时做了什么。

又见unlock()

## ReentrantLock类
public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {    //尝试释放,前面的已经分析过了
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);    // ### 重点看unparkSuccessor(h)方法,入参是`头节点`
        return true;
    }
    return false;
}

## AQS类
private void unparkSuccessor(Node node) {
    // 获取Node节点的waitStatus,如果<0(比如带唤醒SIGNA = -1),原子形还原成0
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

    // 获取头结点的下一个节点,如果是空(CANCELLED可能产生空),链表尾部遍历,取最前面一个waitStatus<0的节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    
    if (s != null)
        LockSupport.unpark(s.thread);    // 唤醒
}

先不考虑CANCELLED情况,那么 第二个节点对应的线程 会被唤醒。第二个节点是什么来路?前面已经分析了, 第1个获取锁失败的线程会和第二个节点绑定 (例子中的Node2,对应的线程自然是T-2,下图):

AQS系列一:源码分析非公平ReentrantLock

线程T-2 被唤醒后,会做什么?

final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) { //循环
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();    //### 线程T-2原本被阻塞于此
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

很显然,如果 线程T-2 被唤醒后,由于循环的原故,会再次进入如下逻辑:

final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
    setHead(node); //head易主
    p.next = null;
    return interrupted;
}

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

tryAcquire(arg) 再次尝试获取锁,显然此时 线程T-3 已经执行完了(不然也不会执行unlock),那么 线程T-2 很可能会获取到锁——

那么, head易主 ,队列发生如下变化:

AQS系列一:源码分析非公平ReentrantLock

加 / 解锁队列变化

最后给出 加 / 解锁过程中的队列变化 ,帮助理解。

  • 加锁过程

AQS系列一:源码分析非公平ReentrantLock

  • 解锁过程

AQS系列一:源码分析非公平ReentrantLock

后记

以上,终于分析完了 ReentrantLock的主要方法的实现。(有点细碎哈)

本系列的下一篇文章会继续探索 ReentrantLock 的公平锁实现 ,敬请期待!

原文  https://segmentfault.com/a/1190000020257597
正文到此结束
Loading...