之前我们已经讲解过关于AQS的独占锁,这一章节主要讲解AQS的共享锁,以 Semaphore
信号量来进行讲解,相信通过看了本章节内容的同学可以对AQS的共享模式有一个了解, Semaphore
信号量提供了用于控制资源同时被访问的个数,也就是它会维护一个许可证,访问资源之前需要申请许可证,申请许可证成功后才可以进行访问,如果申请访问资源获取的了许可证,则可以进行资源访问,同时颁发许可证中心的许可证会进行增加,等到访问资源的线程释放资源后,许可证使用情况会进行减少。
public class SemaphoreDemo { private static final Semaphore semaphore = new Semaphore(3); private static final AtomicInteger atomicInteger = new AtomicInteger(); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { executorService.execute(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "开始执行"); Thread.sleep(100); System.out.println(Thread.currentThread().getName() + "执行完毕"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }); } } } 复制代码
运行结果如下:
pool-1-thread-2开始执行 pool-1-thread-3开始执行 pool-1-thread-1开始执行 pool-1-thread-4开始执行 pool-1-thread-6开始执行 pool-1-thread-5开始执行 pool-1-thread-7开始执行 pool-1-thread-9开始执行 pool-1-thread-8开始执行 pool-1-thread-10开始执行 复制代码
通过一个例子来拨开 Semaphore
的面纱,上面我们定义了信号量为3个,也就是同时可以获得锁的线程只有3个,通过调用 semaphore.acquire();
申请信号量,如果申请成功则执行下面的逻辑,在后面通过 semaphore.release();
释放掉信号量的占用,也就是说通过 semaphore.acquice
从信号量中获取一个许可,如果许可通过则执行下面的语句,如果没有许可可发放则等待,然后通过 semaphore.release();
归还一个许可,上面例子中的输出内容也可清晰的看到同事只有两个线程能够访问资源。
借助图示来说明一下:
首先初始化的时候下面图示内容是放置了3个许可证,如下图所示:
左边是线程,中间是要访问的资源文件,最右侧代表的是许可证池,当第一个线程尝试访问资源时,需要先从信号量中获得一个许可,如果信号量中可以获得许可则可以访问资源,如果没有许可可以颁发则表示需要等待,下面来看一下操作:
首先线程1访问资源时,在没有许可的访问时,是行不通的,是需要先进行许可的申请,申请成功后才可以访问资源,此时信号量中的许可会进行减少,看上图中只剩下了两个许可,如果四个线程同时访问时必然会有一个线程处于等待状态,如下图所示:
如上图所示中间两个线程简化了申请的过程,直接拥有许可访问权限,如果申请许可失败了,则会等待其他线程归还许可,发现线程4正在等待许可中,当其他线程调用release
方法后会将许可归还给信号量中,如下图所示:
这是线程4发现已经有可用的许可了,他就会不在等待拿着许可赶紧去访问资源,这里我就不再画图了,和上面一样,大致我们已经了解了关于信号量的内容,接下来我们就要对源码进行分析。
Semaphore`主要方法有以下内容:
方法名 | 描述 |
---|---|
acquire() | 尝试获得一个准入许可,如无法获得,则线程等待,直到有线程释放一个许可或当线程被中断。 |
acquire(int permits) | 尝试获得permits个准入许可,如无法获得,则线程等待,直到有线程释放permits个许可或当线程被中断。 |
acquireUninterruptibly() | 尝试获得一个准入许可,如无法获得,则线程等待,直到有线程释放一个许可,但是不响应中断请求 |
acquireUninterruptibly(int permits) | 尝试获得permits个准入许可,如无法获得,则线程等待,直到有线程释放permits个许可,但是不响应中断请求 |
release() | 用于在线程访问资源结束后,释放一个许可,以使其他等待许可的线程可以进行资源访问。 |
release(int permits) | 用于在线程访问资源结束后,释放permits个许可,以使其他等待许可的线程可以进行资源访问。 |
tryAcquire() | 尝试获得一个许可,如果获得许可成功返回true,如果失败则返回fasle,它不会等待,立即返回 |
tryAcquire(int permits) | 尝试获得permits个许可,如果获得许可成功返回true,如果失败则返回fasle,它不会等待,立即返回 |
tryAcquire(int permits, long timeout, TimeUnit unit) | 尝试在指定时间内获得permits个许可,如果在指定时间内没有获得许可则则返回false,反之返回true |
tryAcquire(long timeout, TimeUnit unit) | 尝试在指定时间内获得一个许可,如果在指定时间内没有获得许可则则返回false,反之返回true |
availablePermits(): | 当前可用的许可数 |
通过上面方法的大致介绍, Semaphore
提供了对信号量获取的操作,获取的过程中有等待操作,也有立即返回的方法,有的响应中断有的又不响应中断,下面会以一些简单的例子,进行分析一下源码内容,针对下面的例子来进行分析:
public class SemaphoreDemo { private static final Semaphore semaphore = new Semaphore(1); private static final AtomicInteger atomicInteger = new AtomicInteger(); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 2; i++) { Thread.sleep(100); executorService.execute(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "开始执行"); Thread.sleep(10); System.out.println(Thread.currentThread().getName() + "执行完毕"); // semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }); } } } 复制代码
针对上面的例子,我们先初始化了1个信号量,线程池提交了2个任务,这时候大家也想到了运行结果,运行结果就是只有1一个线程能够执行完毕,其余的线程都需要等到操作,因为信号量被消耗了,下面是输出结果:
pool-1-thread-1开始执行 pool-1-thread-1执行完毕 复制代码
我们这里特别在提交到线程池任务的时候睡眠了一会,其实想要达到的目的是能够让每个线程执行按照顺序排下去,不然可能顺序就不定了,当然也没有太大影响,这里只是为了方便分析,当第一个线程提交任务到线程池时,它会先经过 semaphore.acquire()
方法来进行获得一个许可操作,下面我们来看一下源码:
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } 复制代码
我们可以看到它调用了 sync.acquireSharedInterruptibly(1)
方法,这个 snyc
其实是 Semaphore
内部类 Sync
的实例对象,那么问题来了,这个 sync
变量是什么时候初始化的呢?其实当我们初始化 Semaphore
,就已经将 sync
变量初始化了,接下来我们看一下 Semaphore
构造函数:
// 非公平模式 public Semaphore(int permits) { sync = new NonfairSync(permits); } // fair=true为公平模式,false=非公平模式 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } 复制代码
/** * 非公平模式 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } //实现AQS的tryAcquireShared方法,尝试获得锁。 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } /** * 公平模式 */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } //实现AQS的tryAcquireShared方法,尝试获得锁。 protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } 复制代码
我们可以发现 Semaphore
提供了两种模式的锁机制,一种是公平模式,一种是非公平模式,公平模式其实就是如果发现了有线程在排队等待,则自觉到后面去排队,而非公平模式则不一样,它不管你有没有在排队的线程,谁先抢到是谁的,说到这里我们发现上例子中当声明 Semaphore
时,其实默认使用了非公平模式 NonfairSync
,指定了信号量数量为1个,其实它内部 Sync
中调用了 AQS
的 setState
方法,设置同步器状态 state
为1,详细如下图所示:
接下来我们在回到 acquire
方法中,它调用了 sync.acquireSharedInterruptibly(1);
,细心地朋友会发现 NonfairSync
和父类 Sync
中并没有该方法,其实该方法是 AQS
提供的方法,接下来我们看一下这个方法到底做了什么?源码内容如下所示:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) //如果线程被中断则抛出异常。 throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //尝试获取信号量,如果获得信号量小于0,则代表获取失败则运行下面的语句。 doAcquireSharedInterruptibly(arg); //将当前线程节点加入到队列中,等到其他线程释放信号量。 } 复制代码
doAcquireSharedInterruptibly
接下来我们看一下 tryAcquireShared
方法实现, tryAcquireShared
这个方法是 AQS
提供给子类实现的方法,它自身并没有实现,只是抛出了异常,实现它的类必然是 Semaphore
的 Sync
类,我们发现实现该方法有两个,包括非公平模式的 NonfairSync
,另外一个是公平模式下的 FairSync
,由于我们上例子中采用的是非公平模式,我们看一下非公平模式下的 tryAcquireShared
实现逻辑:
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } 复制代码
它( NonfairSync
)内部调用了父类 Sync
的 nonfairTryAcquireShared
方法,继续刨根问底看一下这个方法:
final int nonfairTryAcquireShared(int acquires) { //这里上来就是个死循环 for (;;) { //获取可用的信号量数量。 int available = getState(); //剩余线程数,其实就是当前可用信号量数量-申请的信号量数量 int remaining = available - acquires; //1. 如果剩余信号量数量小于0,代表没有信号量可用 //2. 修改state成功,则代表申请信号量成功。 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } 复制代码
如果读过前面 ReentrantLock
源码的朋友会发现,它对 state
是增加的,也就是如果 state
值被设置上了值,则代表已经有线程获得了锁,其他线程不允许获取当前锁,如果是当前线程重新获得锁,则 state
值会增加,这也是重入锁的关键点,而 Semaphore
与之不同点在于,它是对 state
同步器状态进行减少操作,换句话说先初始化若干信号量,如果获得信号量时,剩余信号量小于0,则代表没有可用的信号量,则直接返回,如果获得信号量成功则对 state
值进行修改,回到上面的例子中,我们刚分析道其中第一个线程,第一个线程获得到了信号量,此时剩余信号量为0,它会将 state
值设置为0,设置之后回到了 acquireSharedInterruptibly
的 if (tryAcquireShared(arg) < 0)
语句中,if语句为true,不进入到if语句内部,此时AQS的情况如下图所示:
state
已经变成0了,当它执行到
tryAcquireShared
去获取信号量时,可用信号量为0个,当可用信号量减去申请的信号量个数1时,此时剩余信号量变成了-1,所以这时候if语句的条件
remaining < 0
是满足的,进入到if语句中,返回的是剩余信号量-1,此时会跳转到调用地方,也就是AQS的
acquireSharedInterruptibly
方法中,这时候发现if语句中(
tryAcquireShared(arg) < 0
)返回结果是-1,会进入到if语句内部执行
doAcquireSharedInterruptibly
方法,这个方法主要操作是将当前线程放入到等待队列中,等到其他线程释放信号量,接下来慢慢剖析一下内部源码
AQS->doAcquireSharedInterruptibly
:
/** * Acquires in shared interruptible mode. * @param arg the acquire argument */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); //将节点添加到队列中 boolean failed = true; //失败标志位,如果出现中断,或者异常抛出时会进行取消操作 try { for (;;) { final Node p = node.predecessor(); //获取当前节点的前节点 if (p == head) { //如果当前节点的前节点为头节点 int r = tryAcquireShared(arg); //尝试获得锁 if (r >= 0) { //如果可以获得信号量 setHeadAndPropagate(node, r); //设置当前节点为头节点 p.next = null; // help GC //帮助GC回收 failed = false; //设置失败为false,也就是正常获取 return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } 复制代码
这时候当我们的程序运行到 addWaiter
,来看一下这个时候队列的情况:
Ref-405
是头节点,然后再将当前线程的节点指向头节点也就是上图中所示内容,因为这里
addWaiter
内部代码和之前分析的AQS的独占锁(也就是
ReentrantLock
源码)的时候已经分析过了,这里就不在赘述了,将线程加入到等待队列中之后,接下来进入到for死循环中去,首先上来获取当前节点的头节点,也就是上图的
Ref-405
,然后判断是不是头节点,这里面的内容其实就是再去尝试争抢一下信号量,看有没有信号量释放,如果返回的信号量剩余个数大于等于0,则代表争抢信号量成功,需要对节点进行处理,但是我们这个例子中,当进行
tryAcquireShared
时,返回的值是-1,所以获取信号量失败,不会进入到下面内容,但是我们在这里先进行分析分析一下这个方法
setHeadAndPropagate
,为后面埋下伏笔:
//从方法中也可以看出来大致意思是设置头节点,并且根据条件是否唤醒后继节点。 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 记录一下原来的head头节点。 setHead(node); // 设置新的节点设置为头节点。 if (propagate > 0 || //如果有信号量 h == null || //头节点为空情况下 h.waitStatus < 0 || //如果原来的头结点的状态是负数,这里指的是SIGNAL和PROPAGATE两个状态 (h = head) == null || // 重新阅读头节点防止额外的竞争。 h.waitStatus < 0) { //如果原来的头结点的状态是负数,这里指的是SIGNAL和PROPAGATE两个状态 Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } 复制代码
当他没有进入到setHeadAndPropagate方法,它会走下面的步骤:
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); 复制代码
shouldParkAfterFailedAcquire
将头节点修改为 SIGNAL
, parkAndCheckInterrupt
将线程进行阻塞,运行到这里是线程被挂起,等待其他线程唤醒,此时队列状态如下所示:
当我们调用 semaphore.release()
进行释放信号量时,它其实调用的是 AbstractQueuedSynchronizer
中的 releaseShared(int arg)
方法,我们来看一下源码内容:
public void release() { sync.releaseShared(1); } 复制代码
接下来分析一下AQS中的 releaseShared
方法:
public final boolean releaseShared(int arg) { // 调用Semaphore实现的tryReleaseShared方法。 if (tryReleaseShared(arg)) { // 唤醒后记节点 doReleaseShared(); return true; } return false; } 复制代码
先进尝试释放信号量,如果信号量释放成功,则进行调用doReleaseShared来进行唤醒等待的节点,告知队列中等待的节点已经有信号量了可以进行获取了。
protected final boolean tryReleaseShared(int releases) { for (;;) { // 获取当前的state值 int current = getState(); // 将当前的state值添加releases个信号量 int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); // cas修改state值 if (compareAndSetState(current, next)) return true; } } 复制代码
其实最主要的方法是方法是 doReleaseShared
方法,我们来看一下源码:
/** * 唤醒队列中的节点,以及修改头结点的waitStatus状态为PROPAGATE * 1. 如果头节点等待状态为SIGNAL,则将头节点状态设为0,并唤醒后继节点 * 2. 如果头节点等待状态为0,则将头节点状态设为PROPAGATE,保证唤醒能够正常传播下去。 */ private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 如果头结点的状态为SIGNAL则进行唤醒操作。 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } // 如果头节点状态为0,则将头节点状态修改为PROPAGATE,至于为什么会变成0,为什么要有PROPAGATE?,请看下文。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } 复制代码
这里可能大家会有一个疑问,为什么不是直接propagate > 0,然后就直接唤醒下一个节点呢?这里我要引用一下之前的版本中的一个bug来说明一下:BUG-6801020
根据BUG中的描述影响的版本号是 JDK 6u11,6u17 两个版本,BUG中提及到了复现bug的代码如下所示:
import java.util.concurrent.Semaphore; public class TestSemaphore { private static Semaphore sem = new Semaphore(0); private static class Thread1 extends Thread { @Override public void run() { sem.acquireUninterruptibly(); } } private static class Thread2 extends Thread { @Override public void run() { sem.release(); } } public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10000000; i++) { Thread t1 = new Thread1(); Thread t2 = new Thread1(); Thread t3 = new Thread2(); Thread t4 = new Thread2(); t1.start(); t2.start(); t3.start(); t4.start(); t1.join(); t2.join(); t3.join(); t4.join(); System.out.println(i); } } } 复制代码
接下来看一下受影响版本号中的 setHeadAndPropagate
和 releaseShared
两个方法源码,如下:
private void setHeadAndPropagate(Node node, int propagate) { setHead(node); // 这里是区别点,他这里直接是比较的信号量如果存在,并且当前节点的等待状态不等于0,才会去唤醒下一个线程。 if (propagate > 0 && node.waitStatus != 0) { /* * Don't bother fully figuring out successor. If it * looks null, call unparkSuccessor anyway to be safe. */ Node s = node.next; if (s == null || s.isShared()) unparkSuccessor(node); } } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 复制代码
在JDK JDK 6u11,6u17 版本中发现中是没有 PROPAGATE
这种状态的,在后面的版本中引入是为了解决共享模式下并发释放导致的线程hang住问题,上面例子运行一段时间后,偶尔会出现线程hang住的情况。上面例子中初始化了四个线程,信号量初始化时是0个,t1线程和t2线程是获取信号量,t3和t4线程是释放信号量,假设某种情况极端的情况下t1和t2添加到了队列中,如下图所示:
releaseShared
方法,会调用 unparkSuccessor
,这个方法是用来通知等待线程,此时head中的waitStatus由-1变成0,然后唤醒线程t1,此时信号量为1。 doAcquireSharedInterruptibly
方法里面,当线程唤醒的时候也是从这个方法中进行执行,当t1线程尝试获得信号量时,发现可以获得信号量, tryAcquireShared
返回的是0,因为消耗了一个信号量,而此时当前线程没有进行继续往下操作,而是进行了线程切换,此时线程状态如下: releaseShared
,此时头节点的waitStatus=0,直接返回false,并未调用unparkSuccessor,但是此时信号量变成了1。 setHeadAndPropagate
方法的时候,他没有进入到if语句的内部,所以t2线程一直没有被唤醒,导致主线程挂起。 jdk1.8中的 setHeadAndPropagate
并没有直接调用 unparkSuccessor
方法,而是修改调用 doReleaseShared
方法,我们来看一下这个方法跟上面bug中有什么区别:
/** * 1.唤醒队列中的节点 * 2.如果队列头节点waitStatus=0,则将当前head头节点修改为PROPAGATE(-3)状态 */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } 复制代码
这里我们回到上面第三步,此时切换到t4线程,t4调用 releaseShared
,此时头节点的waitStatus=0,直接返回false,并未调用unparkSuccessor,但是此时信号量变成了1,并且将head头节点的waitStatus状态修改为-3。
13.png
回到上面第四步骤:此时t1线程被唤醒,继续执行将head节点指向了 Ref-505
,并且当时的信号量只有1个,他自己消耗了信号量,虽然现在state=1,但是我们可以看线程切换时,信号量的 state=0
,所以线程切换回去之后,它的 propagate=0
,调用 setHeadAndPropagate
方法的时候,此时head头节点的状态是 PROPAGATE(-3)
,会进入到if语句中执行 doReleaseShared
方法,此时唤醒线程t2。
由于文章写的时间比较长,中间伴随着找工作,所以耽搁的时间有点长了,如果有错误的地方请指正,我这边及时做更正。写文章不易,读到这里的朋友都是好样的,由于文章篇幅有点长,在释放信号量地方没有图示来进行表示,但是后面大致讲解了释放的流程所以这里就不赘述了。谢谢