斜体为抽象类,下横线为接口
Lock接口定义了锁的行为
public interface Lock { //上锁(不响应Thread.interrupt()直到获取锁) void lock(); //上锁(响应Thread.interrupt()) void lockInterruptibly() throws InterruptedException; //尝试获取锁(以nonFair方式获取锁) boolean tryLock(); //在指定时间内尝试获取锁(响应Thread.interrupt(),支持公平/二阶段非公平) boolean tryLock(long time, TimeUnit unit) throws InterruptedException; //解锁 void unlock(); //获取Condition Condition newCondition(); }
//锁具体实现 private final Sync sync; //根据传入参数选择FairSync或NonfairSync实现 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } public void lock() { sync.lock(); } #java.util.concurrent.locks.ReentrantLock.Sync abstract void lock();
加入同步队列(当同步队列为空时会直接获得锁),等待锁
#java.util.concurrent.locks.ReentrantLock.FairSync final void lock() { acquire(1); } #java.util.concurrent.locks.AbstractQueuedSynchronizer public final void acquire(int arg) { if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire():模板方法,获取锁
#java.util.concurrent.locks.ReentrantLock.FairSync protected final boolean tryAcquire(int acquires) { //获取当前线程 final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {//当前锁没被占用 if (!hasQueuedPredecessors() &&//1.判断同步队列中是否有节点在等待 compareAndSetState(0, acquires)) {//2.如果上面!1成立,修改state值(表明当前锁已被占用) setExclusiveOwnerThread(current);//3.如果2成立,修改当前占用锁的线程为当前线程 return true; } } else if (current == getExclusiveOwnerThread()) {//占用锁线程==当前线程(重入) int nextc = c + acquires;// if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc);//修改status return true; } return false;//直接获取锁失败 }
acquireQueued(addWaiter(Node.EXCLUSIVE), arg):加入同步队列
#java.util.concurrent.locks.AbstractQueuedSynchronizer //1 private Node addWaiter(Node mode) { //生成node Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { //将node加到队列尾部 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //如果加入失败(多线程竞争或者tail指针为null) enq(node); return node; } //1.1 private Node enq(final Node node) { //死循环加入节点(cas会失败) for (;;) { Node t = tail; if (t == null) { //tail为null,同步队列初始化 //设置head指针 if (compareAndSetHead(new Node()))//注意这里是个空节点!! tail = head;//将tail也指向head } else { node.prev = t;//将当前node加到队尾 if (compareAndSetTail(t, node)) { t.next = node; return t;//注意这里才返回 } } } } //2 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { //表示是否被打断 boolean interrupted = false; for (;;) { //获取node.pre节点 final Node p = node.predecessor(); if (p == head //当前节点是否是同步队列中的第二个节点 && tryAcquire(arg)) {//获取锁,head指向当前节点 setHead(node);//head=head.next p.next = null;//置空 failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && //是否空转(因为空转唤醒是个耗时操作,进入空转前判断pre节点状态.如果pre节点即将释放锁,则不进入空转) parkAndCheckInterrupt())//利用unsafe.park()进行空转(阻塞) interrupted = true;//如果Thread.interrupt()被调用,(不会真的被打断,会继续循环空转直到获取到锁) } } finally { if (failed)//tryAcquire()过程出现异常导致获取锁失败,则移除当前节点 cancelAcquire(node); } }
过程总结:
注意:这里有两次tryAcquire()过程.第一次,为了避免同步队列为空时还插入队列产生的性能耗费(cas空转).第二次,就是正常的流程.先插入队尾,然后等待唤醒,再获取锁
selfInterrupt(): 唤醒当前线程
static void selfInterrupt() {//在获取锁之后 响应intterpt()请求 Thread.currentThread().interrupt(); }
一阶段
#java.util.concurrent.locks.ReentrantLock.NonfairSync final void lock() { //在acquire()之前先尝试获取锁 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
二阶段 acquire()流程与公平锁一模一样,唯一区别在于tryAcquire()实现中
#java.util.concurrent.locks.ReentrantLock.NonfairSync protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } #java.util.concurrent.locks.ReentrantLock.Sync final boolean nonfairTryAcquire(int acquires) {//这个过程其实和FairSync.tryAcquire()基本一致 final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //唯一区别: 这里不会去判断队列中是否为空 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
区别点 | lock()过程(一阶段) | tryAcquire()过程(二阶段) |
---|---|---|
FairSync | 直接acquire() | 当前若无线程持有锁,如果同步队列为空,获取锁 |
NonFairSync | 先尝试获取锁,再acquire() | 当前若无线程持有锁,获取锁 |
#java.util.concurrent.locks.ReentrantLock public void unlock() { sync.release(1); } #java.util.concurrent.locks.AbstractQueuedSynchronizer public final boolean release(int arg) { if (tryRelease(arg)) {//释放锁 Node h = head; if (h != null &&//head节点为空(非公平锁直接获取锁) h.waitStatus != 0) unparkSuccessor(h);//唤醒同步队列中离head最近的一个waitStatus<=0的节点 return true; } return false; } #java.util.concurrent.locks.ReentrantLock protected final boolean tryRelease(int releases) { int c = getState() - releases; //持有锁的线程==当前线程 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) {//重入锁全部释放 free = true; //置空持有锁线程 setExclusiveOwnerThread(null); } //state==0(此时持有锁,不用cas) setState(c); return free; }
lockInterruptibly()与lock()过程基本相同,区别在于Thread.intterpt()的应对措施不同
//lock() final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { //表示是否被打断 boolean interrupted = false; for (;;) { //获取node.pre节点 final Node p = node.predecessor(); if (p == head //当前节点是否是同步队列中的第二个节点 && tryAcquire(arg)) {//获取锁,当前head指向当前节点 setHead(node);//head=head.next p.next = null;//置空 failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && //是否空转(因为空转唤醒是个耗时操作,进入空转前判断pre节点状态.如果pre节点即将释放锁,则不进入空转) parkAndCheckInterrupt())//利用unsafe.park()进行空转(阻塞) interrupted = true;//如果Thread.interrupt()被调用,(不会真的被打断,会继续循环空转直到获取到锁) } } finally { if (failed)//tryAcquire()过程出现异常导致获取锁失败,则移除当前节点 cancelAcquire(node); } } // lockInterruptibly() private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())//唯一区别当Thread.intterpt()打断时,直接抛出异常 throw new InterruptedException(); } } finally { if (failed)//然后移除当前节点 cancelAcquire(node); } }
#java.util.concurrent.locks.ReentrantLock public boolean tryLock() { //尝试获取非公平锁 return sync.nonfairTryAcquire(1); }
#java.util.concurrent.locks.ReentrantLock public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } #java.util.concurrent.locks.AbstractQueuedSynchronizer public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) ||//获取锁(公平/非公平) doAcquireNanos(arg, nanosTimeout);//在指定时间内等待锁(空转) } private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { ... final long deadline = System.nanoTime() + nanosTimeout; //加入队尾 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return true; } //上面与acquireQueued()相同,重点看这里 //计算剩余时间 nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) //利用parkNanos()指定空转时间 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted())//如果被Thread.interrupt(),则抛异常 throw new InterruptedException(); } } finally { if (failed)//移除节点 cancelAcquire(node); } }
public Condition newCondition() { return sync.newCondition(); } #java.util.concurrent.locks.ReentrantLock.Sync final ConditionObject newCondition() { return new ConditionObject(); }