既然要解决生产者消费者问题,那第一步自然就是想到PV操作,但是发现Java并没有semaphore这一套系统,虽然麻烦了点,但是还是试图自己去实现看看。
于是我首先像模像样定义了一个semaphore。
public class Semaphore { private int s; public Semaphore(int s) { this.s = s; } public int getS() { return s; } public void setS(int s) { this.s = s; } }
接下来实现一个PV系统。
importjava.util.Queue; public class PVSystem { private Launcherlauncher; public PVSystem(Launcherlauncher) { this.launcher = launcher; } public synchronizedvoid p(Semaphoresem) { sem.setS(sem.getS()-1); if (sem.getS() < 0) { try { launcher.threadQueue.offer(Thread.currentThread()); Thread.currentThread().wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } public synchronizedvoid v(Semaphoresem) { System.out.println(Thread.currentThread().getName()); sem.setS(sem.getS() + 1); if (sem.getS() <= 0) { launcher.threadQueue.poll().notify(); } } }
生产者和消费者线程直接调用PV操作
public class Producer extends Thread{ private Launcherlauncher; public Producer(Launcherlauncher) { this.launcher = launcher; } @Override public void run() { PVSystempv = new PVSystem(launcher); while (true) { pv.p(Launcher.empty); pv.p(Launcher.mutex); System.out.println(this.getName() + ":" + launcher.size + " -> " + (++launcher.size)); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } pv.v(Launcher.mutex); pv.v(Launcher.full); } } }
消费者线程
public class Consumer extends Thread { private Launcherlauncher; public Consumer(Launcherlauncher) { this.launcher = launcher; } @Override public void run() { PVSystempv = new PVSystem(launcher); while (true) { pv.p(Launcher.full); pv.p(Launcher.mutex); System.out.println(this.getName() + ":" + launcher.size + "->" + (--launcher.size)); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } pv.v(Launcher.mutex); pv.v(Launcher.empty); } } }
然后在main函数里面启动,并且设置一个线程队列。
importjava.util.LinkedList; importjava.util.Queue; public class Launcher { public int size = 0; public static Semaphorefull = new Semaphore(0); public static Semaphoreempty = new Semaphore(10); public static Semaphoremutex = new Semaphore(1); public Queue<Thread> threadQueue = new LinkedList<>(); public static void main(String[] args) { Launcherlauncher = new Launcher(); for (int i = 0; i < 1; i++) { Producerproducer = new Producer(launcher); producer.setName("Producer " + i); producer.start(); Consumerconsumer = new Consumer(launcher); consumer.setName("Consumer " + i); consumer.start(); } } }
本以为大功告成,没想到马上就gg了。那么问题出在哪呢?再次查文档发现,问题出来我对synchronized关键字以及wait和notify函数的理解上。
至此明白了,作为面向对象的Java语言,所有的一切都是面向对象的,也就是说,是封装了底层的互斥实现过程,直接对对象进行操作,而不让你自己操作线程来实现互斥。下面的这种方式才是正确的用法,用同步关键字修饰的obj自带互斥锁。如果是修饰方法,相当于synchronized(this)。
synchronized (obj) { while (condition) { obj.wait(); } // do something obj.notify(); }
也就是说,调用obj.notify()之后,唤醒线程,但是不释放monitor,被唤醒的线程可能因为拿不到锁又阻塞了。不过当synchronized修饰的代码块执行完毕后,monitor会自动释放。
搞明白以后,于是有了下面的修改。