生产者-消费者模式的本质是一个多线程同步问题,即生产者生成一定数量的数据放入缓冲区,然后重复此过程;与此同时,消费者也在缓冲区中消耗这些数据。乍一想这个问题似乎很简单,只要按照顺序执行就可以了,但是在多线程环境下,如何按照“顺序”执行就成了一个非常突出的问题。多线程环境下,生产者和消费者线程之间必须保持同步,要保证生成不会在缓冲区满时放入数据,消费者也不会在缓冲区空时消耗数据,如果解决办法不够完善,那么很容易出现死锁的情况。
而解决这个问题可以分为两类:
第一种方式效率较高,并且易于实现,代码的可控制性较好,属于常用的模式。
第二种方式由于管道缓冲区不易控制,被传输的对象不易封装等,实用性不是很强,但是也有方法去实现。
保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。
wait() / notify() await() / signal() BlockingQueue
当缓冲区已满时,生产者线程执行,放弃锁,使自己处于等待状态,让其它线程执行;
当缓冲区已空时,消费者停止执行,放弃锁,使自己处于等待状态,让其它线程执行。
当生产者向缓冲区放入一个产品时,向其它等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态;
当消费者从缓冲区取出一个产品时,向其它等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。
代码实现如下:
import java.util.LinkedList; public class Storage { // 仓库容量 private final int MAX_SIZE = 10; // 仓库存储的载体 private LinkedList<Object> list = new LinkedList<>(); //生产 public void produce() { //加锁 synchronized (list) { while (list.size() + 1 > MAX_SIZE) { System.out.println("【生产者" + Thread.currentThread().getName() + "】仓库已满"); try { //挂起当前线程 list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.add(new Object()); System.out.println("【生产者" + Thread.currentThread().getName() + "】生产一个产品,现库存" + list.size()); list.notifyAll(); } } //消费 public void consume() { //加锁 synchronized (list) { while (list.size() == 0) { System.out.println("【消费者" + Thread.currentThread().getName() + "】仓库为空"); try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.remove(); System.out.println("【消费者" + Thread.currentThread().getName() + "】消费一个产品,现库存" + list.size()); list.notifyAll(); } } }
生产者:
public class Producer implements Runnable{ private Storage storage; public Producer(){} public Producer(Storage storage , String name){ this.storage = storage; } @Override public void run(){ while(true){ try{ Thread.sleep(1000); //避免执行太快,睡1s执行一次 storage.produce(); //生产产品 }catch (InterruptedException e){ e.printStackTrace(); } } } }
消费者:
public class Consumer implements Runnable{ private Storage storage; public Consumer(){} public Consumer(Storage storage , String name){ this.storage = storage; } @Override public void run(){ while(true){ try{ Thread.sleep(3000); //避免执行太快,睡3s再消费 storage.consume(); }catch (InterruptedException e){ e.printStackTrace(); } } } }
测试效果:
public class Main { public static void main(String[] args) { Storage storage = new Storage(); //3个生产者线程 Thread p1 = new Thread(new Producer(storage)); Thread p2 = new Thread(new Producer(storage)); Thread p3 = new Thread(new Producer(storage)); //3个消费者线程 Thread c1 = new Thread(new Consumer(storage)); Thread c2 = new Thread(new Consumer(storage)); Thread c3 = new Thread(new Consumer(storage)); p1.start(); p2.start(); p3.start(); c1.start(); c2.start(); c3.start(); } }
运行结果:
【生产者p1】生产一个产品,现库存1 【生产者p2】生产一个产品,现库存2 【生产者p3】生产一个产品,现库存3 【生产者p1】生产一个产品,现库存4 【生产者p2】生产一个产品,现库存5 【生产者p3】生产一个产品,现库存6 【生产者p1】生产一个产品,现库存7 【生产者p2】生产一个产品,现库存8 【消费者c1】消费一个产品,现库存7 【生产者p3】生产一个产品,现库存8 【消费者c2】消费一个产品,现库存7 【消费者c3】消费一个产品,现库存6 【生产者p1】生产一个产品,现库存7 【生产者p2】生产一个产品,现库存8 【生产者p3】生产一个产品,现库存9 【生产者p1】生产一个产品,现库存10 【生产者p2】仓库已满 【生产者p3】仓库已满 【生产者p1】仓库已满 【消费者c1】消费一个产品,现库存9 【生产者p1】生产一个产品,现库存10 【生产者p3】仓库已满 。。。。。。以下省略
一个生产者线程运行 produce
方法,睡眠1s;一个消费者运行一次 consume
方法,睡眠3s。此次实验过程中,有3个生产者和3个消费者,也就是我们说的多对多的情况。仓库的容量为10,可以看出消费的速度明显慢于生产的速度,符合设定。
notifyAll()
方法可使所有正在等待队列中等待同一共享资源的“全部”线程从等待状态退出,进入可运行状态。此时,优先级最高的哪个线程最先执行,但也有可能是随机执行的,这要取决于JVM虚拟机的实现。即最终也只有一个线程能被运行,上述线程优先级都相同,每次运行的线程都不确定是哪个,后来给线程设置优先级后也跟预期不一样,还是要看JVM的具体实现吧。
在JDK5中,用 ReentrantLock
和 Condition
可以实现等待/通知模型,具有更大的灵活性。通过在 Lock
对象上调用 newCondition()
方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。
这种方式跟第一种方式基本一样,只不过一个使用的是基于JVM实现的 synchronized
锁的等待/通知机制,而这种使用的是基于 AQS
的显式 Lock
锁的等待/通知机制,所以只需要改动上面的 Storage.java
类,代码如下:
import java.util.LinkedList; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Storage { // 仓库最大存储量 private final int MAX_SIZE = 10; // 仓库存储的载体 private LinkedList<Object> list = new LinkedList<Object>(); // 锁 private final Lock lock = new ReentrantLock(); // 仓库满的条件变量 private final Condition full = lock.newCondition(); // 仓库空的条件变量 private final Condition empty = lock.newCondition(); public void produce() { // 获得锁 lock.lock(); while (list.size() + 1 > MAX_SIZE) { System.out.println("【生产者" + Thread.currentThread().getName() + "】仓库已满"); try { full.await(); } catch (InterruptedException e) { e.printStackTrace(); } } list.add(new Object()); System.out.println("【生产者" + Thread.currentThread().getName() + "】生产一个产品,现库存" + list.size()); // 唤醒其他所有线程、释放锁 full.signalAll(); empty.signalAll(); lock.unlock(); } public void consume() { // 获得锁 lock.lock(); while (list.size() == 0) { System.out.println("【消费者" + Thread.currentThread().getName() + "】仓库为空"); try { empty.await(); } catch (InterruptedException e) { e.printStackTrace(); } } list.remove(); System.out.println("【消费者" + Thread.currentThread().getName() + "】消费一个产品,现库存" + list.size()); // 唤醒其他所有线程、释放锁 full.signalAll(); empty.signalAll(); lock.unlock(); } }
其它代码均与上面一样,运行结果也类似,所以就不再重复
BlockingQueue
是 JDK5.0
的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种 await() / signal()
方法。它可以在生成对象时指定容量大小,用于阻塞操作的是 put()
和 take()
方法。
put()
方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
take()
方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。
代码实现如下:
import java.util.concurrent.LinkedBlockingQueue; public class Storage { // 仓库存储的载体 private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<>(10); public void produce() { try{ //容量最大时自动阻塞 list.put(new Object()); System.out.println("【生产者" + Thread.currentThread().getName() + "】生产一个产品,现库存" + list.size()); } catch (InterruptedException e){ e.printStackTrace(); } } public void consume() { try{ //容量为0时自动阻塞 list.take(); System.out.println("【消费者" + Thread.currentThread().getName() + "】消费了一个产品,现库存" + list.size()); } catch (InterruptedException e){ e.printStackTrace(); } } }
上述运行代码可能会出现 put()
或 take()
和 System.out.println()
输出不匹配的情况,是由于它们之间没有同步造成的。 BlockingQueue
可以放心使用,这可不是它的问题,只是在它和别的对象之间的同步有问题。
Semaphore
是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。 Semaphore
可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为 1
的 Semaphore
,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。计数为 0
的 Semaphore
是可以 release
的,然后就可以 acquire
(即一开始使线程阻塞从而完成其他执行)。
代码实现如下:
import java.util.LinkedList; import java.util.concurrent.Semaphore; public class Storage { // 仓库存储的载体 private LinkedList<Object> list = new LinkedList<Object>(); // 仓库的最大容量 final Semaphore notFull = new Semaphore(10); // 将线程挂起,等待其他来触发 final Semaphore notEmpty = new Semaphore(0); // 互斥锁 final Semaphore mutex = new Semaphore(1); public void produce() { try { notFull.acquire(); mutex.acquire(); list.add(new Object()); System.out.println("【生产者" + Thread.currentThread().getName() + "】生产一个产品,现库存" + list.size()); } catch (Exception e) { e.printStackTrace(); } finally { mutex.release(); notEmpty.release(); } } public void consume() { try { notEmpty.acquire(); mutex.acquire(); list.remove(); System.out.println("【消费者" + Thread.currentThread().getName() + "】消费一个产品,现库存" + list.size()); } catch (Exception e) { e.printStackTrace(); } finally { mutex.release(); notFull.release(); } } }
管道一种特殊的流,用于不同线程间直接传送数据,一个线程发送数据到输出管道,另一个线程从输入管道中读数据。
inputStream.connect(outputStream)
或 outputStream.connect(inputStream)
作用是使两个 Stream
之间产生通信链接,这样才可以将数据进行输出与输入。
这种方式只适用于两个线程之间通信,不适合多个线程之间通信。
生产者:
import java.io.IOException; import java.io.PipedOutputStream; public class Producer implements Runnable { private PipedOutputStream pipedOutputStream; public Producer() { pipedOutputStream = new PipedOutputStream(); } public PipedOutputStream getPipedOutputStream() { return pipedOutputStream; } @Override public void run() { try { for (int i = 1; i <= 5; i++) { pipedOutputStream.write(("This is a test, Id=" + i + "!/n").getBytes()); } pipedOutputStream.close(); } catch (IOException e) { e.printStackTrace(); } } }
消费者:
import java.io.IOException; import java.io.PipedInputStream; public class Consumer implements Runnable { private PipedInputStream pipedInputStream; public Consumer() { pipedInputStream = new PipedInputStream(); } public PipedInputStream getPipedInputStream() { return pipedInputStream; } @Override public void run() { int len = -1; byte[] buffer = new byte[1024]; try { while ((len = pipedInputStream.read(buffer)) != -1) { System.out.println(new String(buffer, 0, len)); } pipedInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } }
测试函数:
import java.io.IOException; public class Main { public static void main(String[] args) { Producer p = new Producer(); Consumer c = new Consumer(); Thread t1 = new Thread(p); Thread t2 = new Thread(c); try { p.getPipedOutputStream().connect(c.getPipedInputStream()); t2.start(); t1.start(); } catch (IOException e) { e.printStackTrace(); } } }
生产者:
import java.io.IOException; import java.io.PipedWriter; public class Producer implements Runnable { private PipedWriter pipedWriter; public Producer() { pipedWriter = new PipedWriter(); } public PipedWriter getPipedWriter() { return pipedWriter; } @Override public void run() { try { for (int i = 1; i <= 5; i++) { pipedWriter.write("This is a test, Id=" + i + "!/n"); } pipedWriter.close(); } catch (IOException e) { e.printStackTrace(); } } }
消费者:
import java.io.IOException; import java.io.PipedReader; public class Consumer implements Runnable { private PipedReader pipedReader; public Consumer() { pipedReader = new PipedReader(); } public PipedReader getPipedReader() { return pipedReader; } @Override public void run() { int len = -1; char[] buffer = new char[1024]; try { while ((len = pipedReader.read(buffer)) != -1) { System.out.println(new String(buffer, 0, len)); } pipedReader.close(); } catch (IOException e) { e.printStackTrace(); } } }
测试函数:
import java.io.IOException; public class Main { public static void main(String[] args) { Producer p = new Producer(); Consumer c = new Consumer(); Thread t1 = new Thread(p); Thread t2 = new Thread(c); try { p.getPipedWriter().connect(c.getPipedReader()); t2.start(); t1.start(); } catch (IOException e) { e.printStackTrace(); } } }