AND型信号量可能大家都听说过并可能都有一定的理解,但是你有使用过么?今天就使用Java来模拟实现!
本文是对上篇文章(进程同步机制)的一次实践,通过JUC提供的一些机制来模拟一些OS中的AND型信号量,因为记录型型信号量可以等价于JUC中提供的Semaphore(信号量),但是对于AND型信号量因为一些原因(主要是过时了),JUC没有提供,今天就手动的来写一个AND型信号量对应的Swait操作和Ssignal操作(这里不明白的可以看前面的理论篇)。通过本篇博文让你对进程同步机制有个更好的理解。
在这里,首先解释一下,为了满足线程申请信号量不成功后将进程阻塞,并插入到对应的队列中,所以使用了ReentrantLock+Condition来实现Swait方法。废话不多说,直接上代码:
//数据定义 static Lock lock = new ReentrantLock(); static Condition condition1 = lock.newCondition(); static Condition condition2 = lock.newCondition(); public static void Swait(String id, Semaphore s1, Semaphore s2) throws InterruptedException { lock.tryLock(1, TimeUnit.SECONDS); log.info("当前的两个信号量的状态:【{},{}】", s1.availablePermits(), s2.availablePermits()); //availablePermits可获取到信号量中还剩余的值 if(s1.availablePermits() < 1 || s2.availablePermits() < 1){ if (s1.availablePermits() < 1) { log.info("线程【{}】被挂起到信号量【{}】中", id, s1); //阻塞,并插入到condition1的阻塞队列中 condition1.await(); } else { log.info("线程【{}】被挂起到信号量【{}】中", id, s2); //阻塞,并插入到condition2的阻塞队列中 condition2.await(); } log.info("被挂起的线程【{}】被唤醒执行。", id); } else { log.info("为线程【{}】分配资源!", id); s1.acquire(); s2.acquire(); } lock.unlock(); } public static void Ssignal(Semaphore s1, Semaphore s2) throws InterruptedException { log.info("线程【{}】执行了释放资源", id); lock.tryLock(1, TimeUnit.SECONDS); s1.release(); s2.release(); //唤醒等待队列中的线程 condition.signal(); lock.unlock(); }
大家仔细看上面的代码,这个也是我刚开始写的代码,第一眼看似乎是没什么问题,但是里面隐藏着一个坑,在Swait方法中,调用condition1.await(),此时线程被阻塞在这一行中,但是当被别的线程(调用Ssignal)唤醒时,在被阻塞的下一行开始继续执行,但是在后续的代码里,是没有去申请信号量的,而是直接就Swait成功了,这样在执行Ssignal时就会导致信号量凭空的增加了,也就无法正确的表征系统中的资源数量了。
下面我们就对代码进行优化,大家可以回顾一下AND型信号量,当其因为资源不足时,需要将线程插入到第一个无法满足条件(即Si<1)的信号量对应的等待队列中,并且将程序计数器放置到Swait操作的开始处,所以我们对Swait代码进行修改如下:
public static void Swait(String id, Semaphore s1, Semaphore s2) throws InterruptedException { lock.tryLock(1, TimeUnit.SECONDS); log.info("当前的两个信号量的状态:【{},{}】", s1.availablePermits(), s2.availablePermits()); //如果申请不到,就挂起线程,并将线程插入到condition的队列中 while (s1.availablePermits() < 1 || s2.availablePermits() < 1) { if (s1.availablePermits() < 1) { log.info("线程【{}】被挂起到信号量【{}】中", id, s1); condition1.await(); } else { log.info("线程【{}】被挂起到信号量【{}】中", id, s2); condition2.await(); } log.info("被挂起的线程【{}】被唤醒执行。", id); } log.info("为线程【{}】分配资源!", id); s1.acquire(); s2.acquire(); lock.unlock(); }
在上面的代码中,我们将请求的资源放到一个循环条件中,以满足将程序计数器放置到Swait操作的开始处,在每次被唤醒后都要重新判断资源是否足够,如果足够才跳出循环,否则就再次自我阻塞。
如果你知道了信号量的种类数(系统中的资源类型),其实上面的代码已经可以满足一定的需要了,只需要我们将所有的信号量写入到参数列表中即可。但是对于致力于代码的复用,这里就有些差强人意了,因此我们再次对代码进行改进,代码如下所示:
public static void Swait(String id, Semaphore... list) throws InterruptedException { lock.lock(); //如果资源不足,就挂起线程,并将线程插入到condition的队列中 while (true) { int count=0; //循环判断参数列表中信号量的可用值 for (Semaphore semaphore:list){ if(semaphore.availablePermits()>0){ count++; } } //如果资源都满足,则跳出循环,进行资源分配 if(count == list.length){ break; } log.info("线程【{}】被挂起-----", id); //将当前线程阻塞 condition1.await(); log.info("被挂起的线程【{}】被唤醒执行。", id); } log.info("为线程【{}】分配资源!", id); //分配资源 for (Semaphore semaphore:list){ semaphore.acquire(); } lock.unlock(); } public static void Ssignal(String id, Semaphore... list) throws InterruptedException { log.info("线程【{}】执行了释放资源", id); lock.tryLock(1, TimeUnit.SECONDS); //循环释放信号量 for (Semaphore semaphore:list){ semaphore.release(); } //唤醒等待队列中的线程 condition.signal(); lock.unlock(); }
为此,我们将方法中的信号量列表改为可变的参数列表,这样在传参的时候就可以方便的进行了,但是也会存才一些问题,比如无法约束“借出”与“归还”的信号量的数量是否一致。并且因为信号量的数量不定,所以无法为每个信号量新建一个条件变量(Condition),因此在上面的代码中所有的信号量公用一个条件变量,所有阻塞的线程都插入在其阻塞队列中。
这里我们使用一个经典的进程同步问题来演示我们使用Java模拟的AND型信号量,在这里,我们采用生产者–消费者问题来演示,完整的代码如下:
//用来保证互斥的访问临界区(缓存区) static final Semaphore mutex = new Semaphore(1); //缓冲区,最大容量为50 static List<Integer> buffer = new ArrayList<>(); //缓冲区中还可放入的消息数量 static final Semaphore empty = new Semaphore(50); //缓冲区中的消息数量 static final Semaphore full = new Semaphore(0); //可重入锁和条件变量 static Lock lock = new ReentrantLock(); static Condition condition = lock.newCondition(); //用与辅助的简单的生成消息 static Integer count = 0; //生产者 static class Producer extends Thread { Producer(String name) { super.setName(name); } @Override public void run() { do { try { Swait(this.getName(), mutex, empty); log.info("生产了一条消息:【{}】", count); buffer.add(count++); Thread.sleep(1000); Ssignal(this.getName(), mutex, full); } catch (InterruptedException e) { log.error("生产消息时产生异常!"); } } while (true); } } //消费者 static class Consumer extends Thread { Consumer(String name) { super.setName(name); } @Override public void run() { do { try { Swait(this.getName(), mutex, full); log.info("消费了一条消息:【{}】", buffer.remove(0)); Thread.sleep(1000); Ssignal(this.getName(), mutex, empty); } catch (InterruptedException e) { log.error("消费消息时产生异常!"); } } while (true); } } public static void Swait(String id, Semaphore... list) throws InterruptedException { lock.lock(); //如果资源不足,就挂起线程,并将线程插入到condition的队列中 while (true) { int count=0; for (Semaphore semaphore:list){ if(semaphore.availablePermits()>0){ count++; } } if(count == list.length){ break; } log.info("线程【{}】被挂起", id); condition.await(); log.info("被挂起的线程【{}】被唤醒执行。", id); } log.info("为线程【{}】分配资源!", id); for (Semaphore semaphore:list){ semaphore.acquire(); } lock.unlock(); } public static void Ssignal(String id, Semaphore... list) throws InterruptedException { log.info("线程【{}】执行了释放资源", id); lock.tryLock(1, TimeUnit.SECONDS); for (Semaphore semaphore:list){ semaphore.release(); } //唤醒等待队列中的一个线程 condition.signal(); lock.unlock(); } public static void main(String[] args) { Producer p1 = new Producer("p1"); Consumer c1 = new Consumer("c1"); p1.start(); c1.start(); }
上面代码都是可以直接执行的,如果不需要使用参数列表,可以将上面的Swait方法进行替换即可(记得创建对应的条件变量)。
下图是部分的执行结果:
又到了分隔线以下,本文到此就结束了,本文内容全部都是由博主自己进行整理并结合自身的理解并且进行的代码编写,如果有什么错误,还请批评指正。
本文的所有java代码都已通过测试,对其中有什么疑惑的,可以评论区留言,欢迎你的留言与讨论;另外原创不易,如果本文对你有所帮助,还请留下个赞,以表支持。
希望本文可以帮助你理解加深理解进程同步,也可以帮助你理解Java并发编程.