Semaphore是一种同步辅助工具,翻译过来就是信号量,用来实现流量控制,它可以控制同一时间内对资源的访问次数.
无论是Synchroniezd还是ReentrantLock,一次都只允许一个线程访问一个资源,但是Semaphore可以指定多个线程同时访问某一个资源.
Semaphore有一个构造函数,可以传入一个int型整数n,表示某段代码最多只有n个线程可以访问,如果超出了n,那么请等待,等到某个线程执行完毕这段代码块,下一个线程再进入。
信号量上定义两种操作:
信号量主要用于两个目的:
以下的例子:5个线程抢3个车位,同时最多只有3个线程能抢到车位,等其他线程释放信号量后,才能抢到车位.
public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 5; i++) { new Thread(new Runnable() { @Override public void run() { try { semaphore.acquire();//申请资源 System.out.println(Thread.currentThread().getName()+"抢到车位"); ThreadUtil.sleep(RandomUtil.randomInt(1000,5000)); System.out.println(Thread.currentThread().getName()+"归还车位"); } catch (InterruptedException e) { e.printStackTrace(); }finally { //释放资源 semaphore.release(); } } },"线程"+i).start(); } } 复制代码
abstract static class Sync extends AbstractQueuedSynchronizer { //省略 } 复制代码
Semaphore内部使用Sync类,Sync又是继承AbstractQueuedSynchronizer,所以Sync底层还是使用AQS实现的.Sync有两个实现类NonfairSync和FairSync,用来指定获取信号量时是否采用公平策略.
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } Sync(int permits) { setState(permits); } 复制代码
如上所示,Semaphore默认采用非公平策略,如果需要使用公平策略则可以使用带两个参数的构造函数来构造Semaphore对象。
参数permits被传递给AQS的state值,用来表示当前持有的信号量个数.
当前线程调用该方法的目的是希望获取一个信号量资源。
如果当前信号量个数大于0,则当前信号量的计数会减1,然后该方法直接返回。否则如果当前信号量个数等0,则当前线程会被放入AQS的阻塞队列。当其他线程调用了当前线程的interrupt()方法中断了当前线程时,则当前线程会抛出InterruptedException异常返回。
//Semaphore方法 public void acquire() throws InterruptedException { //传递参数为1,说明要获取1个信号量资源 sync.acquireSharedInterruptibly(1); } //AQS的方法 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //(1)如果线程被中断,则抛出中断异常 if (Thread.interrupted()) throw new InterruptedException(); //(2)否则调用Sync子类方法尝试获取,这里根据构造函数确定使用公平策略 if (tryAcquireShared(arg) < 0) //如果获取失败则放入阻塞队列.然后再次尝试,如果使用则调用park方法挂起当前线程 doAcquireSharedInterruptibly(arg); } 复制代码
由如上代码可知,acquire()在内部调用了Sync的acquireSharedlnterruptibly方法,后者会对中断进行响应(如果当前线程被中断,则抛出中断异常)。尝试获取信号量资源的AQS的方法 tryAcquireShared是由Sync的子类实现的,所以这里分别从两 方面来讨论。
先讨论非公平策略NonfairSync类的tryAcquireShared方法,代码如下:
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared(int acquires) { for (;;) { //获取当前信号量值 int available = getState(); //计算当前剩余值 int remaining = available - acquires; //如果当前剩余值小于0或则CAS设置成功则返回 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } 复制代码
如上代码先获取当前信号量值(available),然后减去需要获取的值(acquires),得到剩余的信号量个数(remaining),如果剩余值小于0则说明当前信号量个数满足不了需求,那么直接返回负数,这时当前线程会被放入AQS的阻塞队列而被挂起。如果剩余值大于0,则使用CAS操作设置当前信号量值为剩余值,然后返回剩余值。
另外,由于NonFairSync是非公平获取的,也就是说先调用aquire方法获取信号量的线程不一定比后来者先获取到信号量。
考虑下面场景,如果线程A先调用了aquire()方法获取信号量,但是当前信号量个数为0,那么线程A会被放入AQS的阻塞队列 。过一段时间后线程C调用了release()方法释放了一个信号量,如果当前没有其他线程获取信号量,那么线程A就会被激活,然后获取该信号量,但是假如线程C释放信号量后,线程C调用了aquire方法,那么线程C就会和线程A去竞争这个信号量资源 。 如果采用非公平策略,由nonfairTryAcquireShared的代码可知,线程C完全可以在线程A被激活前,或者激活后先于线程 A获取到该信号量,也就是在这种模式下阻塞线程和当前请求的线程是竞争关系,而不遵循先来先得的策略。
下面看公平性的FairSync类是如何保证公平性的。
protected int tryAcquireShared(int acquires) { for (;;) { //查询是否当前线程节点的前驱节点也在等待获取该资源,有的话直接返回 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } 复制代码
可见公平性还是靠hasQueuedPredecessors这个函数来保证的。所以Semaphore的公平策略是看当前线程节点的前驱节点是否也在等待获取该资源,如果是则自己放弃获取的权限,然后当前线程会被放入AQS阻塞队列,否则就去获取。
该方法与acquire()方法不同,后者只需要获取一个信号量值, 而前者则获取permits个。
public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } 复制代码
该方法与acquire()类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了 acquireUninterruptibly获取资源时(包含被阻塞后),其他线程调用了当前线程的interrupt() 方法设置了当前线程的中断标志,此时当前线程并不会抛出IllegalArgumentException异常而返回。
public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); } 复制代码
该方法的作用是把当前Semaphore对象的信号量值增加1,如果当前有线程因为调用aquire方法被阻塞而被放入了AQS的阻塞 队列,则会根据公平策略选择一个信号量个数能被满足的线程进行激活, 激活的线程会尝试获取刚增加的信号量.
public void release() { //(1)arg=1 sync.releaseShared(1); } public final boolean releaseShared(int arg) { //(2)尝试释放资源 if (tryReleaseShared(arg)) { //(3)资源释放成功则调用park方法唤醒AQS队列里面最先挂起的线程 doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int releases) { for (;;) { //获取当前信号量值 int current = getState(); //将当前信号量值增加releases,这里为增加1 int next = current + releases; //移除处理 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); //使用CAS保证更新信号量值的原子性 if (compareAndSetState(current, next)) return true; } } 复制代码
由代码release()->sync.releaseShared(1),可知,release方法每次只会对信号量值增加1,tryReleaseShared方法是无限循环,使用CAS保证了release方法对信号量递增1的原子性操作.tryReleaseShared方法增加信号量值成功后会执行代码(3),即调用AQS的方法来激活因为调用acquire方法而被阻塞的线程。
该方法与不带参数的release方法的不同之处在于,前者每次调用会在信号量值原来的基础上增加 permits,而后者每次增加l。
public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); } 复制代码
另外可以看到,这里的sync.releaseShared是共享方法,这说明该信号量是线程共享的,信号量没有和固定线程绑定,多个线程可以同时使用CAS去更新信号量的值而不会被阻塞。