转载

ReentrantLock源码解析

  • abstract static class Sync extends AbstractQueuedSynchronizer
  • static final class NonfairSync extends Sync
  • static final class FairSync extends Sync

代码使用示例

private ReentrantLock reentrantLock = new ReentrantLock();
    public void  test (){
        reentrantLock.lock();
        System.out.println(Thread.currentThread().getName()+ "test....begin");
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+"test....end");
        reentrantLock.unlock();
    }
复制代码
ReentrantLock 提供两种方式加锁 公平和非公平
默认为非公平 如果要使用非公平构造函数传true
加公平锁调用FairSync.lock()
加非公平锁调用NonfairSync.lock()
释放锁调用AbstractQueuedSynchronizer.release(int arg)

代码解析

NonfairSync.lock 代码

final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
复制代码
  1. 先调用AbstractQueuedSynchronizer.compareAndSetState(int expect, int update) AQS通过CAS维护了state字段,0代表没有被线程占有 所以非公平锁先尝试通过CAS替换state,如果替换成功代表没有被别的线程占有, 设置当前线程占有,lock方法结束,线程可以继续往下走了
  2. 如果没有替换成功再调用AbstractQueuedSynchronizer.acquire(int arg)

AbstractQueuedSynchronizer.acquire(int arg)代码

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
复制代码
  1. tryAcquire(arg) 尝试获取锁 NonfairSync.tryAcquire(int acquires)--> Sync.nonfairTryAcquire(int acquires) 代码片段
final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState(); 获取state字段的值
            if (c == 0) { 如果值是0 代表没有线程占用
                if (compareAndSetState(0, acquires)) { 通过CAS替换看自己是否可以占用成功
                    setExclusiveOwnerThread(current);
                    return true; 如果成功就返回true
                }
            }
            如果有线程占用,判断占用的线程和自己是否是同一个线程
            else if (current == getExclusiveOwnerThread()) {
                //如果是同一个线程 state字段加上传入的参数
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                //更新state
                setState(nextc);
                //从这可以看出 NonfairSync.lock是重入锁
                return true;
            }
            //如果都没满足,代表加锁不成功
            return false;
        }    
复制代码
  1. 如果tryAcquire(arg)返回ture,代表加锁成功,后面就不用走了 如果加锁不成功,继续acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
  2. 先看addWaiter(Node.EXCLUSIVE) 调用的是AbstractQueuedSynchronizer.addWaiter AbstractQueuedSynchronizer.addWaiter代码片段
private Node addWaiter(Node mode) {
        //把自己当前线程封装成一个node
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        //判断队尾是不是空
        if (pred != null) {
            //如果队尾不是空
            node.prev = pred;
            //就把自己node的上一个结点设置为队尾那个结点
            更新自己结点为队尾结点
            if (compareAndSetTail(pred, node)) {
                //如果更新成功,把上一个结点的下一个结点设置为自己结点
                pred.next = node;
                //返回当前结点
                return node;
            }
        }
        
        
        enq(node);
        return node;
    }
复制代码

如果队尾为空或者更新自己为队尾结点失败继续enq(node) AbstractQueuedSynchronizer.enq片段

private Node enq(final Node node) {
    无限循环CAS操作,直到把自己变成队尾结点
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
复制代码

总结:addWaiter(Node.EXCLUSIVE) 是组装当前线程结点,然后通过CAS添加到队尾

  1. acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 通过addWaiter(Node.EXCLUSIVE)已经把自己加入队列了 那acquireQueued这个方法是做什么呢

AbstractQueuedSynchronizer.acquireQueued代码片段

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获得当前结点的前一个结点
                final Node p = node.predecessor();
                //如果head是前一个结点,我就再次尝试获取锁,万一获取成功了呢
                if (p == head && tryAcquire(arg)) {
                    //如果加锁成功就把自己设置为头结点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    //获取成功 就代表获得锁
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码

如果还没有获得的锁就走shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt() 先看shouldParkAfterFailedAcquire(p, node) AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire代码片段

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //入参是前一个结点和当前结点
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            //如果前一个是-1,代表可以安心的等待
            return true;
        if (ws > 0) {
            //如果前一个大于0,代表前一个等待超时或者被中断了,需要从同步队列中取消该Node的结点,所以继续找在等待的前结点,把找到的结点的下一个结点设为当前结点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //如果是小于0的其他状态都统一设为-1,等待状态
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
复制代码

这个方法其实就是如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,然后自己设为这个前驱结点的后一个结点

附上node结点的waitStatus含义:

  • CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
  • SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
  • CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
  • 0状态:值为0,代表初始状态

如果找到安全休息点后就继续parkAndCheckInterrupt方法 真正让线程等待的方法 AbstractQueuedSynchronizer.parkAndCheckInterrupt代码片段

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);//调用park()使线程进入waiting状态
        return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。
    }
复制代码

park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()会清除当前线程的中断标记位

前面已经介绍了NonfairSync.lock(),再看下FairSync.lock() FairSync.lock()代码片段

final void lock() {
            acquire(1);
        }
复制代码

相比于NonfairSync.lock(),少了一个CAS判断,因为是公平锁,所以不像非公平锁那样直接上来你尝试自己能不能获得锁 后面调用逻辑只是在FairSync.tryAcquire有区别,其他都一样

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
复制代码

比较Sync.nonfairTryAcquire(int acquires)只是多了!hasQueuedPredecessors()这个判断 就是在尝试获取锁的时候,不是直接去获取,而是看队列中有没有其他等待的线程,如果有,自己是不能直接去获取锁的

小结:ReentrantLock的公平锁和非公平锁加锁就讲完了,公平和非公平体现在 公平获取锁时是有先来后到的,非公平在尝试获取锁时,是不管队列中是否有其他线程在等待,自己直接去CAS尝试加锁,如果不成功才放入队列

ReentrantLock.unlock()其实调用的是AbstractQueuedSynchronizer.release(1) AbstractQueuedSynchronizer.release代码片段

public final boolean release(int arg) {
        if (tryRelease(arg)) {//释放可以释放锁资源
            Node h = head; //找到队列头
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//根据头结点唤醒下一个线程
            return true;
        }
        return false;
    }
复制代码

tryRelease调用的是ReentrantLock.tryRelease 代码片段

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;//计算state状态量
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {//如果是0代表可以释放
                free = true;
                setExclusiveOwnerThread(null);//清空锁资源占有的线程
            }
            setState(c);设置变量
            return free;
        }
复制代码

这个是重入锁释放的设计,因为在获取锁时,同一个线程可以多次对同一个资源加锁的,每加一次的时候state都会加1,释放也一样都会减一,只有当state等于0的时候才代表这个线程释放完了,所以在写ReentrantLock时,lock和unlock要成对出现,否则会一直在那占有资源

接下来看AbstractQueuedSynchronizer.unparkSuccessor代码片段

private void unparkSuccessor(Node node) {
        //这里,node一般为当前线程所在的结点
        int ws = node.waitStatus;
        if (ws < 0) //置零当前线程所在的结点状态,允许失败。
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next; //找到下一个需要唤醒的结点s
        if (s == null || s.waitStatus > 0) {//如果为空或已取消
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0) //从这里可以看出,<=0的结点,都是还有效的结点
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); //唤醒
    }
复制代码

最终就是找到下一个有效的结点,然后唤醒该结点对应的线程

原文  https://juejin.im/post/5c80cb75f265da2db3057e75
正文到此结束
Loading...