转载

JAVA并发编程关于锁的那些事,ReentantLock的底层设计深入浅出

java.util.concurrent是在并发编程中比较常用的工具类,里面包含很多用来在并发场景中使用的组件。比如 线程池、阻塞队列、计时器、同步器、并发集合 等等。

二、介绍Lock

Lock最为重要的特性就是解决并发程序的安全性问题。在JUC大部分组件都使用了Lock,所以了解和使用Lock显得尤为重要。Lock在JUC中本质上是以一个接口的形势表现的。

JAVA并发编程关于锁的那些事,ReentantLock的底层设计深入浅出

我们可以从上面的图中可以看出关于锁有很多不同的实现类。下面来简单介绍一翻吧。

2.1 ReentrantLock(重入锁)

ReentrantLock实现了Lock接口,表示重入锁。是线程在获得锁之后,再次获取锁不需要阻塞,而是直接关联一次计数器增加重入次数。 后面我们重点分析ReentrantLock的原理。

2.2 ReentrantReadWriteLock(重入读写锁)

ReentrantReadWriteLock实现了ReadWriteLock接口,其中有两把锁,一个 ReadLock ,一个 WriteLock ,它们分别实现了 Lock 接口。 适合读多写少的场景。 基本原则:

  • 读和读不互斥;
  • 读和写互斥;
  • 写和写互斥。

2.3 StampedLock(改进版读写锁)

StampedLock是JDK1.8引进的新的锁机制,它是读写锁的一个改进版。一种乐观的读策略,使得乐观锁完全不阻塞写线程。

三、ReentrantLock设计

说到重入锁ReentrantLock,就是再次获取锁的同时,只是对重入次数进行计数,而不需要阻塞来获取锁。先来看一个案例代码,这样容易理解重入锁的概念。

我们在测试代码中调用test()方法获得了当前对象的锁,然后在这个方法中去调用test1()方法,test2()中也存在一个实例锁,这个时候当前线程无法获取test1()中的对象锁而阻塞, 这样就会产生死锁。R eentrantLock重入锁的目的就是为了避免线程产生死锁。

public class ReentrantLockDemo {

    public synchronized void test() {
        System.out.println("Begin test...");
        test1();
    }

    public void test1() {
        System.out.println("Begin test1...");
        synchronized (this) {

        }
    }

    public static void main(String[] args) {
        ReentrantLockDemo reentrantLockDemo = new ReentrantLockDemo();
        new Thread(reentrantLockDemo::test).start();
    }

}
复制代码

3.1 ReentrantLock重入锁使用案例

public class ReentrantLockDemo {

    private static int count = 0;

    static Lock lock = new ReentrantLock(true);

    public static void incr() {
        //线程A获取锁,计数state = 1
        lock.lock();
        try {
            //退出线程 中断的过程往下传递.  true
            // sleep/ join/ wait
            //while()
            // ...
            Thread.sleep(1);
            count++;
//            decr();
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            //线程A释放锁,state=1-1=0
            lock.unlock();
        }
    }


    public static void decr() {
        //线程A再次获取锁,计数加1,state = 2
        lock.lock();
        try{
            count--;
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {

        Thread t1 = new Thread(()->{
           ReentrantLockDemo.incr();
        });
        t1.start();
        t1.interrupt();//线程中断


        for(int i = 0 ; i < 1000; i++) {
            new Thread(()->{
                ReentrantLockDemo.incr();
            }).start();
        }
        Thread.sleep(3000);
        System.out.println("result = " + count);
    }
}
复制代码

3.2 ReentrantReadWriteLock重入读写锁案例

读写锁维护了一个读锁,一个写锁。一般情况下读写锁比排它锁的性能要好一些,因为大多数的场景是读多写少的。

public class ReentrantReadWriteLockDemo {

    static Map<String, Object> map = new HashMap<>();

    static ReentrantReadWriteLock rrwl = new ReentrantReadWriteLock();
    static Lock read = rrwl.readLock();
    static Lock write = rrwl.writeLock();

    public static Object get(String key) {
        System.out.println("Begin reading data...");
        read.lock();
        try{
            return map.get(key);
        }finally {
            read.unlock();
        }
    }

    public static Object put(String key, Object obj) {
        System.out.println("Begin writing data...");
        write.lock();
        try{
            return map.put(key, obj);
        }finally {
            write.unlock();
        }
    }


}
复制代码
  • 读锁与读锁可以共享;
  • 读锁与写锁不可以共享(排他);
  • 写锁与写锁不可以共享(排他。

3.3 ReentrantLock的实现原理

我们在Synchronized中分析了 偏向锁、轻量级锁、重量级锁 。它们是基于 乐观锁 以及 自旋锁 来优化synchronized加锁的开销,在 重量级锁阶段 是通过线程的阻塞以及唤醒来达到线程竞争和同步的目的。

那么在ReentrantLock也一定存在这样的问题,那么它是怎么去解决的呢?这里我们需要引入AQS(AbstractQueueSynchronizer)。

3.3.1 什么是AQS

在Lock中,AQS是一个同步队列,它是一个同步工具,也是Lock用来实现线程同步的核心组件。

JAVA并发编程关于锁的那些事,ReentantLock的底层设计深入浅出

3.3.2 AQS的独占锁和共享锁

  • 独占锁:每次只有一个线程持有锁, ReentrantLock 的独占锁方式;
  • 共享锁:允许多个线程同时获得锁,并访问共享资源, ReentrantReadWriteLock 的共享锁方式。

3.3.3 AQS的内部实现

AQS内部维护的是一个FIFO的双向链表,这种数据结构的特点就是有 两个指针 ,分别指向直接的后继节点next和直接的前驱节点prev。当线程抢占锁失败后,会封装成一个 Node 直接放入到AQS阻塞队列中。

JAVA并发编程关于锁的那些事,ReentantLock的底层设计深入浅出

3.3.4 AQS中的Node

先上AQS中的源码:

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;//前驱节点

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;//后继节点

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;//当前线程

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;//存储在condition队列中的后继节点

        /**
         * Returns true if node is waiting in shared mode.
         */
        //是否为共享锁
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }
        //将线程组装成一个Node,添加到队列中
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        //在condition队列中进行使用
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
复制代码

四、ReentrantLock的源码分析

4.1 画出ReentrantLock时序图

JAVA并发编程关于锁的那些事,ReentantLock的底层设计深入浅出

4.2 ReentrantLock.lock()

源码如下:

public void lock() {
    sync.lock();
}
复制代码
JAVA并发编程关于锁的那些事,ReentantLock的底层设计深入浅出

根据源码可以看到具体的实现,分别是FairSync(公平)和NonFairSync(非公平)两个类。

  • FairSync:所有线程严格按照FIFO规则获取锁;
  • NonFairSync:可以存在抢占锁的功能,不管队列上是否存在其他线程等待,新线程都有机会抢占锁。

4.3 NonFairSync.lock()

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        //对于非公平锁,一开始就CAS抢占一下
        //如果CAS成功了,就表示获得了锁
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else//如果CAS失败了,调用acquire()方法走竞争逻辑
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
复制代码

4.3.1 CAS的实现原理

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
复制代码

CAS 就是 Unsafe 类中提供的一个原子操作。

JAVA并发编程关于锁的那些事,ReentantLock的底层设计深入浅出
  • var1:需要改变的对象;
  • var2:偏移量(headOffset的值);
  • var4:期待的值;
  • var5:更新后的值。

整个方法更新成功返回true,失败则返回false。

state是AQS中的一个属性,对于重入锁(ReentrantLock)而言,它表示一个同步状态。有两层含义:

  • 当state=0时,表示无锁状态;
  • 当state>0时,表示线程获得了锁,state+1,重入多少次数,state会递增;而当锁释放的时候,state次数递减,直到state=0其它线程才有资格抢占锁

接下来我们来看unsafe.cpp文件中最终执行的源码方法吧。

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  //将Java对象解析成JVM的oop
  oop p = JNIHandles::resolve(obj);
  //根据对象p和地址偏移量找到地址
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  //基于 cas 比较并替换, x 表示需要更新的值,addr 表示 state 在内存中的地址,e 表示预期值
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
复制代码

4.3.2 Unsafe类

属于sun.misc包,不属于Java标准。但是很多 Java 的基础类库,包 括一些被广泛使用的高性能开发库都是基于 Unsafe 类开发的,比如 Netty、 Hadoop、Kafka 等;

Unsafe 可认为是 Java 中留下的后门,提供了一些低层次操作,如直接内存访问、 线程的挂起和恢复、CAS、线程同步、内存屏障等。

4.4 AQS.acquire()

从下面源码分析来看,如果CAS未能操作成功,说明state已经不等于0了,此时需要执行acquire(1)方法。

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
复制代码

接着看acquire(1)方法的源码吧。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
复制代码
  • 尝试使用tryAcquire(arg)获得独占锁,如果成功返回true,失败返回false;
  • 如果tryAcquire失败,则通过addWaiter方法将当前线程封装成Node对象加入到AQS队列尾部;
  • acquireQueued,将Node作为参数,通过自旋的方式获得锁,下面是对于的源码。
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {//自旋方式获得锁
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
复制代码

4.4.1 NonfairSync.tryAcquire()

这个方法的作用是尝试获取锁,如果成功返回 true,不成功返回 false。

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
复制代码

下面来看此方法的具体实现:

  • 获得当前线程,判断当前锁的状态;
  • 如果state=0表示无锁状态,通过CAS更新state状态的值;
  • 如果当前线程属于重入,则增加重入次数
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
复制代码

4.5 AQS.addWaiter()

当tryAcquire()获取锁失败时,则会调用此方法来将当前线程封装成Node对象加入到AQS队列尾部。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    //tail表示AQS队列的尾部,默认为null
    Node pred = tail;
    if (pred != null) {
        //当前线程的prev执行tail
        node.prev = pred;
        //通过CAS把node加入到队列中,并设置为tail
        if (compareAndSetTail(pred, node)) {
            //设置成功后,把tail节点的next指向当前node
            pred.next = node;
            return node;
        }
    }
    //tail为null时,把node加入到同步队列
    enq(node);
    return node;
}
复制代码

enq(node)方法通过自旋的方式,把当前节点node加入到同步队列中去,下面看一下enq源码吧:

private Node enq(final Node node) {
    for (;;) {
        //将新的节点prev指向tail
        Node t = tail;
        if (t == null) { // Must initialize
            //通过CAS将tail设置为新的节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                //将原来的tail的next节点指向新的节点
                t.next = node;
                return t;
            }
        }
    }
}
复制代码
JAVA并发编程关于锁的那些事,ReentantLock的底层设计深入浅出

4.6 AQS.acquireQueued()

通过 addWaiter 方法把线程添加到链表后,会接着把 Node 作为参数传递给 acquireQueued 方法,去竞争锁。

JAVA并发编程关于锁的那些事,ReentantLock的底层设计深入浅出
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        //线程中断标记
        boolean interrupted = false;
        for (;;) {
            //获得当前节点的prev节点
            final Node p = node.predecessor();
            //如果是head节点,说明有资格去抢占锁
            if (p == head && tryAcquire(arg)) {
                //获取锁成功,线程A已经释放了锁,然后设置head为线程B获得执行权限
                setHead(node);
                //把原来的head节点从链表中移除,弱引用
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //线程A可能还没释放锁,使得线程B在执行tryAcquire时返回false
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //当前线程在等待过程中有没有中断
                interrupted = true;
        }
    } finally {
        //取消锁的操作
        if (failed)
            cancelAcquire(node);
    }
}
复制代码

4.6.1 shouldParkAfterFailedAcquire()

线程A的锁可能还没释放,那么此时线程B来抢占锁肯定失败,就会调用此方法。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //前置节点
    int ws = pred.waitStatus;
    //如果前置节点为 SIGNAL,意味着只需要等待其他前置节点的线程被释放
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        //返回true,可以放心挂起了
        return true;
    //ws 大于 0,意味着 prev 节点取消了排队,直接移除这个节点就行
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            //相当于: pred=pred.prev;node.prev=pred;
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);//这里采用循环,从双向列表中移除 CANCELLED 的节点
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        //利用 cas 设置 prev 节点的状态为 SIGNAL(-1)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
复制代码

Node的状态有5种,默认状态是0,以下是其它四种状态:

//在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该 Node 的结点, 其结点的 waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化
static final int CANCELLED =  1;
//只要前置节点释放锁,就会通知标识为 SIGNAL 状态的后续节点的线程
static final int SIGNAL    = -1;
//表示该线程在condition队列中阻塞
static final int CONDITION = -2;
//共享模式下,PROPAGATE 状态的线程处于可运行状态
static final int PROPAGATE = -3;
复制代码

4.6.2 parkAndCheckInterrupt()

使用LockSupport.park(this)挂起当前线程为WAITING状态

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
复制代码

Thread.interrupted,返回当前线程是否被其他线程触发过中断请求,也就是 thread.interrupt(); 如果有触发过中断请求,那么这个方法会返回当前的中断标识 true,并且对中断标识进行复位标识已经响应过了中断请求。如果返回 true,意味 着在 acquire 方法中会执行 selfInterrupt()。

4.6.3 selfInterrupt()

当前线程在acquireQueued中被中断过,则需要产生一个中断请求,原因是线程在调用acquireQueued方法的时候不会响应中断请求。

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
复制代码

4.6.4 LockSupport

从Java6开始引用的一个提供了基本的线程同步原语的类,LockSupport本质还是调用了Unsafe中的方法:

JAVA并发编程关于锁的那些事,ReentantLock的底层设计深入浅出
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
  UnsafeWrapper("Unsafe_Unpark");
  Parker* p = NULL;
  if (jthread != NULL) {
    oop java_thread = JNIHandles::resolve_non_null(jthread);
    if (java_thread != NULL) {
      jlong lp = java_lang_Thread::park_event(java_thread);
      if (lp != 0) {
        // This cast is OK even though the jlong might have been read
        // non-atomically on 32bit systems, since there, one word will
        // always be zero anyway and the value set is always the same
        p = (Parker*)addr_from_java(lp);
      } else {
        // Grab lock if apparently null or using older version of library
        MutexLocker mu(Threads_lock);
        java_thread = JNIHandles::resolve_non_null(jthread);
        if (java_thread != NULL) {
          JavaThread* thr = java_lang_Thread::thread(java_thread);
          if (thr != NULL) {
            p = thr->parker();
            if (p != NULL) { // Bind to Java thread for next time.
              java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
            }
          }
        }
      }
    }
  }
  if (p != NULL) {
#ifndef USDT2
    HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
#else /* USDT2 */
    HOTSPOT_THREAD_UNPARK(
                          (uintptr_t) p);
#endif /* USDT2 */
    p->unpark();
  }
UNSAFE_END
复制代码
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
  UnsafeWrapper("Unsafe_Park");
  EventThreadPark event;
#ifndef USDT2
  HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
#else /* USDT2 */
   HOTSPOT_THREAD_PARK_BEGIN(
                             (uintptr_t) thread->parker(), (int) isAbsolute, time);
#endif /* USDT2 */
  JavaThreadParkedState jtps(thread, time != 0);
  thread->parker()->park(isAbsolute != 0, time);
#ifndef USDT2
  HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
#else /* USDT2 */
  HOTSPOT_THREAD_PARK_END(
                          (uintptr_t) thread->parker());
#endif /* USDT2 */
  if (event.should_commit()) {
    oop obj = thread->current_park_blocker();
    event.set_klass((obj != NULL) ? obj->klass() : NULL);
    event.set_timeout(time);
    event.set_address((obj != NULL) ? (TYPE_ADDRESS) cast_from_oop<uintptr_t>(obj) : 0);
    event.commit();
  }
UNSAFE_END
复制代码

4.7 ReentrantLock.unlock()

JAVA并发编程关于锁的那些事,ReentantLock的底层设计深入浅出

在unlock()方法中,会调用release()方法来释放锁:

public void unlock() {
    sync.release(1);
}
复制代码
public final boolean release(int arg) {
    //释放锁成功
    if (tryRelease(arg)) {
        //得到AQS队列中的head节点
        Node h = head;
        //如果head不为空并且状态不等于0,调用unpark唤醒后续节点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
复制代码

4.7.1 tryRelease()

protected final boolean tryRelease(int releases) {
    //state状态减掉传入的参数1
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //如果结果为0,将排它锁的Owner设置为null
    //解锁的时候减掉 1,同一个锁,在可以重入后,可能会被叠加为 2、3、4 这些值,只有 unlock()的次数与 lock()的次数对应才会将 Owner 线程设置为空,而且也只有这种情况下才会返回 true
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
复制代码

4.7.2 unparkSuccessor()

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        //获得head节点的状态
        int ws = node.waitStatus;
        if (ws < 0)
            //设置head节点的状态为0
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        //得到head节点的下一个节点
        Node s = node.next;
        //如果下一个节点为 null 或者 status>0 表示 cancelled 状态
        if (s == null || s.waitStatus > 0) {
            s = null;
            //通过从尾部节点开始扫描,找到距离 head 最近的一个waitStatus<=0 的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //next 节点不为空,直接唤醒这个线程即可
        if (s != null)
            LockSupport.unpark(s.thread);
    }
复制代码

4.7.3 为什么释放锁的时候是从tail节点开始扫描的?

我们在加锁的enq()方法中,在 cas 操作之后,t.next=node 操作之前。 存在其他线程调用 unlock 方法从 head开始往后遍历,由于 t.next=node 还没执行意味着链表的关系还没有建立完整。就会导致遍历到 t 节点的时候被中断。所以从后往前遍历,一定不会存在这个问题。

JAVA并发编程关于锁的那些事,ReentantLock的底层设计深入浅出

4.8 原本挂起的线程如何执行呢?

通过ReentrantLock.unlock()将原本挂起的线程换唤醒后继续执行,原来被挂起的线程是在 acquireQueued () 方法中,所以被唤醒以后继续从这个方法开始执行.

JAVA并发编程关于锁的那些事,ReentantLock的底层设计深入浅出

五、公平锁与非公平锁的区别

锁的公平性是相对于获取锁的顺序而言的,如果是一个公平锁,那么锁的获取顺序 就应该符合请求的绝对时间顺序,也就是 FIFO 。 只要CAS设置同步状态成功,则表示当前线程获取了锁,而公平锁则不一样,差异点 有两个:

1、FairSync.lock()方法

final void lock() {
    acquire(1);
}
复制代码

2、NonfairSync.lock()方法

非公平锁在获取锁的时候,会先通过 CAS 进行抢占,而公平锁则不会。

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
复制代码
原文  https://juejin.im/post/5eede96ce51d45741017d83a
正文到此结束
Loading...