实际中可能有这样的应用场景,得到一个记录不需要立即去处理它,而是等累积到一定数量时再批量处理它们。我们可以用一个计数器,来一个加一个,量大时一块处理,然后又重零开始计数。如果记录的来源单一还好办,要是有多个数据源来提供记录就会有多线程环境下数据丢失的问题。
这里我编写了一个最简单的任务批处理的队列,构造了告诉它批处理数量,消费者,然后就只管往队列里添加记录,队列在满足条件时自动进行批处理。因为内部使用的是 BlockingQuque
来存储记录,所以多线程往里同时添加记录也没关系,最后的未达到 batchSize
, 的那些记录需要主动调用 done()
函数来触发批处理,并且结束队列内的循环线程,从而终止当前应用。
注意: 多线程环境下往一个无线程保护的集合或结构中,如 ArrayList, LinkedList, HashMap, StringBuilder 中添加记录非常容易造成数据的丢失,而往有线程保护的目的地写东西就安全了,如 Vector, Hashtable, StringBuffer, BlockingQueue。当然性能上要付出一点代价,不过对于使用了可重入锁(ReentrantLock), 而非同步锁(synchronized) 的数据结构还是可以放心使用的。
下面是 BatchQueue 的简单实现
package cc.unmi; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; public class BatchQueue<T> { private final int batchSize; private final Consumer<List<T>> consumer; private AtomicBoolean inUse = new AtomicBoolean(true); private BlockingQueue<T> queue = new LinkedBlockingQueue<>(); public BatchQueue(int batchSize, Consumer<List<T>> consumer) { this.batchSize = batchSize; this.consumer = consumer; startLoop(); } public boolean add(T t) { if(!inUse.get()) { throw new RuntimeException("This queue is aready shutdown"); } return queue.add(t); } public void shutdown() { inUse.set(false); } private void startLoop() { new Thread(() -> { while(inUse.get()) { if(queue.size() >= batchSize) { drainToConsume(); } } drainToConsume(); }).start(); } private void drainToConsume() { List<T> drained = new ArrayList<>(); queue.drainTo(drained, batchSize); consumer.accept(drained); } }
客户端 Client 的使用代码如下:
package cc.unmi; import java.util.Scanner; public class Client { public static void main(String[] args) { BatchQueue<String> batchQueue = new BatchQueue<>(3, System.out::println); while (true) { String line = new Scanner(System.in).nextLine(); if (line.equals("done")) { batchQueue.shutdown(); break; } batchQueue.add(line); } } }
运行效果
调用 shutdown()
方法时把队列中剩下的不足数额的记录也处理掉,并且结果内部循环,才能终止当前应用。队列 shutdown 之后将不可再使用了。
如果每次批处理任务要在新线程里执行,那么只要在提供的 Consumer 中开新线程或提交任务到线程池就行了。
更实际的应用中,可能不易找到时机去主动调用队列的 shutdown()
方法,可能就需要一个计时器来判断。比如进程一直在执行,没有新的记录进来,队列中有未到 batchSize 的记录,总不能让这个队列中的记录一直等几天吧。这时候就需要一个计时器,即使数量不够,但时间到了照样要处理。计时器需在添加第一条记录时启动,并在每次批处理进行后复位。