// 任务队列 private BlockingQueue<Task> queue = new LinkedBlockingQueue<>(2000); // 启动5个消费者线程,执行批量任务 public void start() { ExecutorService pool = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { pool.execute(() -> { try { while (true) { // 获取批量任务 List<Task> tasks = pollTasks(); // 执行批量任务 execTasks(tasks); } } catch (InterruptedException ignored) { } }); } } // 从任务队列中获取批量任务 private List<Task> pollTasks() throws InterruptedException { List<Task> tasks = new LinkedList<>(); // 阻塞式获取一个任务 // 首先采用阻塞式的方式,如果任务队列中没有任务,能够避免无谓的循环 Task task = queue.take(); while (task != null) { tasks.add(task); // 非阻塞式获取一个任务 task = queue.poll(); } return tasks; } // 批量执行任务 private void execTasks(List<Task> tasks) { }
public class Logger { // 批量异步刷新的数量 private static final int FLUSH_BATCH_SIZE = 500; // 任务队列 private final BlockingQueue<LogMsg> queue = new LinkedBlockingQueue<>(); // 只需要一个线程写日志 private ExecutorService pool = Executors.newFixedThreadPool(1); // 启动写日志线程 public void start() throws IOException { File file = File.createTempFile("test", ".log"); FileWriter writer = new FileWriter(file); pool.execute(() -> { // 未刷盘日志数量 int curIdx = 0; long preFlushTime = System.currentTimeMillis(); while (true) { try { LogMsg logMsg = queue.poll(5, TimeUnit.SECONDS); // 写日志 if (logMsg != null) { writer.write(logMsg.toString()); ++curIdx; } // 如果不存在未刷盘数据,则无需刷盘 if (curIdx <= 0) { continue; } // 异步刷盘规则 if (logMsg != null && logMsg.getLevel() == LEVEL.ERROR || curIdx == FLUSH_BATCH_SIZE || System.currentTimeMillis() - preFlushTime > 5_000) { writer.flush(); curIdx = 0; preFlushTime = System.currentTimeMillis(); } } catch (InterruptedException | IOException ignored) { } finally { try { writer.flush(); writer.close(); } catch (IOException ignored) { } } } }); } private void info(@NonNull String msg) throws InterruptedException { queue.put(new LogMsg(LEVEL.INFO, msg)); } private void error(@NonNull String msg) throws InterruptedException { queue.put(new LogMsg(LEVEL.ERROR, msg)); } } @Data @AllArgsConstructor class LogMsg { private LEVEL level; private String msg; } enum LEVEL { INFO, ERROR }
转载请注明出处:http://zhongmingmao.me/2019/05/26/java-concurrent-producer-consumer/
访问原文「 Java并发 -- 生产者-消费者模式 」获取最佳阅读体验并参与讨论