一 概述 二 源码总览 三 acquire-请求令牌 四 release-释放令牌 五 总结 复制代码
semaphore是信号的意思,在并发包中则表示持有指定数量令牌的信号量。它通常用于多线程同时请求令牌的控制。提供了acquire方法用于获取令牌,当令牌发放完后则进行阻塞等待,持有令牌的线程完成任务后需要调用release方法归还令牌。semaphore的使用很简单,现在通过学习它的源码来了解它的实现原理是怎样的。
public class Semaphore implements java.io.Serializable { private static final long serialVersionUID = -3222578661600680210L; // 所有机制都通过AbstractQueuedSynchronizer子类Sync来完成的 private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { Sync(int permits) { // 采用AQS中的state来统计令牌数 setState(permits); } …… } // 非公平锁 static final class NonfairSync extends Sync {……} // 公平锁 static final class FairSync extends Sync{……} // 默认构造方法-默认非公平锁 public Semaphore(int permits) { sync = new NonfairSync(permits); } // 提供选择公平锁还是非公平锁的构造方法 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } // 请求一个令牌-响应中断+阻塞 public void acquire() throws InterruptedException {……} // 指定获取领牌数-响应中断且阻塞 public void acquire(int permits) throws InterruptedException {……} // 请求一个令牌-不响应中断+阻塞 public void acquireUninterruptibly() {……} // 尝试请求一个令牌-非阻塞,失败立即返回 public boolean tryAcquire(){……} // 尝试请求一个令牌,阻塞指定时间,超时后返回 public boolean tryAcquire(long timeout, TimeUnit unit){……} // 释放令牌 public void release(){……} } 复制代码
总体来说,Semaphore内部有一个继承于AQS的内部类Sync,利用AQS的共享锁来实现对共享变量state进行操作,并将state作为令牌的计数, 并提供了公平和非公平锁的方式来获取令牌,整体的设计跟ReentrantLock很像。下面以最常用的acquire和release方法为例,详细了解他们的原理;
先看acquire的方法体:
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } 复制代码
里面实际调用的是AQS的请求共享锁方法:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 先进行一遍尝试获取锁,当返回小于0说明令牌不足了,则需要将当前线程加入到等待队列中 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } 复制代码
接着先回调在Semaphore中重写的tryAcquireShared()方法尝试获取锁:
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared(int acquires) { // 循环获取令牌 for (;;) { // 获取当前可用的令牌数 int available = getState(); // 当前获取完后剩下的令牌数 int remaining = available - acquires; // 剩下领牌数小于0或者大于等于0并CAS更新成功则直接返回剩余令牌数 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } 复制代码
简单尝试获取令牌失败后则再CAS尝试几次后加入同步队列中休眠等待:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 在同步队列中增加等待节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 获取当前节点的前驱节点 final Node p = node.predecessor(); // 如果前驱节点为head节点,表示当前节点是同步等待队列中的第一个,故继续尝试一次获取锁 if (p == head) { // 尝试获取令牌,此时会跳转到semaphore中(因为重写了该方法) int r = tryAcquireShared(arg); // 返回大于0则表示成功获取到令牌了 if (r >= 0) { // 将当前节点设为head节点 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 自旋几次后为避免强占CPU,则对该线程进行休眠处理 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { // 因中断请求则取消排队请求 if (failed) cancelAcquire(node); } } 复制代码
简单总结下acquire方法流程为:
release方法体:
public void release() { sync.releaseShared(1); } 复制代码
实际调用的是AQS的方法:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } 复制代码
先调用在Semaphore中重写的尝试释放令牌方法,并且释放成功后返回true:
protected final boolean tryReleaseShared(int releases) { for (;;) { // 获取当前令牌数量 int current = getState(); // 计算释放后的令牌数量 int next = current + releases; // 如果本次释放的令牌为负数则抛出异常 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); // CAS更新令牌数成功后返回true if (compareAndSetState(current, next)) return true; } } 复制代码
释放令牌成功后,唤醒在同步队列中等待的线程:
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } 复制代码
简单总结下release流程:
Semaphore利用AQS中的共享锁来操作共享变量state,并使用state作为令牌的计数。每个线程调用required请求获取令牌,调用release则释放领牌。当令牌取完时则剩下的线程加入AQS队列中阻塞等待,当有令牌释放时会唤醒等待的线程。