生产者和消费者模式应用于异步处理场景,异步处理的好处是生产者和消费者解耦,不互相依赖,生产者不需要等待消费者处理完,就可以持续生产消费内容,效率大大提高。
生产者和消费者代码类结构如下:
1.BlockedQueue是一个阻塞的有界队列,用于存、取消费内容。
2.Producer是生产者,在这里是一个抽象类,子类需要实现generateTask方法。
3.Consumer是消费者,在这里是一个抽象类,子类需要实现exec方法。
4.这里的Producer和Consumer只是一个抽象后的代码模板,逻辑比较简单,落地时可根据实际需要编写合适的模板。
import java.util.Vector; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @ClassName BlockedQueue * @Description 阻塞任务队列,添加任务时如果已经达到容量上限,则会阻塞等待 * @Author 铿然一叶 * @Date 2019/10/5 11:32 * @Version 1.0 * javashizhan.com **/ public class BlockedQueue<T>{ //锁 private final Lock lock = new ReentrantLock(); // 条件变量:队列不满 private final Condition notFull = lock.newCondition(); // 条件变量:队列不空 private final Condition notEmpty = lock.newCondition(); //任务集合 private Vector<T> taskQueue = new Vector<T>(); //队列容量 private final int capacity; /** * 构造器 * @param capacity 队列容量 */ public BlockedQueue(int capacity) { this.capacity = capacity; } /** * 入队操作 * @param t */ public void enq(T t) { lock.lock(); try { System.out.println("size: " + taskQueue.size() + " capacity: " + capacity); while (taskQueue.size() == this.capacity) { // 队列满了之后等待,等待队列不满 notFull.await(); } System.out.println(Thread.currentThread().getName() + " add task: " + t.toString()); taskQueue.add(t); // 入队后, 通知队列不空了,可以出队 notEmpty.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } /** * 出队操作 * @return */ public T deq(){ lock.lock(); try { try { while (taskQueue.size() == 0) { // 队列为空时等待,等待队列不空 notEmpty.await(); } } catch (InterruptedException e) { e.printStackTrace(); } T t = taskQueue.remove(0); // 出队后,通知队列不满,可以继续入队 notFull.signal(); return t; }finally { lock.unlock(); } } } 复制代码
/** * @ClassName Producer * @Description 生产者,这个类比较简单,使用继承也省不了多少代码,可继承,也可以自行实现。 * @Author 铿然一叶 * @Date 2019/10/5 11:19 * @Version 1.0 * javashizhan.com **/ public abstract class Producer<T> implements Runnable { private BlockedQueue<T> taskQueue; public Producer(BlockedQueue<T> taskQueue) { this.taskQueue = taskQueue; } public void run() { while(true) { T[] tasks = generateTask(); if (null != tasks && tasks.length > 0) { for(T task: tasks) { if (null != task) { this.taskQueue.enq(task); } } } } } /** * 生成任务,使用了“模板方法”设计模式,子类只要实现此方法则可。 * @return */ public abstract T[] generateTask(); } 复制代码
/** * @ClassName Consumer * @Description 消费者,这个类比较简单,使用继承也省不了多少代码,可继承,也可以自行实现。 * @Author 铿然一叶 * @Date 2019/10/5 11:10 * @Version 1.0 * javashizhan.com **/ public abstract class Consumer<T> implements Runnable { private BlockedQueue<T> taskQueue; public Consumer(BlockedQueue<T> taskQueue) { this.taskQueue = taskQueue; } public void run() { while(true) { T task = taskQueue.deq(); exec(task); } } /** * 执行任务,使用了“模板方法”设计模式,子类只要实现此方法则可 * @param task */ public abstract void exec(T task); } 复制代码
import java.util.Vector; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @ClassName BlockedQueue * @Description TODO * @Author 铿然一叶 * @Date 2019/10/5 9:13 * @Version 1.0 * javashizhan.com **/ public class LockTest { public static void main(String[] args) { BlockedQueue<String> taskQueue = new BlockedQueue<String>(10); for (int i = 0; i < 3; i++) { String producerName = "Producder-" + i; Thread producer = new Thread(new Producer<String>(taskQueue) { @Override public String[] generateTask() { String[] tasks = new String[20]; for (int i = 0; i < tasks.length; i++) { long timestamp = System.currentTimeMillis(); tasks[i] = "Task_" + timestamp + "_" + i; } return tasks; } }, producerName); producer.start(); } for (int i = 0; i < 5; i++) { String consumerName = "Consumer-" + i; Thread consumer = new Thread(new Consumer<String>(taskQueue) { @Override public void exec(String task) { System.out.println(Thread.currentThread().getName() + " do task [" + task + "]"); //休眠一会,模拟任务执行耗时 sleep(2000); } private void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }, consumerName); consumer.start(); } } } 复制代码
输出日志:
size: 0 capacity: 10 Producder-1 add task: Task_1570250409102_0 size: 1 capacity: 10 Producder-1 add task: Task_1570250409103_1 size: 2 capacity: 10 Producder-1 add task: Task_1570250409103_2 size: 3 capacity: 10 Producder-1 add task: Task_1570250409103_3 size: 4 capacity: 10 Producder-1 add task: Task_1570250409103_4 size: 5 capacity: 10 Producder-1 add task: Task_1570250409103_5 size: 6 capacity: 10 Producder-1 add task: Task_1570250409103_6 size: 7 capacity: 10 Producder-1 add task: Task_1570250409103_7 size: 8 capacity: 10 Producder-1 add task: Task_1570250409103_8 size: 9 capacity: 10 Producder-1 add task: Task_1570250409103_9 size: 10 capacity: 10 size: 10 capacity: 10 size: 10 capacity: 10 Consumer-0 do task [Task_1570250409102_0] Consumer-4 do task [Task_1570250409103_1] Consumer-3 do task [Task_1570250409103_2] Producder-1 add task: Task_1570250409103_10 Consumer-1 do task [Task_1570250409103_3] Producder-0 add task: Task_1570250409102_0 size: 8 capacity: 10 Producder-0 add task: Task_1570250409103_1 size: 9 capacity: 10 Producder-0 add task: Task_1570250409103_2 size: 10 capacity: 10 size: 10 capacity: 10 Consumer-2 do task [Task_1570250409103_4] Producder-0 add task: Task_1570250409103_3 size: 10 capacity: 10 Consumer-3 do task [Task_1570250409103_6] Producder-2 add task: Task_1570250409103_0 Consumer-1 do task [Task_1570250409103_5] size: 9 capacity: 10 Producder-2 add task: Task_1570250409103_1 size: 10 capacity: 10 Consumer-4 do task [Task_1570250409103_7] Consumer-0 do task [Task_1570250409103_8] Producder-1 add task: Task_1570250409103_11 size: 9 capacity: 10 Producder-1 add task: Task_1570250409103_12 size: 10 capacity: 10 Consumer-2 do task [Task_1570250409103_9] Producder-1 add task: Task_1570250409103_13 size: 10 capacity: 10 复制代码
1.这里用到了Lock来加锁,Lock相比synchronized关键字加锁更灵活一些,如果有特殊需要,方便改造。
2.synchronized实现生产者和消费者模式的例子可参考“ Java并发编程入门(七)轻松理解wait和notify以及使用场景 ”,那个代码还不够通用,你可以修改得通用一些。
3.就当前这个例子而言,使用Lock加锁和“ Java并发编程入门(七)轻松理解wait和notify以及使用场景 ”中使用synchronized加锁没有多大区别,这里仅仅是为了体会下Lock的使用方法。
4.使用有界阻塞队列时需要注意生产者生产任务过程是否可控,如果是第三方不可控调用,当生产任务速度远远大于消费者处理任务速度时,可能由于阻塞导致长时间挂起,要么挂起时间过长,导致等待线程太多,要么超时失败。这时就不适合使用阻塞方式,应该在队列满时抛出异常以通知调用方不要再等待。
end.
相关阅读:
Java并发编程(一)知识地图
Java并发编程(二)原子性
Java并发编程(三)可见性
Java并发编程(四)有序性
Java并发编程(五)创建线程方式概览
Java并发编程入门(六)synchronized用法
Java并发编程入门(七)轻松理解wait和notify以及使用场景
Java并发编程入门(八)线程生命周期
Java并发编程入门(九)死锁和死锁定位
Java并发编程入门(十)锁优化
Java并发编程入门(十一)限流场景和Spring限流器实现站点: javashizhan.com/
微信公众号: