转载

Java并发工具类-Semaphore解析

Semaphore(信号量)用来限制访问同一资源的线程数量。它通过初始化一个固定数量的配额,当线程要执行时,必须先获取配额才能继续执行,当获取不到时,就需要挂起等待;持有配额的线程执行完后需释放配额,并唤醒等待的线程。

使用方法

public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore spd = new Semaphore(3);
        for (int i = 0; i < 9; i++) {
            new Thread(() -> {
                try {
                    // 申请一个配额
                    spd.acquire();
                    System.out.println(Thread.currentThread().getName() + "获取到配额:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                    // 相关业务逻辑
                    Thread.sleep(3000);
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    //释放一个配额
                    spd.release();
                }
            }, "线程-" + (i+1)).start();
        }
    }
}
复制代码

执行结果如下:

线程-1获取到配额:2020-03-12 14:48:56
线程-3获取到配额:2020-03-12 14:48:56
线程-2获取到配额:2020-03-12 14:48:56
线程-5获取到配额:2020-03-12 14:48:59
线程-4获取到配额:2020-03-12 14:48:59
线程-6获取到配额:2020-03-12 14:48:59
线程-8获取到配额:2020-03-12 14:49:02
线程-9获取到配额:2020-03-12 14:49:02
线程-7获取到配额:2020-03-12 14:49:02
复制代码

我们默认初始化了3个配额,然后启动9个线程,从执行结果可以看出,由于配额的限制,执行的过程中始终只能有3个线程同时执行。

内部原理

我们通过追踪源码来研究一下Semaphore的内部原理。首先我们先是new了一个Semaphore对象,进入它的构造方法:

public class Semaphore{
	...
    public Semaphore(int permits) {
    	// 调用了内部类NonfairSync 
        sync = new NonfairSync(permits);
    }
    
    static final class NonfairSync extends Sync {
        NonfairSync(int permits) {
        	// 调用了父类Sync的构造方法
            super(permits);
        }
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

	 abstract static class Sync extends AbstractQueuedSynchronizer {
		...
        Sync(int permits) {
        	// 父类AbstractQueuedSynchronizer实现
            setState(permits);
        }

    }
}
复制代码

可以看出,最终是调用了内部类Sync的构造方法,setState方法如下:

public abstract class AbstractQueuedSynchronizer
    protected final void setState(int newState) {
    	// 初始值赋给同步状态state
        state = newState;
    }
}
复制代码

从上面的逻辑可以看出,Semaphore内部是基于AQS( AQS实现原理 )实现的,初始化时只是给AQS的同步状态赋值。

下面我们看下申请配额acquire方法:

// Semaphore类
 public void acquire() throws InterruptedException {
	  // 这边调用AbstractQueuedSynchronizer中方法
      sync.acquireSharedInterruptibly(1);
  }
	
  // AbstractQueuedSynchronizer类
  public final void acquireSharedInterruptibly(int arg)throws InterruptedException {
      if (Thread.interrupted())
          throw new InterruptedException();
      // tryAcquireShared获取共享锁,由子类实现
      if (tryAcquireShared(arg) < 0)
          doAcquireSharedInterruptibly(arg);
  }
复制代码

tryAcquireShared方法由子类实现,我们进入NonfairSync:

static final class NonfairSync extends Sync {
        ...
        protected int tryAcquireShared(int acquires) {			
        	// 获取非公平锁,父类Sync中实现
            return nonfairTryAcquireShared(acquires);
        }
    }
    
	// 父类Sync
   final int nonfairTryAcquireShared(int acquires) {
     for (;;) {
     		// 读取可用的配额
            int available = getState();
            // 可用配额减去申请的数量
            int remaining = available - acquires;
            if (remaining < 0 || compareAndSetState(available, remaining))
                // 返回剩余的配额数
                return remaining;
        }
    }
复制代码

这边的逻辑是,先查看剩余配额,若不满足申请的配额时,返回一个负值;满足时,则会返回大于等于0的整数。我们接着回到上面申请配额的代码:

// AbstractQueuedSynchronizer类
  public final void acquireSharedInterruptibly(int arg)throws InterruptedException {
      if (Thread.interrupted())
          throw new InterruptedException();
      if (tryAcquireShared(arg) < 0)
      	  // 没申请到配额
          doAcquireSharedInterruptibly(arg);
  }
复制代码

从这边可以看出,如果申请到了配额,线程就继续执行,如果没申请到,就进入doAcquireSharedInterruptibly方法:

// AbstractQueuedSynchronizer类
	private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null;
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
   private final boolean parkAndCheckInterrupt() {
   		// 线程挂起
        LockSupport.park(this);
        return Thread.interrupted();
    }
复制代码

可以看出没申请到配额的线程,会被挂起。 到目前为止,acquire方法就分析完了,最后来我们看下release方法的实现:

// Semaphore类
    public void release() {
    	// 父类AbstractQueuedSynchronizer实现
        sync.releaseShared(1);
    }
    
	// AbstractQueuedSynchronizer类
    public final boolean releaseShared(int arg) {
    	// tryReleaseShared:释放配额,由子类实现
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

	// Sync类
  	protected final boolean tryReleaseShared(int releases) {
      for (;;) {
      		// 获取剩余配额
           int current = getState();
           // 剩余配额加上释放的配额
           int next = current + releases;
           if (next < current) // overflow
               throw new Error("Maximum permit count exceeded");
           // CAS修改配额数
           if (compareAndSetState(current, next))
               return true;
       }
   	}
复制代码

上面方法可以看出,如果成功释放了配额,tryReleaseShared方法返回true,接着会进入doReleaseShared方法:

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    //唤醒线程
                    unparkSuccessor(h);
                }
                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;
            }
            if (h == head)
                break;
        }
    }
	// 唤醒线程
    private void unparkSuccessor(Node node) {
    	...
        if (s != null)
            LockSupport.unpark(s.thread);
    }
复制代码

这段代码就是唤醒阻塞队列中的线程,从而那些挂起的线程可以重新申请配额进行执行。

原文  https://juejin.im/post/5e69e8d1e51d4526fc74b0c4
正文到此结束
Loading...