Java在java.util.concurrent.locks包中提供了一系列的显示锁类,其中最基础的就是Lock接口,该接口提供了几个常见的锁相关的操作。
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); } 复制代码
下面分别进行介绍:
获取锁。如果锁不可用,出于线程调度目的,将禁用当前线程,并且在获得锁之前,该线程将一直处于休眠状态。
如果当前线程未被中断,则获取锁。如果锁可用,则获取锁,并立即返回。与lock()接口唯一的区别是可以被中断。
试图获取锁,若锁可用,则获取锁,并立即返回值true。若锁不可用,则此方法将立即返回值false。
与上个方法不同的就是给定了超时时间,若锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁。
返回绑定到此 Lock 实例的新 Condition 实例。
通常使用显示锁Lock时,会采用下面的操作流程:
lock.lock(); try { //...需要保证线程安全的代码。 } finally { lock.unlock(); } 复制代码
Lock的lock()方法保证了只有一个线程能够执有此锁。对于任何一个lock()方法,都需要一个unlock()方法与之对应,通常情况下为了保证unlock()方法总是能够执行,unlock()方法被置于finally中。
Synchronized是Java的关键字,当它用来修饰一个方法或一个代码块时,能够保证在同一时刻最多只有一个线程执行该代码。因为当调用Synchronized修饰的代码时,并不需要显示的加锁和解锁的过程,代码简洁,一般称之为隐式锁。
Lock是一个接口,提供了无条件的、可轮询的、定时的、可中断的锁获取操作,所有的加锁和解锁操作方法都是显示的,因而称为显示锁。
可重入锁ReentrantLock是对Lock接口的一种实现,支持当一个线程获取锁以后,可以再次得到该对象锁。
ReentrantLock在初始化时,需要设定该锁的公平性:
ReentrantLock的特性如下:
synchronized和ReentrantLock均有可重入性,即一个线程请求得到一个对象锁后再次请求此对象锁,可以再次得到该对象锁。
在使用synchronized时,当一个线程已经进入到synchronized方法/块中时,可以进入到本类的其他synchronized方法/块中。
在lockInterruptibly()锁定的同时,还可以响应中断通知。一旦接收到中断通知,就会抛出InterruptedException异常。
这点与synchronized不同,在synchronized加锁的代码中,无法获取中断通知。
ReentrantLock.tryLock()方法用于尝试锁定。参数为等待时间。该方法返回boolean值。若锁定成功,则返回true。锁定失败,则返回false。tryLock方法在超时不能获得锁时,就返回false,不会永久等待构成死锁。
ReentrantLock内部利用AQS的线程队列,可以实现公平锁,但是性能相比非公平锁会差一点。
在构造方法中,ReentrantLock(boolean fair),fair默认为false,当设置为true时,及表示当前构造的锁是公平锁。
当需要可定时的、可轮询的与可中断的锁获取操作,公平队列,或者非块结构的锁,建议使用ReentrantLock。否则,请使用synchronized。在Java 1.6之后,ReentrantLock和synchronized性能相差不大,所以一般情况下,使用synchronized就足够了,只有当有特定需求时,可以使用可重入锁。
利用Lock和Condition可以实现消息的等待和通知,这里我们利用ReentrantLock来进行举例。
注意在使用condition时,需要首先lock.newCondition来获取Condition对象,如果有多个条件,需要针对不同的条件来获取condition。
发送信号,调用condition.signal()方法;等待,调用condition.await()方法。
注意与notify与wait的区别,后者Object的方法,一般用在一个对象上进行等待,等待的线程和某个特定的对象绑定。当需要notify所有线程时,为了保证我们的消息被所有线程接收到,通常使用notifyAll发送消息。但是使用condition对象,await和signal操作都是在condition对象是进行的,所以使用signal通知时,不会存在等待其他消息的线程阻止消息传递,所以通常使用signal而不是signalAll。
public class ExpressCond { public final static String CITY = "ShangHai"; private int km;/*快递运输里程数*/ private String site;/*快递到达地点*/ private Lock lock = new ReentrantLock(); private Condition keCond = lock.newCondition(); private Condition siteCond = lock.newCondition(); public ExpressCond() { } public ExpressCond(int km, String site) { this.km = km; this.site = site; } /* 变化公里数,然后通知处于wait状态并需要处理公里数的线程进行业务处理*/ public void changeKm(){ lock.lock(); try { this.km = 101; keCond.signal(); }finally { lock.unlock(); } } /* 变化地点,然后通知处于wait状态并需要处理地点的线程进行业务处理*/ public void changeSite(){ lock.lock(); try { this.site = "BeiJing"; siteCond.signal(); }finally { lock.unlock(); } } /*当快递的里程数大于100时更新数据库*/ public void waitKm(){ lock.lock(); try { while(this.km<=100) { try { keCond.await(); System.out.println("check km thread["+Thread.currentThread().getId() +"] is be notifed."); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }finally { lock.unlock(); } System.out.println("the Km is "+this.km+",I will change db"); } /*当快递到达目的地时通知用户*/ public void waitSite(){ lock.lock(); try { while(CITY.equals(this.site)) { try { siteCond.await(); System.out.println("check site thread["+Thread.currentThread().getId() +"] is be notifed."); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }finally { lock.unlock(); } System.out.println("the site is "+this.site+",I will call user"); } } 复制代码
下面是测试函数,将会唤醒一个等待km变化的线程。
public class TestCond { private static ExpressCond express = new ExpressCond(0,ExpressCond.CITY); /*检查里程数变化的线程,不满足条件,线程一直等待*/ private static class CheckKm extends Thread{ @Override public void run() { express.waitKm(); } } /*检查地点变化的线程,不满足条件,线程一直等待*/ private static class CheckSite extends Thread{ @Override public void run() { express.waitSite(); } } public static void main(String[] args) throws InterruptedException { for(int i=0;i<3;i++){ new CheckSite().start(); } for(int i=0;i<3;i++){ new CheckKm().start(); } Thread.sleep(1000); express.changeKm();//快递里程变化 } } 复制代码
ReadWriteLock接口提供了单独的读锁和写锁,
public interface ReadWriteLock { Lock readLock(); Lock writeLock(); } 复制代码
ReentrantReadWriteLock类是ReadWriteLock接口的一个实现,它与ReentrantLock类一样提供了公平竞争与不公平竞争两种机制,默认也是使用非公平竞争机制。
ReentrantReadWriteLock的可以被多个读者访问和一个写者访问,提供了读写分离功能:
ReentrantReadWriteLock在读多写少的场景下,具有很强的性能优势。
1.重入方面其内部的WriteLock可以获取ReadLock,但是反过来ReadLock无法获得WriteLock。
2.WriteLock可以降级为ReadLock,顺序是:先获得WriteLock再获得ReadLock,然后释放WriteLock,这时候线程将保持Readlock的持有。反过来ReadLock想要升级为WriteLock则不可能。
4.不管是ReadLock还是WriteLock都支持Interrupt,语义与ReentrantLock一致。
5.WriteLock支持Condition并且与ReentrantLock语义一致,而ReadLock则不能使用Condition,否则抛出UnsupportedOperationException异常。
ReentrantLock是排他锁,使用非公平竞争机制时,抢占的机会相对还是比较少的,只有当新请求恰逢锁释放时才有机会抢占,所以发生线程饥饿的现象几乎很少。
ReentrantReadWriteLock是共享锁,或者说读读共享,并且经常使用于读多写少的场景,即请求读操作的线程多而频繁而请求写操作的线程极少且间隔长,在这种场景下,使用非公平竞争机制极有可能造成写线程饥饿。比如,R1线程此时持有读锁且在进行读取操作,W1线程请求写锁所以需要排队等候,在R1释放锁之前,如果R2,R3,...,Rn 不断的到来请求读锁,因为读读共享,所以他们不用等待马上可以获得锁,如此下去W1永远无法获得写锁,一直处于饥饿状态。
参考链接:
LockSupport是一个方便的线程阻塞工具,它可以在线程的任何位置让线程阻塞。与Thread.suspend()方法相比,它弥补了由于resume()方法导致线程无法继续执行的情况。和Object.wait()方法相比,它不需要先获得某个对象的锁,也不会抛出InterruptedException异常。
LockSupport主要有两个方法,
park()方法会阻塞当前线程(线程进入Waiting状态),除非它获取了"许可证"。
unpark(Thread t)方法会给线程t颁发一个"许可证"。
LockSupport使用了类似信号量的机制,它为每一个线程准备了一个许可,如果许可可用,park()方法会立刻返回,并且消费这个许可(也就是将许可变为不可用);如果许可不可用,就会阻塞,而unpack()方法则使得一个许可变为可用(但是和信号量不同的是,许可不可累加,永远只能拥有不超过一个许可)。
AQS:AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),是JUC并发包中的核心基础组件。JUC并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。
AQS解决了实现同步器时涉及到的大量细节问题,例如获取同步状态、FIFO同步队列。基于AQS来构建同步器可以带来很多好处。它不仅能够极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。
AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。
AQS使用了模板方法设计模式,子类通过继承同步器并实现它的抽象方法来管理同步状态。
AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了如下三个方法来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。
getState():返回同步状态的当前值;
setState(int newState):设置当前同步状态;
compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;
tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态
tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;
tryRelease(int arg):独占式释放同步状态;
acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;
acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;
isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;
tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;
tryReleaseShared(int arg):共享式释放同步状态;
acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;
release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;
releaseShared(int arg):共享式释放同步状态;
当在实现自己的lock类时,需要子类覆盖如下方法, 独占式获取 tryAcquire 独占式释放 tryRelease 共享式获取 tryAcquireShared 共享式释放 tryReleaseShared 这个同步器是否处于独占模式 isHeldExclusively
CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程以及等待状态等信息打包成一个节点(Node),并将其加入到CLH同步队列,同时会阻塞当前线程。当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),
其定义如下:
static final class Node { /** 共享 */ static final Node SHARED = new Node(); /** 独占 */ static final Node EXCLUSIVE = null; /** * 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态; */ static final int CANCELLED = 1; /** * 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行 */ static final int SIGNAL = -1; /** * 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,该节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中 */ static final int CONDITION = -2; /** * 表示下一次共享式同步状态获取将会无条件地传播下去 */ static final int PROPAGATE = -3; /** 等待状态 */ volatile int waitStatus; /** 前驱节点 */ volatile Node prev; /** 后继节点 */ volatile Node next; /** 获取同步状态的线程 */ volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } } 复制代码
对于CLH同步队列,一般有如下几种操作:
队列的主要变化是tail指向新节点、新节点的prev指向当前最后的节点,当前最后一个节点的next指向当前节点。
整个流程图如下:
具体实现可以查看addWaiter(Node node)方法:
private Node addWaiter(Node mode) { //新建Node Node node = new Node(Thread.currentThread(), mode); //快速尝试添加尾节点 Node pred = tail; if (pred != null) { node.prev = pred; //CAS设置尾节点 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //多次尝试 enq(node); return node; } 复制代码
addWaiter(Node node)先通过快速尝试设置尾节点,如果失败,则调用enq(Node node)方法设置尾节点
private Node enq(final Node node) { //多次尝试,直到成功为止 for (;;) { Node t = tail; //tail不存在,设置为首节点 if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { //设置为尾节点 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } 复制代码
两个方法都是通过一个CAS方法compareAndSetTail(Node expect, Node update)来设置尾节点,该方法可以确保节点是线程安全添加的。在enq(Node node)方法中,AQS通过“死循环”的方式来保证节点可以正确添加,只有成功添加后,当前线程才会从该方法返回,否则会一直执行下去。
首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点,这个过程非常简单,head执行该节点并断开原首节点的next和当前节点的prev即可,注意在这个过程是不需要使用CAS来保证的,因为只有一个线程能够成功获取到同步状态。
流程图如下:
AQS提供了acquire(int arg)方法来进行独占式同步状态获取,实现如下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 复制代码
其中相关函数的定义为:
acquireQueued方法为一个自旋的过程,当前线程(Node)进入同步队列后,就会进入一个自旋的过程,当条件满足,获取到同步状态后,就可以从这个自旋过程中退出,否则会一直执行下去。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { //中断标志 boolean interrupted = false; /* * 自旋过程,其实就是一个死循环而已 */ for (;;) { //当前线程的前驱节点 final Node p = node.predecessor(); //当前线程的前驱节点是头结点,且同步状态成功 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //获取失败,线程等待 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 复制代码
当前线程会一直尝试获取同步状态,当然前提是只有其前驱节点为头结点才能够尝试获取同步状态,主要是为了保持FIFO同步队列原则。头节点释放同步状态后,将会唤醒其后继节点,后继节点被唤醒后需要检查自己是否为头节点。
在上面的流程中,当获取失败时,需要判断是否阻塞当前线程,
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; 复制代码
在获取同步状态失败后,线程并不是立马进行阻塞,需要检查该线程的状态,检查状态的方法为 shouldParkAfterFailedAcquire(Node pred, Node node) 方法,该方法主要靠前驱节点判断当前线程是否应该被阻塞,代码如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //前驱节点 int ws = pred.waitStatus; //状态为signal,表示当前线程处于等待状态,直接放回true if (ws == Node.SIGNAL) return true; //前驱节点状态 > 0 ,则为Cancelled,表明该节点已经超时或者被中断了,需要从同步队列中取消 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } //前驱节点状态为Condition、propagate else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 复制代码
这段代码主要检查当前线程是否需要被阻塞,具体规则如下:
如果当前线程的前驱节点状态为SIGNAL,则表明当前线程需要被阻塞,直接返回true,当前线程阻塞
如果当前线程的前驱节点状态为CANCELLED(ws > 0),则表明该线程的前驱节点已经等待超时或者被中断了,则需要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态 <= 0 ,返回false
如果前驱节点非SIGNAL,非CANCELLED,则通过CAS的方式将其前驱节点设置为SIGNAL,返回false
如果 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,则调用parkAndCheckInterrupt()方法阻塞当前线程:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } 复制代码
parkAndCheckInterrupt() 方法主要是把当前线程挂起,从而阻塞住线程的调用栈,同时返回当前线程的中断状态。其内部则是调用LockSupport工具类的park()方法来阻塞该方法。
当线程释放同步状态后,则需要唤醒该线程的后继节点:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //唤醒后继节点 unparkSuccessor(h); return true; } return false; } 复制代码
调用unparkSuccessor(Node node)唤醒后继节点:
private void unparkSuccessor(Node node) { //当前节点状态 int ws = node.waitStatus; //当前状态 < 0 则设置为 0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //当前节点的后继节点 Node s = node.next; //后继节点为null或者其状态 > 0 (超时或者被中断了) if (s == null || s.waitStatus > 0) { s = null; //从tail节点来找可用节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //唤醒后继节点 if (s != null) LockSupport.unpark(s.thread); } 复制代码
可能会存在当前线程的后继节点为null,超时、被中断的情况,如果遇到这种情况了,则需要跳过该节点,但是为何是从tail尾节点开始,而不是从node.next开始呢?原因在于node.next仍然可能会存在null或者取消了,所以采用tail回溯办法找第一个可用的线程。最后调用LockSupport的unpark(Thread thread)方法唤醒该线程。
以上就是整个独占式获取和释放的过程,流程图如下:
AQS提供了acquire(int arg)方法以供独占式获取同步状态,但是该方法对中断不响应,对线程进行中断操作后,该线程会依然位于CLH同步队列中等待着获取同步状态。为了响应中断,AQS提供了acquireInterruptibly(int arg)方法,该方法在等待获取同步状态时,如果当前线程被中断了,会立刻响应中断抛出异常InterruptedException。
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } 复制代码
首先校验该线程是否已经中断了,如果是则抛出InterruptedException,否则执行tryAcquire(int arg)方法获取同步状态,如果获取成功,则直接返回,否则执行doAcquireInterruptibly(int arg)。doAcquireInterruptibly(int arg)定义如下:
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } 复制代码
doAcquireInterruptibly(int arg)方法与acquire(int arg)方法仅有两个差别。
1.方法声明抛出InterruptedException异常。
2.在中断方法处不再是使用interrupted标志,而是直接抛出InterruptedException异常。
AQS除了提供上面两个方法外,还提供了一个增强版的方法:tryAcquireNanos(int arg,long nanos)。该方法为acquireInterruptibly方法的进一步增强,它除了响应中断外,还有超时控制。即如果当前线程没有在指定时间内获取同步状态,则会返回false,否则返回true。
AQS提供acquireShared(int arg)方法共享式获取同步状态:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) //获取失败,自旋获取同步状态 doAcquireShared(arg); } 复制代码
从上面程序可以看出,方法首先是调用tryAcquireShared(int arg)方法尝试获取同步状态,如果获取失败则调用doAcquireShared(int arg)自旋方式获取同步状态,共享式获取同步状态的标志是返回 >= 0 的值表示获取成功。
private void doAcquireShared(int arg) { /共享式节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { //前驱节点 final Node p = node.predecessor(); //如果其前驱节点,获取同步状态 if (p == head) { //尝试获取同步 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 复制代码
tryAcquireShared(int arg)方法尝试获取同步状态,返回值为int,当其 >= 0 时,表示能够获取到同步状态,这个时候就可以从自旋过程中退出。
默认AQS没有提供tryAcquireShared的实现,需要子类自己实现该方法,
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } 复制代码
注意到独占式获取锁不同的是,如果tryAcquireShared的返回值大于0,会进行setHeadAndPropagate的操作,下面是该方法的实现,可以看到当某个节点被设置为head之后,如果它的后继节点是SHARED状态的,那么将继续通过doReleaseShared方法尝试往后唤醒节点,实现了共享状态的向后传播。doReleaseShared后面会仔细分析。
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } 复制代码
获取同步状态后,完成相应的任务之后,需要调用release(int arg)方法释放同步状态,方法如下:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } 复制代码
在doReleaseShared中,如果头节点的状态为SIGNAL,则通过CAS将头节点的状态设置为0,并且唤醒后续阻塞的线程;接着再通过CAS设置节点的状态为Node.PROPAGATE。
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } 复制代码
关于doReleaseShared的几点分析:
参考:
在之前的例子中,使用Condition和Lock实现了消息的等待和通知,这节介绍Condiition在AQS中的实现。
JDK的Object对象提供了wait/notify的机制,也能实现消息的等待与通知,Condition与之的差别主要体现在以下几点:
在AQS的Condition实现中,和独占锁的争夺类似的是,每创建一个Condtion对象就会对应一个Condtion队列,每一个调用了Condtion对象的await方法的线程都会被包装成Node扔进一个条件队列中,就像这样:
同样的,在Condition中也会用到之前介绍的同步队列,当等待队列中的节点获得信号通知时,会将等待队列的节点移到同步队列。
以下是await时节点的变化,
以下是signal信号发出时节点的变化,
Condition的整个await/signal流程如下:
1、Condition提供了await()方法将当前线程阻塞,并提供signal()方法支持另外一个线程将已经阻塞的线程唤醒。 2、Condition需要结合Lock使用 3、线程调用await()方法前必须获取锁,调用await()方法时,将线程构造成节点加入等待队列,同时释放锁,并挂起当前线程 4、其他线程调用signal()方法前也必须获取锁,当执行signal()方法时将等待队列的节点移入到同步队列,当线程退出临界区释放锁的时候,唤醒同步队列的首个节点
下面结合源代码进行分析:
调用await阻塞当前线程
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //将当前线程封装成Node加入到等待队列尾部 Node node = addConditionWaiter(); //释放锁 int savedState = fullyRelease(node); int interruptMode = 0; //判断当前节点是否已经在同步队列中,如果是则退出循环,如果不是就阻塞当前线程 //其他线程如果发出了signal信号之后,会把等待队列的线程移入同步队列,此时就会退出循环,进入下面的重新获取锁的acquireQueued while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //其他发出signal信号的线程释放锁之后,该线程被唤醒并重新竞争锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } //线程加入等待队列尾部 private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) {//清除cancell态的节点 unlinkCancelledWaiters(); t = lastWaiter;//t指向最后一个状态正确的节点 } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null)//列表为空,初始化为第一个节点 firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } 复制代码
将等待队列的节点移入同步队列(signalAll只是循环执行signal而已)
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null;//得到firstWaiter } while (!transferForSignal(first) && (first = firstWaiter) != null); } //将节点从等待队列移入同步队列 final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false;//cas节点状态错误,说明已经cancell了,直接返回false /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node);//加入同步队列 int ws = p.waitStatus; //设置前置节点状态为signal,可重入锁那篇文章分析过,为了唤醒线程而设置 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread);//特殊情况下唤醒线程并重新同步,一般情况下这里不会执行 return true; } 复制代码
参考:
ReentrantReadWriteLock在内部也是利用了AQS进行锁的竞争与释放,同时也实现了ReadWriteLock接口。
为了同时保存读锁和写锁的状态,在内部用一个int保存读和写的状态。读状态从高16位读出,写状态从低16位读出,在保证读写锁互斥的前提下,直接利用了AQS现有的数据结构。
static final int SHARED_SHIFT = 16; //实际是65536 static final int SHARED_UNIT = (1 << SHARED_SHIFT); //最大值 65535 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 同样是65535 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** 获取读的状态 */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** 获取写锁的获取状态 */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } 复制代码
写锁为独占式的,因此读锁的获取和释放和AQS原生的实现一致。 读锁是共享式的,获取读锁的状态,并且加1.
final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; //写锁被其他线程获取了,直接返回false int r = sharedCount(c); //获取读锁的状态 if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { //尝试获取读锁 if (r == 0) { //说明第一个获取到了读锁 firstReader = current; //标记下当前线程是第一个获取的 firstReaderHoldCount = 1; //重入次数 } else if (firstReader == current) { firstReaderHoldCount++; //次数+1 } else { //cachedHoldCounter 为缓存最后一个获取锁的线程 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); //缓存最后一个获取锁的线程 else if (rh.count == 0)// 当前线程获取到了锁,但是重入次数为0,那么把当前线程存入进去 readHolds.set(rh); rh.count++; } return true; } } } 复制代码
读锁的释放:
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1)//如果是首次获取读锁,那么第一次获取读锁释放后就为空了 firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { //表示全部释放完毕 readHolds.remove(); //释放完毕,那么久把保存的记录次数remove掉 if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); // nextc 是 state 高 16 位减 1 后的值 int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) //CAS设置状态 return nextc == 0; //这个判断如果高 16 位减 1 后的值==0,那么就是读状态和写状态都释放了 } } 复制代码
锁降级算是获取读锁的特例,如在A线程已经获取写锁的情况下,再调取读锁加锁函数则可以直接获取读锁,但此时其他线程仍然无法获取读锁或写锁,在A线程释放写锁后,如果有节点等待则会唤醒后续节点,后续节点可见的状态为目前有A线程获取了读锁。
下面的例子里,利用AQS实现了三元共享锁,也就是当前锁只能被三个线程获取。
public class TripleLock implements Lock { //为3表示允许两个线程同时获得锁 private final Sync sync = new Sync(3); private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { if (count <= 0) { throw new IllegalArgumentException("count must large than zero."); } setState(count); } public int tryAcquireShared(int reduceCount) { for (;;) { int current = getState(); int newCount = current - reduceCount; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } } public boolean tryReleaseShared(int returnCount) { for (;;) { int current = getState(); int newCount = current + returnCount; if (compareAndSetState(current, newCount)) { return true; } } } final ConditionObject newCondition() { return new ConditionObject(); } } @Override public void lock() { sync.acquireShared(1); } @Override public void unlock() { sync.releaseShared(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquireShared(1) >= 0; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(time)); } @Override public Condition newCondition() { return sync.newCondition(); } } 复制代码
测试程序中,主线程每隔一秒钟打印换行,工作线程直接打印当前的线程名,从结果可以看到,每一个时刻只有三个工作线程在同时运行。
public class testTripleLock { public void test() { final Lock lock = new TripleLock(); class Worker extends Thread { public void run() { lock.lock(); try { System.out.println(Thread.currentThread().getName()); SleepTools.second(2); } finally { lock.unlock(); } SleepTools.second(2); } } // 启动10个子线程 for (int i = 0; i < 10; i++) { Worker w = new Worker(); w.setDaemon(true); w.start(); } // 主线程每隔1秒换行 for (int i = 0; i < 10; i++) { SleepTools.second(1); System.out.println(); } } public static void main(String[] args) { testTripleLock testMyLock = new testTripleLock(); testMyLock.test(); } } 复制代码
本文由『后端精进之路』原创,首发于博客 teckee.github.io/ , 转载请注明出处
搜索『后端精进之路』关注公众号,立刻获取最新文章和 价值2000元的BATJ精品面试课程 。