简单地说,滑窗算法是一种移动固定大小的窗口(子列表)来遍历数据结构的方法,主要是基于固定步骤的序列流数据。
如果我们想通过使用大小为3的窗口遍历列表[1 2 3 4 5],我们透过窗口只能看到以下数据组:
[1 2 3]
[2 3 4]
[3 4 5]
.如果我们想要使用比集合大小更大的窗口遍历相同的列表,我们甚至不会得到一个元素。
Java 10提供了一种 Stream 实现,支持顺序和并行聚合操作的一系列元素:
int sum = widgets.stream()
.filter(w -> w.getColor() == RED)
.mapToInt(w -> w.getWeight())
.sum();
下面谈的是如何在这个流上使用滑窗算法。
为了能够创建自定义Stream,我们需要实现自定义 Spliterator 。
在我们的例子中,我们需要能够迭代Stream <T>序列数据,因此我们需要实现Spliterator接口并指定泛型类型参数:
public class SlidingWindowSpliterator<T> implements Spliterator<Stream<T>> {
// ...
}
有一堆方法需要实现:
public class SlidingWindowSpliterator<T> implements Spliterator<Stream<T>> {
//下面会实现
@Override
public boolean tryAdvance(Consumer<? super Stream<T>> action) {
return false;
}
//准备下面实现
@Override
public Spliterator<Stream<T>> trySplit() {
return null;
}
@Override
public long estimateSize() {
return 0;
}
//下面准备实现
@Override
public int characteristics() {
return 0;
}
}
我们还需要一些字段来存储缓冲元素、窗口大小参数、源集合的迭代器以及预先计算的大小估计(稍后我们将需要):
private final Queue<T> buffer;
private final Iterator<T> sourceIterator;
private final int windowSize;
private final int size;
在我们开始实现接口方法之前,我们需要能够实例化我们的工具。
在这种情况下,我们将限制构造函数的可见性,并公开一个公共静态工厂方法:
private SlidingWindowSpliterator(Collection<T> source, int windowSize) {
this.buffer = new ArrayDeque<>(windowSize);
this.sourceIterator = Objects.requireNonNull(source).iterator();
this.windowSize = windowSize;
this.size = calculateSize(source, windowSize);
}
公开的静态方法:
static <T> Stream<Stream<T>> windowed(Collection<T> stream, int windowSize) {
return StreamSupport.stream(
new SlidingWindowSpliterator<>(stream, windowSize), false);
}
现在让我们实现Spliterator方法中容易的部分。
实现 trySplit()时,我们默认使用文档中指定的值。幸运的是,计算大小很容易:
private static int calculateSize(Collection<?> source, int windowSize) {
return source.size() < windowSize
? 0
: source.size() - windowSize + 1;
}
@Override
public Spliterator<Stream<T>> trySplit() {
return null;
}
@Override
public long estimateSize() {
return size;
}
在characteristics()中,我们指定:
ORDERED - 因为顺序很重要
NONNULL - 因为元素永远不会为null(尽管可以包含空值)
SIZED -因为大小是可以预见的
@Override
public int characteristics() {
return ORDERED | NONNULL | SIZED;
}
现在实现tryAdvance,这里是关键部分 - 负责实际分组和迭代的方法。
首先,如果窗口小于1,则没有任何内容可以迭代,以便我们可以立即返回:
@Override
public boolean tryAdvance(Consumer<? super Stream<T>> action) {
if (windowSize < 1) {
return false;
}
// ...
}
现在,要生成第一个子列表,我们需要开始迭代并填充缓冲区:
while (sourceIterator.hasNext()) {
buffer.add(sourceIterator.next());
// ...
}
填充缓冲区后,我们可以调度整个组,并从缓冲区中丢弃最旧的元素。
这里有一个关键部分,可能会试图将buffer.stream()传递给accept()方法,这是一个巨大的错误 - Streams惰性地绑定到底层集合,这意味着如果源更改,Stream也会更改。
为了避免这个问题并将我们的组与内部缓冲区表示分离,我们需要在创建每个Stream实例之前对缓冲区的当前状态进行快照。我们将使用数组支持Stream实例,以使它们尽可能轻量级。
由于Java不支持通用数组,我们需要做一些丑陋的转换:
if (buffer.size() == windowSize) {
action.accept(Arrays.stream((T[]) buffer.toArray(new Object[0])));
buffer.poll();
return sourceIterator.hasNext();
}
...瞧,我们准备好使用它:
windowed(List.of(1,2,3,4,5), 3) .map(group -> group.collect(toList())) .forEach(System.out::println);
滑窗代码编制成功,运行结果如下:
// result
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]