1. 概述
Java 8 引入了 Stream API,此后它已成为 Java 开发中的核心工具。它简单易用、易于理解和维护,同时支持顺序和并行处理。
然而,Stream API 的中间操作数量固定且灵活性不足。为克服这一限制,Java 24 引入了 Gatherer 接口,为中间操作提供了更强的灵活性。
本文将深入探讨 Gatherers 的概念及其使用方法。
2. Stream Gatherer
Stream Gatherers 的核心目标是支持自定义中间操作,使流水线更灵活且表达力更强。它们支持异步和增量处理,同时允许自定义数据分组或累积。
它们能实现 m-to-n 元素转换、基于历史元素的决策转换、并行执行,以及将无限流转换为有限流。
接下来我们分析构成 Gatherer 的核心函数。
2.1 Gatherer 核心函数
Gatherer 包含四个定义元素收集和转换方式的函数:
- ***initializer()***:可选函数,用于在流处理期间存储状态。为转换提供初始状态。
- ***integrator()***:整合新流元素(可选地结合处理状态),并选择性地向下游发送元素。可根据条件提前终止处理,是转换逻辑的核心控制器。
- ***combiner()***:可选函数,启用 Gatherer 的并行处理能力。合并两个状态以实现并行元素处理。若未定义或输入流未标记为并行,则 Gatherer 将顺序执行。
- ***finisher()***:可选函数,在流中无更多元素可消费时调用。适用于缓冲或滑动窗口等有状态操作。
接下来查看内置 Gatherers 的示例。
2.2 fold()
fold() 以有序方式合并多个元素生成最终结果。相比 reduce() 的优势在于:结果仍可继续用于流操作。
代码示例:
@Test
void givenNumbers_whenFolded_thenSumIsEmitted() {
Stream<Integer> numbers = Stream.of(1, 2, 3, 4, 5);
Stream folded = numbers.gather(Gatherers.fold(() -> 0, Integer::sum));
List<Integer> resultList = folded.toList();
assertEquals(1, resultList.size());
assertEquals(Integer.valueOf(15), resultList.getFirst());
}
我们初始化 fold() 方法的初始值为 0,并对输入流的所有数字求和。由于 gatherers 是中间操作,我们将结果收集到列表并验证预期输出。
2.3 mapConcurrent()
顾名思义,mapConcurrent() 在给定并发限制下,并行地对所有元素应用函数。它避免了手动管理线程池或使用 Callable 和 Future 的复杂性。
代码示例:
@Test
void givenWords_whenMappedConcurrently_thenUppercasedWordsAreEmitted() {
Stream<String> words = Stream.of("a", "b", "c", "d");
List<String> resultList = words.gather(Gatherers.mapConcurrent(2, String::toUpperCase)).toList();
assertEquals(4, resultList.size());
assertEquals(List.of("A", "B", "C", "D"),resultList);
}
我们设置 maxConcurrency 为 2(toUpperCase() 函数的最大并发度),并验证输出符合预期。
2.4 scan()
scan() 执行增量累积:从初始状态开始,评估当前状态并应用于当前元素,生成下游值。
代码示例:
@Test
void givenNumbers_whenScanned_thenRunningTotalsAreEmitted() {
Stream<Integer> numbers = Stream.of(1, 2, 3, 4);
List<Integer> resultList = numbers.gather(Gatherers.scan(() -> 0, Integer::sum)).toList();
assertEquals(4, resultList.size());
assertEquals(List.of(1, 3, 6, 10),resultList);
}
使用 scan() 计算输入流的累加和。提供初始值 0 后,所有输入值的累加和被逐步计算。
2.5 windowSliding()
名称即功能:实现了 滑动窗口算法。若窗口大小大于流输入,则仅包含所有流元素的单个窗口。通常,它将输入元素分组到配置大小的滑动窗口中。
代码示例:
@Test
void givenNumbers_whenWindowedSliding_thenOverlappingWindowsAreEmitted() {
List<List<Integer>> expectedOutput = List.of(List.of(1, 2, 3), List.of(2, 3, 4), List.of(3, 4, 5));
Stream<Integer> numbers = Stream.of(1, 2, 3, 4, 5);
List<List<Integer>> resultList = numbers.gather(Gatherers.windowSliding(3))
.toList();
assertEquals(3, resultList.size());
assertEquals(expectedOutput,resultList);
}
如预期,我们得到输入元素的 m-to-n 映射,分组为配置大小的列表。
3. 实战场景
前文介绍了内置的中间操作支持。现在我们探索如何为不同输入输出关系构建自定义 Gatherer。
3.1 一对一映射
**Gatherer 的唯一必需函数是 *integrator()***。下面将 String 流转换为长度(一对一映射),仅定义 *integrator()*:
@Test
void givenStrings_whenUsingCustomGatherer_thenLengthsAreCalculated() {
List<Integer> expectedOutput = List.of(5, 6, 3);
Stream<String> inputStrings = Stream.of("apple", "banana", "cat");
List<Object> resultList = inputStrings.gather(Gatherer.of((state, element, downstream) -> {
downstream.push(element.length());
return true;
}))
.toList();
assertEquals(3, resultList.size());
assertEquals(expectedOutput, resultList);
}
我们将 Gatherer 的 integrator() 定义为 lambda 表达式,将字符串长度推送到下游。也可通过继承 Gatherer 接口实现自定义类。
3.2 一对多映射
输入 String 流,通过拆分句子生成单词组合。定义自定义 Gatherer 探索各函数:
public class SentenceSplitterGatherer implements Gatherer<String, List<String>,String> {
@Override
public Supplier<List<String>> initializer() {
return ArrayList::new;
}
@Override
public BinaryOperator<List<String>> combiner() {
return (left, right) -> {
left.addAll(right);
return left;
};
}
@Override
public Integrator<List<String>, String, String> integrator() {
return (state, element, downstream) -> {
var words = element.split("\\s+");
for (var word : words) {
state.add(word);
downstream.push(word);
}
return true;
};
}
}
在 SentenceSplitterGatherer 中:
- ✅ initializer() 返回空 ArrayList 作为初始状态
- ✅ combiner() 支持并行处理能力
- ✅ integrator() 拆分字符串并更新状态及下游
验证自定义 Gatherer:
@Test
void givenSentences_whenUsingCustomOneToManyGatherer_thenWordsAreExtracted() {
List<String> expectedOutput = List.of("hello", "world", "java", "streams");
Stream<String> sentences = Stream.of("hello world", "java streams");
List<String> words = sentences.gather(new SentenceSplitterGatherer())
.toList();
assertEquals(expectedOutput, words);
}
3.3 多对一映射
定义自定义 Gatherer:初始化空 ArrayList,定义 Integer 流的求和逻辑,**最后在无上游元素时执行 *finisher()***:
public class NumericSumGatherer implements Gatherer<Integer, ArrayList<Integer>, Integer> {
@Override
public Supplier<ArrayList<Integer>> initializer() {
return ArrayList::new;
}
@Override
public Integrator<ArrayList<Integer>, Integer, Integer> integrator() {
return new Integrator<>() {
@Override
public boolean integrate(ArrayList<Integer> state,
Integer element, Downstream<? super Integer> downstream) {
if (state.isEmpty()) {
state.add(element);
} else {
state.addFirst(state.getFirst() + element);
}
return true;
}
};
}
@Override
public BiConsumer<ArrayList<Integer>, Downstream<? super Integer>> finisher() {
return (state, downstream) -> {
if (!downstream.isRejecting() && !state.isEmpty()) {
downstream.push(state.getFirst());
state.clear();
}
};
}
}
核心逻辑:累加流中所有元素。测试验证:
@Test
void givenNumbers_whenUsingCustomManyToOneGatherer_thenSumIsCalculated() {
Stream<Integer> inputValues = Stream.of(1, 2, 3, 4, 5, 6);
List<Integer> result = inputValues.gather(new NumericSumGatherer())
.toList();
Assertions.assertEquals(Integer.valueOf(21), result.getFirst());
}
3.4 多对多映射
前文展示了内置 windowSliding() 的工作原理。现在用自定义逻辑实现相同功能:
public class SlidingWindowGatherer implements Gatherer<Integer, Deque<Integer>, List<Integer>> {
@Override
public Supplier<Deque<Integer>> initializer() {
return ArrayDeque::new;
}
@Override
public Integrator<Deque<Integer>, Integer, List<Integer>> integrator() {
return new Integrator<>() {
@Override
public boolean integrate(Deque<Integer> state,
Integer element, Downstream<? super List<Integer>> downstream) {
state.addLast(element);
if (state.size() == 3) {
downstream.push(new ArrayList<>(state));
state.removeFirst();
}
return true;
}
};
}
@Override
public BiConsumer<Deque<Integer>, Downstream<? super List<Integer>>> finisher() {
return (state, downstream) -> {};
}
}
实现要点:
- ✅ 初始化固定大小为 3 的空 Deque 作为滑动窗口
- ✅ 整合时将新元素添加到窗口末尾
- ✅ 窗口达 3 个元素时,推送副本到下游并移除最旧元素
- ⚠️ 跳过 finisher(不发送部分窗口)
验证实现:
@Test
void givenNumbers_whenWindowedSliding_thenOverlappingWindowsAreEmitted() {
List<List<Integer>> expectedOutput = List.of(List.of(1, 2, 3), List.of(2, 3, 4), List.of(3, 4, 5));
Stream<Integer> numbers = Stream.of(1, 2, 3, 4, 5);
List<List<Integer>> resultList = numbers.gather(new SlidingWindowGatherer())
.toList();
Assertions.assertEquals(3, resultList.size());
Assertions.assertEquals(expectedOutput, resultList);
}
4. 总结
本文首先探讨了 Gatherer API 的特性及解决的问题:为中间 Stream 操作提供类似 collect() 对终端操作的能力。
接着简要介绍了 API 的核心函数及部分内置 Gatherer。
最后通过不同输入输出关系的自定义 Gatherer 实现,深入分析了各函数的细节。
完整代码见 GitHub 仓库。