转载

死磕java concurrent包系列(六)基于AQS解析信号量Semaphore

之前分析AQS的时候,内部有两种模式,独占模式和共享模式,前面的ReentrantLock都是使用独占模式,而Semaphore同样作为一个基于AQS实现的并发组件,它是基于共享模式实现的,我们先看看它的使用场景

Semaphore共享锁的基本使用

假设有20个人去银行柜面办理业务,银行只有3个柜面,同时只能办理三个人,如果基于这种有限的、我们需要控制资源的情况,使用Semaphore比较方便:

public class SemaphoreTest {
  //排队总人数
  private static final int COUNT =20;
  //只有三个柜台
  private static final Semaphore AVALIABLECOUNT = new Semaphore(3);

  public static void main(String[] args) {
    //创建一个线程池
    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(COUNT);
    BasicThreadFactory.Builder builder = new BasicThreadFactory.Builder().namingPattern("线程池");
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(COUNT, COUNT, 30L, TimeUnit.SECONDS, workQueue,
        builder.build());
    for (int i = 0; i < COUNT; i++) {
      final int count = i;
      //排队的人都需要被服务,所以所有的人直接提交线程池处理
      threadPoolExecutor.execute(() -> {
        try {
          //使用acquire获取共享锁
          AVALIABLECOUNT.acquire();
          System.out.println(Thread.currentThread().getName());
          System.out.println("服务号"+count+"正在服务");
          Thread.sleep(1000);
        }catch (Exception e){
          System.out.println(e.getMessage());
        }
        finally {
          //获取完了之后释放资源
          AVALIABLECOUNT.release();
        }
      });
    }
    threadPoolExecutor.shutdown();
  }
}
复制代码

输出如下:我们执行代码,可以发现每隔1秒几乎同一时间出现3条线程访,如下图

死磕java concurrent包系列(六)基于AQS解析信号量Semaphore

Semaphore内部原理解析

Semaphore的内部结构

在深入分析Semaphore的内部原理前先看看一张类图结构

死磕java concurrent包系列(六)基于AQS解析信号量Semaphore

这个结构和ReentrantLock基本上完全一致,Semaphore内部同样存在继承自AQS的内部类Sync以及继承自Sync的公平锁(FairSync)和非公平锁(NofairSync),从这点也足以说明Semaphore的内部实现原理也是基于AQS并发组件的。 在之前的文章中,我们提到过,AQS是基础组件,只负责核心并发操作,如加入或维护同步队列,控制同步状态等,而具体的加锁和解锁操作交由子类完成,因此子类Semaphore共享锁的获取与释放需要自己实现,这两个方法分别是获取锁的tryAcquireShared(int arg)方法和释放锁的tryReleaseShared(int arg)方法,这点从Semaphore的内部结构完全可以看出来。 我们在调用Semaphore的方法时,其内部则是通过间接调用其内部类或AQS执行的。下面我们就从Semaphore的源码入手分析共享锁实现原理,这里先从非公平锁入手。

非公平锁的共享锁

同样的,我们先看看构造方法:

public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * Creates a {@code Semaphore} with the given number of
     * permits and the given fairness setting.
     *
     * @param permits the initial number of permits available.
     *        This value may be negative, in which case releases
     *        must occur before any acquires will be granted.
     * @param fair {@code true} if this semaphore will guarantee
     *        first-in first-out granting of permits under contention,
     *        else {@code false}
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
复制代码

我们通过默认构造函数创建时,诞生的就是非公平锁,接下来我们看一下构造方法的入参permits的传递:

static final class NonfairSync extends Sync {
    NonfairSync(int permits) {
          super(permits);
    }
   //调用父类Sync的nonfairTryAcquireShared
   protected int tryAcquireShared(int acquires) {
       return nonfairTryAcquireShared(acquires);
   }
}

复制代码

在Sync中:

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        //直接将该值设置为AQS中的state的值
        Sync(int permits) {
            setState(permits);
        }
复制代码

所以Semaphore的入参permit直接传入设置到AQS中的state中。 接下来我们看看acquire()方法,我们先通俗的解释一下它的执行过程: 当一个线程请求到来时,state值代表的许可数,那么请求线程将会获得同步状态即对共享资源的访问权,并更新state的值(一般是对state值减1),但如果请求线程过多,state值代表的许可数已减为0,则请求线程将无法获取同步状态,线程将被加入到同步队列并阻塞,直到其他线程释放同步状态(一般是对state值加1)才可能获取对共享资源的访问权。 调用Semaphore的acquire()方法后将会调用到AQS的acquireSharedInterruptibly():

//Semaphore的acquire()
    public void acquire() throws InterruptedException {
      sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //判断是否被中断
        if (Thread.interrupted())
            throw new InterruptedException();
        //如果tryAcquireShared(arg)不小于0,则说明当前还有permit可被使用
        if (tryAcquireShared(arg) < 0)
            //如果许可被用完了,没有剩余许可 则加入同步队列等待
            doAcquireSharedInterruptibly(arg);
    }
复制代码

在acquireSharedInterruptibly()方法内部先进行了线程中断的判断,那么先尝试调用tryAcquireShared(arg)方法获取同步状态,如果此时许可获取成功,那么方法执行结束,如果获取失败,则说明没有剩余许可了,那么调用doAcquireSharedInterruptibly(arg);方法加入同步队列等待。 这里的tryAcquireShared(arg)是个模板方法设计模式,AQS内部没有提供具体实现,由子类实现,也就是有Semaphore内部自己实现,该方法在Semaphore内部非公平锁的实现如下

final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                //remaining < 0说明许可已经供不应求了,这个时候进来的线程需要被阻塞
                //否则CAS操作更新avaliable的值,它表示剩余的许可数
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
复制代码

nonfairTryAcquireShared(int acquires)方法内部,先获取state的值,并执行减法操作,得到remaining值,它可以理解为剩余的许可数,如果remaining<0,说明请求的许可数过大,此时直接返回一个负数的remaining;如果remaining大于0,说明还有剩余的许可数,则可以访问共享资源,后续将被加入同步队列(通过doAcquireSharedInterruptibly(arg))。 注意Semaphore的acquire()可能存在并发操作,因此nonfairTryAcquireShared()方法体内部采用死循环+无锁(CAS)并发的操作保证对state值修改的安全性。 例如:假设permit值为5,有多个线程并发accquire获取许可,线程1运行时得到的remainin是5-1=4,线程2运行时,得到的remaining同样是5-1=4,但是执行compareAndSetState时,线程2 更快一点,执行CAS操作:判断state现在是否为5,如果为5,则CAS更新为4. 这个时候线程1也执行CAS操作,判断state现在是否为5,发现不为5,所以CAS失败,这时候需要这个死循环去重试。

如果remaining大于0,说明还有剩余的许可数,则可以访问共享资源,后续将被加入同步队列,接下来看入队的操作,这一部分与ReentrantLock差不多:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //使用SHARED类型创建共享模式的Node
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //获取前序节点
                final Node p = node.predecessor();
                //如果前序节点是头节点,说明自己的Node在队列最前端,此时可能共享资源随时被释放
                //所以需要再次尝试获取共享资源
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    //如果获取共享资源成功
                    if (r >= 0) {
                        //已经获取资源后,node已经没有意义,所以清理head节点并传播
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //如果不是头节点
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码

在方法中,由于当前线程没有获取同步状态,因此创建一个共享模式类型(Node.SHARED)的结点并通过addWaiter(Node.SHARED)加入同步队列,加入完成后,当前线程进入自旋状态,首先判断前驱结点是否为head,如果是,那么尝试获取同步状态并返回r值,如果r大于0,则说明获取同步状态成功,将当前线程设置为head并传播,传播指的是,通知后续结点继续获取同步状态,到此return结束,获取到同步状态的线程将会执行原定的任务。

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; 
        setHead(node);//设置为头结点
        /* 
         * 尝试去唤醒队列中的下一个节点,如果满足如下条件: 
         * 还有剩余许可(propagate > 0), 
         * 或者h.waitStatus为PROPAGATE(被上一个操作设置) 
         * 并且 
         *   下一个节点处于共享模式或者为null。 
         * 
         * 这两项检查中的保守主义可能会导致不必要的唤醒,但只有在有
         * 有在多个线程争取获得/释放同步状态时才会发生,所以大多
         * 数情况下会立马获得需要的信号
         */  
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
            //唤醒后继节点,因为是共享模式,所以允许多个线程同时获取同步状态
                doReleaseShared();
        }
    }

复制代码

但如果前驱结点不为head或前驱结点为head并尝试获取同步状态失败(与),那么调用shouldParkAfterFailedAcquire(p, node)方法判断前驱结点的waitStatus值是否为SIGNAL并调整同步队列中的node结点状态,如果返回true,那么执行parkAndCheckInterrupt()方法,将当前线程挂起。 shouldParkAfterFailedAcquire方法与ReentrantLock中的如出一辙:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取当前结点的等待状态
        int ws = pred.waitStatus;
        //如果为等待唤醒(SIGNAL)状态则返回true
        if (ws == Node.SIGNAL)
            return true;
        //如果ws>0 则说明是结束状态,
        //遍历前驱结点直到找到没有结束状态的结点
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //如果ws小于0又不是SIGNAL状态,说明是node是首次加入的线程
            //则将其前驱节点设置为SIGNAL状态。下次执行shouldParkAfterFailedAcquire方法时就
            //满足ws == Node.SIGNAL条件了
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

复制代码

这个方法是AQS中的,如果不懂的话,可以参考之前在ReentrantLock中也分析过: juejin.im/post/5c021b… 中自旋的部分。 到此,加入同步队列的整个过程完成。

总结

在AQS中存在一个volatile变量state,当我们创建Semaphore对象传入许可数值时,最终会赋值给state,state的数值代表可同时操作共享数据的线程数量,每当一个线程请求(如调用Semaphored的acquire()方法)获取同步状态成功,state的值将会减少1,直到state为0时,表示已没有可用的许可数,也就是对共享数据进行操作的线程数已达到最大值,其他后来线程将被阻塞,此时AQS内部会将线程封装成共享模式的Node结点,加入同步队列中等待并开启自旋操作。只有当持有对共享数据访问权限的线程执行完成任务并释放同步状态后,同步队列中的对于的结点线程才有可能获取同步状态并被唤醒执行同步操作,注意在同步队列中获取到同步状态的结点将被设置成head并清空相关线程数据(毕竟线程已在执行也就没有必要保存信息了),AQS通过这种方式便实现共享锁,用图表示如下:

死磕java concurrent包系列(六)基于AQS解析信号量Semaphore

##非公平锁的释放锁 接下来看一下释放锁:

public void release() {
       sync.releaseShared(1);
}

//调用到AQS中的releaseShared(int arg) 
public final boolean releaseShared(int arg) {
       //调用子类Semaphore实现的tryReleaseShared方法尝试释放同步状态
      if (tryReleaseShared(arg)) {
          doReleaseShared();
          return true;
      }
      return false;
  }
复制代码

显然Semaphore间接调用了AQS中的releaseShared(int arg)方法,通过tryReleaseShared(arg)方法尝试释放同步状态,如果释放成功,那么将调用doReleaseShared()唤醒同步队列中后继结点的线程,tryReleaseShared(int releases)方法如下:

//在Semaphore的内部类Sync中实现的
protected final boolean tryReleaseShared(int releases) {
       for (;;) {
              //获取当前state
             int current = getState();
             //释放状态state增加releases
             int next = current + releases;
             if (next < current) // overflow
                 throw new Error("Maximum permit count exceeded");
              //通过CAS更新state的值
             if (compareAndSetState(current, next))
                 return true;
         }
        }
复制代码

逻辑很简单,释放同步状态,更新state的值,同样的,通过for死循环和CAS操作来保证线程安全问题,因为可能存在多个线程同时释放同步状态的场景。释放成功后通过doReleaseShared()方法唤醒后继结点。

private void doReleaseShared() {
    /* 
     * 如果头节点的后继节点需要唤醒,那么执行唤醒  
     * 动作;如果不需要,将头结点的等待状态设置为PROPAGATE保证   
     * 唤醒传递。另外,为了防止过程中有新节点进入(队列),这里必  
     * 需做循环,所以,和其他unparkSuccessor方法使用方式不一样  
     * 的是,如果(头结点)等待状态设置失败,重新检测。 
     */  
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            // 获取头节点对应的线程的状态
            int ws = h.waitStatus;
            // 如果头节点对应的线程是SIGNAL状态,则意味着头
            //结点的后继结点所对应的线程需要被unpark唤醒。
            if (ws == Node.SIGNAL) {
                // 修改头结点对应的线程状态设置为0。失败的话,则继续循环。
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 唤醒头结点h的后继结点所对应的线程
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果头结点发生变化,则继续循环。否则,退出循环。
        if (h == head)                   // loop if head changed
            break;
    }
}


//唤醒传入结点的后继结点对应的线程
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
      if (ws < 0)
          compareAndSetWaitStatus(node, ws, 0);
       //拿到后继结点
      Node s = node.next;
      if (s == null || s.waitStatus > 0) {
          s = null;
          for (Node t = tail; t != null && t != node; t = t.prev)
              if (t.waitStatus <= 0)
                  s = t;
      }
      if (s != null)
          //唤醒该线程
          LockSupport.unpark(s.thread);
    }

复制代码

显然doReleaseShared()方法中通过调用unparkSuccessor(h)方法唤醒head的后继结点对应的线程。这个方法在之前获取资源时也会被调用:

if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }

复制代码

两种情况下都是为唤醒后继节点,因为是共享模式,所以允许多个线程同时获取同步状态。释放操作的过程还是相对简单些的,即尝试更新state值,更新成功调用doReleaseShared()方法唤醒后继结点对应的线程。

公平锁的共享锁

公平锁的中的共享模式实现除了在获取同步状态时与非公平锁不同外,其他基本一样:

static final class FairSync extends Sync {
        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                //这里是重点,先判断队列中是否有结点再执行
                //同步状态获取。
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

相比之下,对于非公平锁:
    final int nonfairTryAcquireShared(int acquires) {
         //使用死循环
         for (;;) {
             //每当有线程获取共享资源时,就直接尝试CAS操作
             int available = getState();
             int remaining = available - acquires;
             //判断信号量是否已小于0或者CAS执行是否成功
             if (remaining < 0 ||
                 compareAndSetState(available, remaining))
                 return remaining;
         }
     }

复制代码

从代码中可以看出,与非公平锁tryAcquireShared(int acquires)方法实现的唯一不同是,在尝试获取同步状态前,先调用了hasQueuedPredecessors()方法判断同步队列中是否存在结点,如果存在则返回-1,即将线程加入同步队列等待,后续通过Node结构保证唤醒的顺序。从而保证先到来的线程请求一定会先执行,也就是所谓的公平锁。其他操作,与前面分析的非公平锁一样。

总结

AQS作为核心并发组件,它通过state值来控制对共享资源访问的线程数,内部的Node有独占模式(EXCLUSIVE)和共享模式(SHARED):

  • 对于ReenTrantLock:state默认为0,每次加锁后state更新为1,更新为1之后如果还有线程尝试获取锁,则加入同步队列等待;每当线程释放锁时,再更新为0并唤醒队列中的线程
  • 对于Semaphore:State默认为许可数,每当线程请求同步状态成功,state值将会减1,如果超过限制数量的线程将被封装共享模式的Node结点加入同步队列封装成独占模式(EXCLUSIVE)等待,直到其他执行线程释放同步状态,才有机会获得执行权,而每个线程执行完成任务释放同步状态后,state值将会增加1,这就是共享锁的基本实现模型。

AQS是采用模板方法的设计模式构建的,它作为基础组件,封装的是核心并发操作,但是实现上分为两种模式,即共享模式(如Semaphore)与独占模式(如ReetrantLock,这两个模式的本质区别在于多个线程能不能共享一把锁),而这两种模式的加锁与解锁实现方式是不一样的,但AQS只关注内部公共方法实现并不关心外部不同模式的实现,所以提供了模板方法给子类使用:也就是说实现独占锁,如ReentrantLock需要自己实现tryAcquire()方法和tryRelease()方法,而实现共享模式的Semaphore,则需要实现tryAcquireShared()方法和tryReleaseShared()方法,这样做的好处是显而易见的,无论是共享模式还是独占模式,其基础的实现都是同一套组件(AQS),只不过是加锁解锁的逻辑不同罢了,更重要的是如果我们需要自定义锁的话,也变得非常简单,只需要选择不同的模式实现不同的加锁和解锁的模板方法即可。 不管是ReentrantLock还是Semaphore,公平锁与非公平锁的不同之处在于公平锁会在线程请求同步状态前,判断同步队列是否存在Node,如果存在就将请求线程封装成Node结点加入同步队列,从而保证每个线程获取同步状态都是先到先得的顺序执行的。非公平锁则是通过竞争的方式获取,不管同步队列是否存在Node结点,只有通过竞争获取就可以获取线程执行权。

原文  https://juejin.im/post/5c11fc82e51d45242973eded
正文到此结束
Loading...