转载

用Java实现Stream流处理中的滑窗

简单地说,滑窗算法是一种移动固定大小的窗口(子列表)来遍历数据结构的方法,主要是基于固定步骤的序列流数据。

如果我们想通过使用大小为3的窗口遍历列表[1 2 3 4 5],我们透过窗口只能看到以下数据组:

[1 2 3]

[2 3 4]

[3 4 5]

.如果我们想要使用比集合大小更大的窗口遍历相同的列表,我们甚至不会得到一个元素。

Java 10提供了一种 Stream 实现,支持顺序和并行聚合操作的一系列元素:


int sum = widgets.stream()
.filter(w -> w.getColor() == RED)
.mapToInt(w -> w.getWeight())
.sum();

下面谈的是如何在这个流上使用滑窗算法。

为了能够创建自定义Stream,我们需要实现自定义 Spliterator 。

在我们的例子中,我们需要能够迭代Stream <T>序列数据,因此我们需要实现Spliterator接口并指定泛型类型参数:


public class SlidingWindowSpliterator<T> implements Spliterator<Stream<T>> {
// ...
}

有一堆方法需要实现:


public class SlidingWindowSpliterator<T> implements Spliterator<Stream<T>> {
//下面会实现
@Override
public boolean tryAdvance(Consumer<? super Stream<T>> action) {
return false;
}
//准备下面实现
@Override
public Spliterator<Stream<T>> trySplit() {
return null;
}
@Override
public long estimateSize() {
return 0;
}
//下面准备实现
@Override
public int characteristics() {
return 0;
}
}

我们还需要一些字段来存储缓冲元素、窗口大小参数、源集合的迭代器以及预先计算的大小估计(稍后我们将需要):


private final Queue<T> buffer;
private final Iterator<T> sourceIterator;
private final int windowSize;
private final int size;

在我们开始实现接口方法之前,我们需要能够实例化我们的工具。

在这种情况下,我们将限制构造函数的可见性,并公开一个公共静态工厂方法:


private SlidingWindowSpliterator(Collection<T> source, int windowSize) {
this.buffer = new ArrayDeque<>(windowSize);
this.sourceIterator = Objects.requireNonNull(source).iterator();
this.windowSize = windowSize;
this.size = calculateSize(source, windowSize);
}

公开的静态方法:


static <T> Stream<Stream<T>> windowed(Collection<T> stream, int windowSize) {
return StreamSupport.stream(
new SlidingWindowSpliterator<>(stream, windowSize), false);
}

现在让我们实现Spliterator方法中容易的部分。

实现 trySplit()时,我们默认使用文档中指定的值。幸运的是,计算大小很容易:


private static int calculateSize(Collection<?> source, int windowSize) {
return source.size() < windowSize
? 0
: source.size() - windowSize + 1;
}
@Override
public Spliterator<Stream<T>> trySplit() {
return null;
}
@Override
public long estimateSize() {
return size;
}

在characteristics()中,我们指定:

ORDERED - 因为顺序很重要

NONNULL - 因为元素永远不会为null(尽管可以包含空值)

SIZED -因为大小是可以预见的


@Override
public int characteristics() {
return ORDERED | NONNULL | SIZED;
}

现在实现tryAdvance,这里是关键部分 - 负责实际分组和迭代的方法。

首先,如果窗口小于1,则没有任何内容可以迭代,以便我们可以立即返回:


@Override
public boolean tryAdvance(Consumer<? super Stream<T>> action) {
if (windowSize < 1) {
return false;
}
// ...
}

现在,要生成第一个子列表,我们需要开始迭代并填充缓冲区:


while (sourceIterator.hasNext()) {
buffer.add(sourceIterator.next());
// ...
}

填充缓冲区后,我们可以调度整个组,并从缓冲区中丢弃最旧的元素。

这里有一个关键部分,可能会试图将buffer.stream()传递给accept()方法,这是一个巨大的错误 - Streams惰性地绑定到底层集合,这意味着如果源更改,Stream也会更改。

为了避免这个问题并将我们的组与内部缓冲区表示分离,我们需要在创建每个Stream实例之前对缓冲区的当前状态进行快照。我们将使用数组支持Stream实例,以使它们尽可能轻量级。

由于Java不支持通用数组,我们需要做一些丑陋的转换:


if (buffer.size() == windowSize) {
action.accept(Arrays.stream((T[]) buffer.toArray(new Object[0])));
buffer.poll();
return sourceIterator.hasNext();
}

...瞧,我们准备好使用它:

windowed(List.of(1,2,3,4,5), 3)
.map(group -> group.collect(toList()))
.forEach(System.out::println);

滑窗代码编制成功,运行结果如下:


// result
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
原文  https://www.jdon.com/49945
正文到此结束
Loading...