我在8年前去面试程序员的时候,一个不大的公司,里面的开发主管接待了我们,给我的题目就是写一段程序模拟生产者消费者问题,当时可把我难坏了,一下子感觉自己的知识储备竟然如此的匮乏。
而在我从事DBA工作之后,经常会有大批量并发的环境,有的需要排障,有的需要优化,在很多并发的场景中,发现生产者消费者问题可以模拟出很多实际中的问题,所以生产者消费者问题非常重要,也是我想不断改进和探索的一类问题。
引入仓库的必要性要想使用程序来模拟,其实也不用花太多的时间,我们简单说说需要考虑的地方。首先生产者,消费者是两个实体对象,生产者生产物品,消费者消费物品,如果在生产者中定义生产的流程,在消费者中定义消费的流程,两个对象就需要彼此引用,依赖性太高,而且实际上性能也好不到哪里去,所以就需要一个缓冲器,一个中间对象,我们就叫做仓库吧,生产的物品推入仓库,消费的物品从仓库中取出,这样生产者和消费者就能够取消之间的引用,直接通过仓库引用来同步状态,降低耦合。
所以我们的一个初步设想就是生产者-->仓库<--消费者 这样的模式。
生产者消费者的几种类型和实现方式当然生产者消费者问题有两种类型,一种就是使用某种机制来保护生产者和消费者之间的同步,另外一种和Linux中的管道思路相似。相对来说第一种类型的处理方式更为通用,大体分为三类具体的实现方式:
经典的wait(),notify()方法
await(),signal()方法
使用阻塞队列(BlockingQueue),比如LinkedBlockingQueue
为了更快出成果,尽可能快速理解,我也参考了一些资料,下午下班前写了下面的程序。我就简单说明第一种和第二种吧。
因为实体类对象是通用的,我就不再重复列出了,有生产者Producer和消费者Consumer两个类。
生产者类
import com.jeanron.test1.Storage;
public class Producer extends Thread {
// 每次生产的产品数量
private int num;
// 所在放置的仓库
private Storage storage;
// 构造函数,设置仓库
public Producer(Storage storage) {
this.storage = storage;
}
public Producer(Storage storage, int num) {
this.storage = storage;
this.num = num;
}
// 线程run函数
public void run() {
produce(num);
}
// 调用仓库Storage的生产函数
public void produce(int num) {
storage.produce(num);
}
// get/set方法
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
public Storage getStorage() {
return storage;
}
public void setStorage(Storage storage) {
this.storage = storage;
}
}
消费者类
import com.jeanron.test1.Storage;
public class Consumer extends Thread
{
// 每次消费的产品数量
private int num;
// 所在放置的仓库
private Storage storage;
// 构造函数,设置仓库
public Consumer(Storage storage)
{
this.storage = storage;
}
// 构造函数,设置仓库
public Consumer(Storage storage,int num)
{
this.storage = storage;
this.num = num;
}
// 线程run函数
public void run()
{
consume(num);
}
// 调用仓库Storage的生产函数
public void consume(int num)
{
storage.consume(num);
}
// get/set方法
public int getNum()
{
return num;
}
public void setNum(int num)
{
this.num = num;
}
public Storage getStorage()
{
return storage;
}
public void setStorage(Storage storage)
{
this.storage = storage;
}
} 滴哟中
import java.util.LinkedList;
public class Storage
{
// 仓库最大存储量
private final int MAX_SIZE = 200;
// 仓库存储的载体
private LinkedList<Object> list = new LinkedList<Object>();
private static Storage storage=null;
public static Storage getInstance() {
if (storage == null) {
synchronized (Storage.class) {
if (storage == null) {
storage = new Storage();
}
}
}
return storage;
}
// 生产num个产品
public void produce(int num)
{
// 同步代码段
synchronized (list)
{
// 如果仓库剩余容量不足
while (list.size() + num > MAX_SIZE)
{
System.out.println(Thread.currentThread().getName()+"【生产者:要生产的产品数量】:" + num + "/t【库存量】:"
+ list.size() + "/t暂时不能执行生产任务!");
try
{
// 由于条件不满足,生产阻塞
list.wait();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
// 生产条件满足情况下,生产num个产品
for (int i = 1; i <= num; ++i)
{
list.add(new Object());
}
System.out.println(Thread.currentThread().getName()+"【生产者:已经生产产品数】:" + num + "/t【现仓储量为】:" + list.size());
list.notifyAll();
}
}
// 消费num个产品
public void consume(int num)
{
// 同步代码段
synchronized (list)
{
// 如果仓库存储量不足
while (list.size() < num)
{
System.out.println(Thread.currentThread().getName()+"【消费者:要消费的产品数量】:" + num + "/t【库存量】:"
+ list.size() + "/t暂时不能执行消费任务!");
try
{
// 由于条件不满足,消费阻塞
list.wait();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
// 消费条件满足情况下,消费num个产品
for (int i = 1; i <= num; ++i)
{
list.remove();
}
System.out.println(Thread.currentThread().getName()+"【消费者:已经消费产品数】:" + num + "/t【现仓储量为】:" + list.size());
list.notifyAll();
}
}
// get/set方法
public LinkedList<Object> getList()
{
return list;
}
public void setList(LinkedList<Object> list)
{
this.list = list;
}
public int getMAX_SIZE()
{
return MAX_SIZE;
}
}
第二种使用await和signal的方式实现,我们使用Lock来产生两个Condition对象来管理任务间的通信,一个重要的参考点就是仓库满,仓库空,这类方式最后必须有try-finally的子句,保证能够释放锁。
package com.jeanron.test2;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.jeanron.test2.Storage;
public class Storage
{
// 仓库最大存储量
private final int MAX_SIZE = 200;
// 仓库存储的载体
private LinkedList<Object> list = new LinkedList<Object>();
private static Storage storage=null;
public static Storage getInstance() {
if (storage == null) {
synchronized (Storage.class) {
if (storage == null) {
storage = new Storage();
}
}
}
return storage;
}
// 锁
private final Lock lock = new ReentrantLock();
// 仓库满的条件变量
private final Condition full = lock.newCondition();
// 仓库空的条件变量
private final Condition empty = lock.newCondition();
// 生产num个产品
public void produce(int num)
{
// 获得锁
lock.lock();
// 如果仓库剩余容量不足
while (list.size() + num > MAX_SIZE)
{
System.out.println(Thread.currentThread().getName()+"【要生产的产品数量】:" + num + "/t【库存量】:" + list.size()
+ "/t暂时不能执行生产任务!");
try
{
// 由于条件不满足,生产阻塞
full.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
// 生产条件满足情况下,生产num个产品
for (int i = 1; i <= num; ++i)
{
list.add(new Object());
}
System.out.println(Thread.currentThread().getName()+"【已经生产产品数】:" + num + "/t【现仓储量为】:" + list.size());
// 唤醒其他所有线程
full.signalAll();
empty.signalAll();
// 释放锁
lock.unlock();
}
// 消费num个产品
public void consume(int num)
{
// 获得锁
lock.lock();
// 如果仓库存储量不足
while (list.size() < num)
{
System.out.println(Thread.currentThread().getName()+"【要消费的产品数量】:" + num + "/t【库存量】:" + list.size()
+ "/t暂时不能执行生产任务!");
try
{
// 由于条件不满足,消费阻塞
empty.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
// 消费条件满足情况下,消费num个产品
for (int i = 1; i <= num; ++i)
{
list.remove();
}
System.out.println(Thread.currentThread().getName()+"【已经消费产品数】:" + num + "/t【现仓储量为】:" + list.size());
// 唤醒其他所有线程
full.signalAll();
empty.signalAll();
// 释放锁
lock.unlock();
}
// set/get方法
public int getMAX_SIZE()
{
return MAX_SIZE;
}
public LinkedList<Object> getList()
{
return list;
}
public void setList(LinkedList<Object> list)
{
测试类算是用用,我们直接使用多线程调度器Executor来做。对于生产者消费者使用随机数的方式来初始化物品数,仓库使用单例模式。
package com.jeanron.main;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.jeanron.entity1.Consumer;
import com.jeanron.entity1.Producer;
import com.jeanron.test1.Storage;
public class Test1
{
public static void main(String[] args)
{
ExecutorService exec = Executors.newCachedThreadPool();
for(int i=0;i<5;i++){
exec.execute(new Producer(Storage.getInstance(),new Random().nextInt(10)*10));
}
for(int i=0;i<10;i++){
exec.execute(new Consumer(Storage.getInstance(),new Random().nextInt(10)*10));
}
}
}
生产者是5个线程,消费者是10个线程,我们来看看调用之后的信息,这个可以简单分析下日志即可,不具有典型性和代表性。
第一类实现方式的输出如下:
pool-1-thread-1【生产者:已经生产产品数】:10 【现仓储量为】:10
pool-1-thread-3【生产者:已经生产产品数】:50 【现仓储量为】:60
pool-1-thread-2【生产者:已经生产产品数】:50 【现仓储量为】:110
pool-1-thread-5【生产者:已经生产产品数】:0 【现仓储量为】:110
pool-1-thread-4【生产者:已经生产产品数】:80 【现仓储量为】:190
pool-1-thread-4【消费者:已经消费产品数】:40 【现仓储量为】:150
pool-1-thread-1【消费者:已经消费产品数】:10 【现仓储量为】:140
pool-1-thread-3【消费者:已经消费产品数】:20 【现仓储量为】:120
pool-1-thread-2【消费者:已经消费产品数】:10 【现仓储量为】:110
pool-1-thread-5【消费者:已经消费产品数】:40 【现仓储量为】:70
pool-1-thread-6【消费者:已经消费产品数】:30 【现仓储量为】:40
pool-1-thread-4【消费者:要消费的产品数量】:90 【库存量】:40 暂时不能执行消费任务!
pool-1-thread-1【消费者:要消费的产品数量】:50 【库存量】:40 暂时不能执行消费任务!
pool-1-thread-3【消费者:要消费的产品数量】:90 【库存量】:40 暂时不能执行消费任务!
pool-1-thread-7【消费者:已经消费产品数】:20 【现仓储量为】:20
pool-1-thread-3【消费者:要消费的产品数量】:90 【库存量】:20 暂时不能执行消费任务!
pool-1-thread-1【消费者:要消费的产品数量】:50 【库存量】:20 暂时不能执行消费任务!
pool-1-thread-4【消费者:要消费的产品数量】:90 【库存量】:20 暂时不能执行消费任务!
第二类实现方式的输出如下:
pool-1-thread-1【已经生产产品数】:16 【现仓储量为】:16
pool-1-thread-3【已经生产产品数】:60 【现仓储量为】:76
pool-1-thread-2【已经生产产品数】:67 【现仓储量为】:143
pool-1-thread-5【要生产的产品数量】:90 【库存量】:143 暂时不能执行生产任务!
pool-1-thread-4【已经生产产品数】:27 【现仓储量为】:170
pool-1-thread-1【已经消费产品数】:10 【现仓储量为】:160
pool-1-thread-2【已经消费产品数】:23 【现仓储量为】:137
pool-1-thread-6【已经消费产品数】:18 【现仓储量为】:119
pool-1-thread-5【要生产的产品数量】:90 【库存量】:119 暂时不能执行生产任务!
pool-1-thread-3【已经消费产品数】:97 【现仓储量为】:22
pool-1-thread-1【要消费的产品数量】:57 【库存量】:22 暂时不能执行生产任务!
pool-1-thread-6【要消费的产品数量】:62 【库存量】:22 暂时不能执行生产任务!
pool-1-thread-3【要消费的产品数量】:35 【库存量】:22 暂时不能执行生产任务!
pool-1-thread-4【已经消费产品数】:17 【现仓储量为】:5
pool-1-thread-2【已经消费产品数】:1 【现仓储量为】:4
pool-1-thread-7【要消费的产品数量】:93 【库存量】:4 暂时不能执行生产任务!
pool-1-thread-2【已经消费产品数】:4 【现仓储量为】:0
pool-1-thread-5【已经生产产品数】:90 【现仓储量为】:90
pool-1-thread-1【已经消费产品数】:57 【现仓储量为】:33
pool-1-thread-6【要消费的产品数量】:62 【库存量】:33 暂时不能执行生产任务!
pool-1-thread-3【要消费的产品数量】:35 【库存量】:33 暂时不能执行生产任务!
pool-1-thread-4【要消费的产品数量】:45 【库存量】:33 暂时不能执行生产任务!
pool-1-thread-7【要消费的产品数量】:93 【库存量】:33 暂时不能执行生产任务!
pool-1-thread-10【要消费的产品数量】:81 【库存量】:33 暂时不能执行生产任务!
pool-1-thread-2【要消费的产品数量】:65 【库存量】:33 暂时不能执行生产任务!
pool-1-thread-5【要消费的产品数量】:38 【库存量】:33 暂时不能执行生产任务!
pool-1-thread-8【已经消费产品数】:33 【现仓储量为】:0
pool-1-thread-12【要消费的产品数量】:21 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-9【要消费的产品数量】:24 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-11【要消费的产品数量】:54 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-13【要消费的产品数量】:58 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-6【要消费的产品数量】:62 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-3【要消费的产品数量】:35 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-4【要消费的产品数量】:45 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-7【要消费的产品数量】:93 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-10【要消费的产品数量】:81 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-2【要消费的产品数量】:65 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-5【要消费的产品数量】:38 【库存量】:0 暂时不能执行生产任务!