生产者和消费者问题是线程模型中老生常谈的问题,也是面试中经常遇到的问题。光在Java中的实现方式多达数十种,更不用说加上其他语言的实现方式了。那么我们该如何学习呢?
本文会通过精讲wait()和notify()方法实现生产者-消费者模型,来学习生产者和消费者问题的原理。
目的是当你理解了最简单实现原理,再看其他的实现,无非使用了更高级的机制(例如锁、信号量、管道等等)来照猫画虎的实现这个原理,万变不离其宗,它们的原理都是一样的。
本文也会列出一部分其他的实现方式代码。千万不要尝试去背诵所有实现代码,只有掌握了实现原理才能遇到问题的时候游刃有余。
生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。
现实生活中的例子:12306抢购火车票、淘宝购买商品、仓库管理等。
public class Test1 { private static Integer count = 0; //代表生产的商品数量 private static final Integer FULL = 10; //代表商品最多多少个(也就是缓冲区大小) private static final Object LOCK = new Object(); //锁对象 ----分析1 public static void main(String[] args) { for (int i = 0; i < 5; i++) { //创造一堆生产者和消费者模拟真实环境 new Thread(new Producer()).start(); } for (int i = 0; i < 5; i++) { new Thread(new Consumer()).start(); } } static class Producer implements Runnable { //代表生产者 @Override public void run() { } } static class Consumer implements Runnable { //代表消费者 @Override public void run() { } } }
分析1.在main函数中创建了5个消费者线程任务和5个生产者线程任务,当这10个线程同时运行时,需要保证生产者和消费者所公用的缓冲区是同步被改变的,就是说不同线程访问缓冲区的数据不能发生错乱。这里就是用一个锁来保证缓冲区每次只有一个线程访问
接下来看下生产者和消费者的实现:
static class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { //一次多生产几个商品 try { Thread.sleep(3000); //模拟真实环境,让生产的慢一点,间隔3秒 } catch (Exception e) { e.printStackTrace(); } synchronized (LOCK) { //线程同步 while (count.equals(FULL)) { //当缓冲区满了 try { LOCK.wait(); //让线程等待 ----分析1 } catch (Exception e) { e.printStackTrace(); } } count++; //缓冲区不满时继续生产商品,商品加一 System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count); LOCK.notifyAll(); //唤醒等待的消费者 } } } } static class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (LOCK) { while (count == 0) { //当没有商品时,需要等待生产者生产商品 try { LOCK.wait(); //----分析 2 } catch (Exception e) { } } count--; //商品被消耗,商品减一 System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count); LOCK.notifyAll(); //商品被消耗后,通知等待的生产者 } } } }
分析:
1.当缓冲区满了的时候,需要阻止生产者继续生产商品
2.当缓冲区为空,没有商品时,需要阻止消费者继续消费商品
相信代码分析和详细的注释,你已经能很好的理解这个生产者-消费者模型的原理了。接下来贴出其他的几种实现代码。
使用锁实现:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Test1 { private static Integer count = 0; private static final Integer FULL = 10; //创建一个锁对象 private Lock lock = new ReentrantLock(); //创建两个条件变量,一个为缓冲区非满,一个为缓冲区非空 private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); public static void main(String[] args) { Test1 test1 = new Test1(); new Thread(test1.new Producer()).start(); new Thread(test1.new Consumer()).start(); new Thread(test1.new Producer()).start(); new Thread(test1.new Consumer()).start(); new Thread(test1.new Producer()).start(); new Thread(test1.new Consumer()).start(); new Thread(test1.new Producer()).start(); new Thread(test1.new Consumer()).start(); } class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } //获取锁 lock.lock(); try { while (count == FULL) { try { notFull.await(); } catch (InterruptedException e) { e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count); //唤醒消费者 notEmpty.signal(); } finally { //释放锁 lock.unlock(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } lock.lock(); try { while (count == 0) { try { notEmpty.await(); } catch (Exception e) { e.printStackTrace(); } } count--; System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count); notFull.signal(); } finally { lock.unlock(); } } } } }
使用阻塞队列:
当队列满了或空了的时候进行入队列操作都会被阻塞。
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class Test1 { private static Integer count = 0; //创建一个阻塞队列 final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10); public static void main(String[] args) { Test1 test1 = new Test1(); new Thread(test1.new Producer()).start(); new Thread(test1.new Consumer()).start(); new Thread(test1.new Producer()).start(); new Thread(test1.new Consumer()).start(); new Thread(test1.new Producer()).start(); new Thread(test1.new Consumer()).start(); new Thread(test1.new Producer()).start(); new Thread(test1.new Consumer()).start(); } class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } try { blockingQueue.put(1); count++; System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } try { blockingQueue.take(); count--; System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count); } catch (InterruptedException e) { e.printStackTrace(); } } } } }