转载

一个简单的 Java 自动批处理队列

实际中可能有这样的应用场景,得到一个记录不需要立即去处理它,而是等累积到一定数量时再批量处理它们。我们可以用一个计数器,来一个加一个,量大时一块处理,然后又重零开始计数。如果记录的来源单一还好办,要是有多个数据源来提供记录就会有多线程环境下数据丢失的问题。

这里我编写了一个最简单的任务批处理的队列,构造了告诉它批处理数量,消费者,然后就只管往队列里添加记录,队列在满足条件时自动进行批处理。因为内部使用的是 BlockingQuque 来存储记录,所以多线程往里同时添加记录也没关系,最后的未达到 batchSize , 的那些记录需要主动调用 done() 函数来触发批处理,并且结束队列内的循环线程,从而终止当前应用。

注意: 多线程环境下往一个无线程保护的集合或结构中,如 ArrayList, LinkedList, HashMap, StringBuilder 中添加记录非常容易造成数据的丢失,而往有线程保护的目的地写东西就安全了,如 Vector, Hashtable, StringBuffer, BlockingQueue。当然性能上要付出一点代价,不过对于使用了可重入锁(ReentrantLock), 而非同步锁(synchronized) 的数据结构还是可以放心使用的。

下面是 BatchQueue 的简单实现

package cc.unmi;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class BatchQueue<T> {
    private final int batchSize;
    private final Consumer<List<T>> consumer;

    private AtomicBoolean inUse = new AtomicBoolean(true);
    private BlockingQueue<T> queue = new LinkedBlockingQueue<>();

    public BatchQueue(int batchSize, Consumer<List<T>> consumer) {
        this.batchSize = batchSize;
        this.consumer = consumer;

        startLoop();
    }

    public boolean add(T t) {
        if(!inUse.get()) {
            throw new RuntimeException("This queue is aready shutdown");
        }
        return queue.add(t);
    }

    public void shutdown() {
        inUse.set(false);
    }

    private void startLoop() {
        new Thread(() -> {
            while(inUse.get()) {
                if(queue.size() >= batchSize) {
                    drainToConsume();
                }
            }
            drainToConsume();
        }).start();
    }

    private void drainToConsume() {
        List<T> drained = new ArrayList<>();
        queue.drainTo(drained, batchSize);
        consumer.accept(drained);
    }
}

客户端 Client 的使用代码如下:

package cc.unmi;

import java.util.Scanner;

public class Client {
    public static void main(String[] args) {
        BatchQueue<String> batchQueue = new BatchQueue<>(3, System.out::println);
        while (true) {
            String line = new Scanner(System.in).nextLine();
            if (line.equals("done")) {
                batchQueue.shutdown();
                break;
            }
            batchQueue.add(line);
        }
    }
}

运行效果

一个简单的 Java 自动批处理队列

调用 shutdown() 方法时把队列中剩下的不足数额的记录也处理掉,并且结果内部循环,才能终止当前应用。队列 shutdown 之后将不可再使用了。

如果每次批处理任务要在新线程里执行,那么只要在提供的 Consumer 中开新线程或提交任务到线程池就行了。

更实际的应用中,可能不易找到时机去主动调用队列的 shutdown() 方法,可能就需要一个计时器来判断。比如进程一直在执行,没有新的记录进来,队列中有未到 batchSize 的记录,总不能让这个队列中的记录一直等几天吧。这时候就需要一个计时器,即使数量不够,但时间到了照样要处理。计时器需在添加第一条记录时启动,并在每次批处理进行后复位。

原文  http://unmi.cc/simple-java-auto-batch-queue/
正文到此结束
Loading...