转载

Java实现生产者消费者的两种方式(r12笔记第66天)

   我在8年前去面试程序员的时候,一个不大的公司,里面的开发主管接待了我们,给我的题目就是写一段程序模拟生产者消费者问题,当时可把我难坏了,一下子感觉自己的知识储备竟然如此的匮乏。

   而在我从事DBA工作之后,经常会有大批量并发的环境,有的需要排障,有的需要优化,在很多并发的场景中,发现生产者消费者问题可以模拟出很多实际中的问题,所以生产者消费者问题非常重要,也是我想不断改进和探索的一类问题。

引入仓库的必要性

   要想使用程序来模拟,其实也不用花太多的时间,我们简单说说需要考虑的地方。首先生产者,消费者是两个实体对象,生产者生产物品,消费者消费物品,如果在生产者中定义生产的流程,在消费者中定义消费的流程,两个对象就需要彼此引用,依赖性太高,而且实际上性能也好不到哪里去,所以就需要一个缓冲器,一个中间对象,我们就叫做仓库吧,生产的物品推入仓库,消费的物品从仓库中取出,这样生产者和消费者就能够取消之间的引用,直接通过仓库引用来同步状态,降低耦合。

    所以我们的一个初步设想就是生产者-->仓库<--消费者 这样的模式。

生产者消费者的几种类型和实现方式

    当然生产者消费者问题有两种类型,一种就是使用某种机制来保护生产者和消费者之间的同步,另外一种和Linux中的管道思路相似。相对来说第一种类型的处理方式更为通用,大体分为三类具体的实现方式:

  1. 经典的wait(),notify()方法

  2. await(),signal()方法

  3. 使用阻塞队列(BlockingQueue),比如LinkedBlockingQueue

通用的对象类

为了更快出成果,尽可能快速理解,我也参考了一些资料,下午下班前写了下面的程序。我就简单说明第一种和第二种吧。

  因为实体类对象是通用的,我就不再重复列出了,有生产者Producer和消费者Consumer两个类。

生产者类

import com.jeanron.test1.Storage;

public class Producer extends Thread {
    // 每次生产的产品数量
    private int num;

    // 所在放置的仓库
    private Storage storage;

    // 构造函数,设置仓库
    public Producer(Storage storage) {
        this.storage = storage;
    }

    public Producer(Storage storage, int num) {
        this.storage = storage;
        this.num = num;
    }

    // 线程run函数
    public void run() {
        produce(num);
    }

    // 调用仓库Storage的生产函数
    public void produce(int num) {
        storage.produce(num);
    }

    // get/set方法
    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    public Storage getStorage() {
        return storage;
    }

    public void setStorage(Storage storage) {
        this.storage = storage;
    }
}

消费者类

import com.jeanron.test1.Storage;
 
public class Consumer extends Thread  
{  
    // 每次消费的产品数量  
    private int num;  
 
    // 所在放置的仓库  
    private Storage storage;  
 
    // 构造函数,设置仓库  
    public Consumer(Storage storage)  
    {  
        this.storage = storage;  
    }  
    
    // 构造函数,设置仓库  
    public Consumer(Storage storage,int num)  
    {  
        this.storage = storage;  
        this.num = num;
    }  
 
    // 线程run函数  
    public void run()  
    {  
        consume(num);  
    }  
 
    // 调用仓库Storage的生产函数  
    public void consume(int num)  
    {  
        storage.consume(num);  
    }  
 
    // get/set方法  
    public int getNum()  
    {  
        return num;  
    }  
 
    public void setNum(int num)  
    {  
        this.num = num;  
    }  
 
    public Storage getStorage()  
    {  
        return storage;  
    }  
 
    public void setStorage(Storage storage)  
    {  
        this.storage = storage;  
    }  
}  滴哟中


第一种实现方式

import java.util.LinkedList;

public class Storage  
{  
    // 仓库最大存储量  
    private final int MAX_SIZE = 200;  
 
    // 仓库存储的载体  
    private LinkedList<Object> list = new LinkedList<Object>();  
    private static Storage storage=null;  
    public static Storage getInstance() {  
        if (storage == null) {    
            synchronized (Storage.class) {    
               if (storage == null) {    
                   storage = new Storage();   
               }    
            }    
        }    
        return storage;   
    }   
    
    // 生产num个产品  
    public void produce(int num)  
    {  
        // 同步代码段  
        synchronized (list)  
        {  
            // 如果仓库剩余容量不足  
            while (list.size() + num > MAX_SIZE)  
            {  
                System.out.println(Thread.currentThread().getName()+"【生产者:要生产的产品数量】:" + num + "/t【库存量】:"  
                        + list.size() + "/t暂时不能执行生产任务!");  
                try  
                {  
                    // 由于条件不满足,生产阻塞  
                    list.wait();  
                }  
                catch (InterruptedException e)  
                {  
                    e.printStackTrace();  
                }  
            }  
 
            // 生产条件满足情况下,生产num个产品  
            for (int i = 1; i <= num; ++i)  
            {  
                list.add(new Object());  
            }  
 
            System.out.println(Thread.currentThread().getName()+"【生产者:已经生产产品数】:" + num + "/t【现仓储量为】:" + list.size());  
 
            list.notifyAll();  
        }  
    }  
 
    // 消费num个产品  
    public void consume(int num)  
    {  
        // 同步代码段  
        synchronized (list)  
        {  
            // 如果仓库存储量不足  
            while (list.size() < num)  
            {  
                System.out.println(Thread.currentThread().getName()+"【消费者:要消费的产品数量】:" + num + "/t【库存量】:"  
                        + list.size() + "/t暂时不能执行消费任务!");  
                try  
                {  
                    // 由于条件不满足,消费阻塞  
                    list.wait();  
                }  
                catch (InterruptedException e)  
                {  
                    e.printStackTrace();  
                }  
            }  
 
            // 消费条件满足情况下,消费num个产品  
            for (int i = 1; i <= num; ++i)  
            {  
                list.remove();  
            }  
 
            System.out.println(Thread.currentThread().getName()+"【消费者:已经消费产品数】:" + num + "/t【现仓储量为】:" + list.size());  
 
            list.notifyAll();  
        }  
    }  
 
    // get/set方法  
    public LinkedList<Object> getList()  
    {  
        return list;  
    }  
 
    public void setList(LinkedList<Object> list)  
    {  
        this.list = list;  
    }  
 
    public int getMAX_SIZE()  
    {  
        return MAX_SIZE;  
    }  
}  





第二种实现方式


第二种使用await和signal的方式实现,我们使用Lock来产生两个Condition对象来管理任务间的通信,一个重要的参考点就是仓库满,仓库空,这类方式最后必须有try-finally的子句,保证能够释放锁。

package com.jeanron.test2;

import java.util.LinkedList;  
import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.Lock;  
import java.util.concurrent.locks.ReentrantLock;  

import com.jeanron.test2.Storage;
 
 
public class Storage  
{  
    // 仓库最大存储量  
    private final int MAX_SIZE = 200;  
 
    // 仓库存储的载体  
    private LinkedList<Object> list = new LinkedList<Object>();  
    
    private static Storage storage=null;  
    public static Storage getInstance() {  
        if (storage == null) {    
            synchronized (Storage.class) {    
               if (storage == null) {    
                   storage = new Storage();   
               }    
            }    
        }    
        return storage;   
    }   
    
 
    // 锁  
    private final Lock lock = new ReentrantLock();  
 
    // 仓库满的条件变量  
    private final Condition full = lock.newCondition();  
 
    // 仓库空的条件变量  
    private final Condition empty = lock.newCondition();  
 
    // 生产num个产品  
    public void produce(int num)  
    {  
        // 获得锁  
        lock.lock();  
 
        // 如果仓库剩余容量不足  
        while (list.size() + num > MAX_SIZE)  
        {  
            System.out.println(Thread.currentThread().getName()+"【要生产的产品数量】:" + num + "/t【库存量】:" + list.size()  
                    + "/t暂时不能执行生产任务!");  
            try  
            {  
                // 由于条件不满足,生产阻塞  
                full.await();  
            }  
            catch (InterruptedException e)  
            {  
                e.printStackTrace();  
            }  
        }  
 
        // 生产条件满足情况下,生产num个产品  
        for (int i = 1; i <= num; ++i)  
        {  
            list.add(new Object());  
        }  
 
        System.out.println(Thread.currentThread().getName()+"【已经生产产品数】:" + num + "/t【现仓储量为】:" + list.size());  
 
        // 唤醒其他所有线程  
        full.signalAll();  
        empty.signalAll();  
 
        // 释放锁  
        lock.unlock();  
    }  
 
    // 消费num个产品  
    public void consume(int num)  
    {  
        // 获得锁  
        lock.lock();  
 
        // 如果仓库存储量不足  
        while (list.size() < num)  
        {  
            System.out.println(Thread.currentThread().getName()+"【要消费的产品数量】:" + num + "/t【库存量】:" + list.size()  
                    + "/t暂时不能执行生产任务!");  
            try  
            {  
                // 由于条件不满足,消费阻塞  
                empty.await();  
            }  
            catch (InterruptedException e)  
            {  
                e.printStackTrace();  
            }  
        }  
 
        // 消费条件满足情况下,消费num个产品  
        for (int i = 1; i <= num; ++i)  
        {  
            list.remove();  
        }  
 
        System.out.println(Thread.currentThread().getName()+"【已经消费产品数】:" + num + "/t【现仓储量为】:" + list.size());  
 
        // 唤醒其他所有线程  
        full.signalAll();  
        empty.signalAll();  
 
        // 释放锁  
        lock.unlock();  
    }  
 
    // set/get方法  
    public int getMAX_SIZE()  
    {  
        return MAX_SIZE;  
    }  
 
    public LinkedList<Object> getList()  
    {  
        return list;  
    }  
 
    public void setList(LinkedList<Object> list)  
    {  


测试类

测试类算是用用,我们直接使用多线程调度器Executor来做。对于生产者消费者使用随机数的方式来初始化物品数,仓库使用单例模式。

package com.jeanron.main;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.jeanron.entity1.Consumer;
import com.jeanron.entity1.Producer;
import com.jeanron.test1.Storage;

 
public class Test1  
{  
    public static void main(String[] args)  
    {  
        
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i=0;i<5;i++){
            exec.execute(new Producer(Storage.getInstance(),new Random().nextInt(10)*10));            
        }
        
        for(int i=0;i<10;i++){
            exec.execute(new Consumer(Storage.getInstance(),new Random().nextInt(10)*10));            
        }
        
    }  


测试结果

生产者是5个线程,消费者是10个线程,我们来看看调用之后的信息,这个可以简单分析下日志即可,不具有典型性和代表性。

第一类实现方式的输出如下:

pool-1-thread-1【生产者:已经生产产品数】:10    【现仓储量为】:10
pool-1-thread-3【生产者:已经生产产品数】:50    【现仓储量为】:60
pool-1-thread-2【生产者:已经生产产品数】:50    【现仓储量为】:110
pool-1-thread-5【生产者:已经生产产品数】:0    【现仓储量为】:110
pool-1-thread-4【生产者:已经生产产品数】:80    【现仓储量为】:190
pool-1-thread-4【消费者:已经消费产品数】:40    【现仓储量为】:150
pool-1-thread-1【消费者:已经消费产品数】:10    【现仓储量为】:140
pool-1-thread-3【消费者:已经消费产品数】:20    【现仓储量为】:120
pool-1-thread-2【消费者:已经消费产品数】:10    【现仓储量为】:110
pool-1-thread-5【消费者:已经消费产品数】:40    【现仓储量为】:70
pool-1-thread-6【消费者:已经消费产品数】:30    【现仓储量为】:40
pool-1-thread-4【消费者:要消费的产品数量】:90    【库存量】:40    暂时不能执行消费任务!
pool-1-thread-1【消费者:要消费的产品数量】:50    【库存量】:40    暂时不能执行消费任务!
pool-1-thread-3【消费者:要消费的产品数量】:90    【库存量】:40    暂时不能执行消费任务!
pool-1-thread-7【消费者:已经消费产品数】:20    【现仓储量为】:20
pool-1-thread-3【消费者:要消费的产品数量】:90    【库存量】:20    暂时不能执行消费任务!
pool-1-thread-1【消费者:要消费的产品数量】:50    【库存量】:20    暂时不能执行消费任务!
pool-1-thread-4【消费者:要消费的产品数量】:90    【库存量】:20    暂时不能执行消费任务!

第二类实现方式的输出如下:

pool-1-thread-1【已经生产产品数】:16    【现仓储量为】:16
pool-1-thread-3【已经生产产品数】:60    【现仓储量为】:76
pool-1-thread-2【已经生产产品数】:67    【现仓储量为】:143
pool-1-thread-5【要生产的产品数量】:90    【库存量】:143    暂时不能执行生产任务!
pool-1-thread-4【已经生产产品数】:27    【现仓储量为】:170
pool-1-thread-1【已经消费产品数】:10    【现仓储量为】:160
pool-1-thread-2【已经消费产品数】:23    【现仓储量为】:137
pool-1-thread-6【已经消费产品数】:18    【现仓储量为】:119
pool-1-thread-5【要生产的产品数量】:90    【库存量】:119    暂时不能执行生产任务!
pool-1-thread-3【已经消费产品数】:97    【现仓储量为】:22
pool-1-thread-1【要消费的产品数量】:57    【库存量】:22    暂时不能执行生产任务!
pool-1-thread-6【要消费的产品数量】:62    【库存量】:22    暂时不能执行生产任务!
pool-1-thread-3【要消费的产品数量】:35    【库存量】:22    暂时不能执行生产任务!
pool-1-thread-4【已经消费产品数】:17    【现仓储量为】:5
pool-1-thread-2【已经消费产品数】:1    【现仓储量为】:4
pool-1-thread-7【要消费的产品数量】:93    【库存量】:4    暂时不能执行生产任务!
pool-1-thread-2【已经消费产品数】:4    【现仓储量为】:0
pool-1-thread-5【已经生产产品数】:90    【现仓储量为】:90
pool-1-thread-1【已经消费产品数】:57    【现仓储量为】:33
pool-1-thread-6【要消费的产品数量】:62    【库存量】:33    暂时不能执行生产任务!
pool-1-thread-3【要消费的产品数量】:35    【库存量】:33    暂时不能执行生产任务!
pool-1-thread-4【要消费的产品数量】:45    【库存量】:33    暂时不能执行生产任务!
pool-1-thread-7【要消费的产品数量】:93    【库存量】:33    暂时不能执行生产任务!
pool-1-thread-10【要消费的产品数量】:81    【库存量】:33    暂时不能执行生产任务!
pool-1-thread-2【要消费的产品数量】:65    【库存量】:33    暂时不能执行生产任务!
pool-1-thread-5【要消费的产品数量】:38    【库存量】:33    暂时不能执行生产任务!
pool-1-thread-8【已经消费产品数】:33    【现仓储量为】:0
pool-1-thread-12【要消费的产品数量】:21    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-9【要消费的产品数量】:24    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-11【要消费的产品数量】:54    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-13【要消费的产品数量】:58    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-6【要消费的产品数量】:62    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-3【要消费的产品数量】:35    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-4【要消费的产品数量】:45    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-7【要消费的产品数量】:93    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-10【要消费的产品数量】:81    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-2【要消费的产品数量】:65    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-5【要消费的产品数量】:38    【库存量】:0    暂时不能执行生产任务!


Java实现生产者消费者的两种方式(r12笔记第66天)

正文到此结束
Loading...