Master-Worker模式是常用的并行设计模式。核心思想是,系统由两个角色组成,Master和Worker,Master负责接收和分配任务,Worker负责处理子任务。任务处理过程中,Master还负责监督任务进展和Worker的健康状态;Master将接收Client提交的任务,并将任务的进展汇总反馈给Client
如下图所示:
Worker:用于实际处理一个任务;
Task:具体任务实体
Master:任务的分配和最终结果的合成;
Main:启动程序,调度开启Master
/** * @author mouliu * @create 2018-04-07-下午10:03 */ public class Task { private int id; private int price ; public int getId() { return id; } public void setId(int id) { this.id = id; } public int getPrice() { return price; } public void setPrice(int price) { this.price = price; } }
/** * @author mouliu * @create 2018-04-09-上午9:22 */ public class Main { public static void main(String[] args){ Master master = new Master(new Worker(),20); Random r = new Random(); for (int i=1;i<=100;i++){ Task t = new Task(); t.setId(i); t.setPrice(r.nextInt(1000)); master.submit(t); } master.execute(); long start = System.currentTimeMillis(); while (true){ if (master.isComplete()){ long end = System.currentTimeMillis()-start; int priceResult = master.getResult(); System.out.println("最终结果:" + priceResult + ", 执行时间:" + end); break; } } } }
/** * @author mouliu * @create 2018-04-07-下午10:07 */ public class Worker implements Runnable{ private ConcurrentLinkedQueue<Task>workQueue; private ConcurrentHashMap<String,Object>resultMap; public void setWorkQueue(ConcurrentLinkedQueue<Task>workQueue){ this.workQueue = workQueue; } public void setResultMap(ConcurrentHashMap<String,Object> resultMap){ this.resultMap = resultMap; } @Override public void run() { while (true){ Task input = this.workQueue.poll(); if (input==null){ break; } Object output = handle(input); this.resultMap.put(Integer.toString(input.getId()),output); } } private Object handle(Task input){ Object output = null; try { Thread.sleep(500); output = input.getPrice(); } catch (InterruptedException e) { e.printStackTrace(); } return output; } }
/** * @author mouliu * @create 2018-04-07-下午10:02 */ public class Master { //1、有一个盛放任务的容器 private ConcurrentLinkedQueue<Task>workQueue = new ConcurrentLinkedQueue<>(); //2、需要有一个盛放worker的集合 private HashMap<String,Thread> workers = new HashMap<>(); //3、需要有一个盛放每一个worker执行任务的结果集合 private ConcurrentHashMap<String,Object> resultMap= new ConcurrentHashMap<>(); //4、构造方法(创建给定数量的线程) Master(Worker worker,int workerCount){ worker.setWorkQueue(this.workQueue); worker.setResultMap(this.resultMap); for (int i=0;i<workerCount;i++){ this.workers.put(Integer.toString(i),new Thread(worker)); } } //5、需要一个提交任务的方法 public void submit(Task task){ this.workQueue.add(task); } //6、需要有一个执行的方法,启动所有的worker方法去执行任务 public void execute(){ for (Map.Entry<String,Thread>me:workers.entrySet()){ me.getValue().start(); } } //7、判断是否运行结束的方法 public boolean isComplete(){ for (Map.Entry<String,Thread>me:workers.entrySet()){ if (me.getValue().getState()!=Thread.State.TERMINATED){ return false; } } return true; } //8、计算结果方法 public int getResult(){ int priceResult = 0; for (Map.Entry<String,Object>me:resultMap.entrySet()){ priceResult+=(Integer) me.getValue(); } return priceResult; } }
Master-Worker模式可以将大任务划分为小任务的场景,是一种分流处理的设计理念。通过多线程或者多进程多机器的模式,可以将小任务处理分发给更多的CPU处理可以提高任务的完成速度,提高系统的性能。