1. 概述

Java 8 引入了 Stream API,此后它已成为 Java 开发中的核心工具。它简单易用、易于理解和维护,同时支持顺序和并行处理。

然而,Stream API 的中间操作数量固定且灵活性不足。为克服这一限制,Java 24 引入了 Gatherer 接口,为中间操作提供了更强的灵活性

本文将深入探讨 Gatherers 的概念及其使用方法。

2. Stream Gatherer

Stream Gatherers 的核心目标是支持自定义中间操作,使流水线更灵活且表达力更强。它们支持异步和增量处理,同时允许自定义数据分组或累积。

它们能实现 m-to-n 元素转换、基于历史元素的决策转换、并行执行,以及将无限流转换为有限流

接下来我们分析构成 Gatherer 的核心函数。

2.1 Gatherer 核心函数

Gatherer 包含四个定义元素收集和转换方式的函数:

  • ***initializer()***:可选函数,用于在流处理期间存储状态。为转换提供初始状态。
  • ***integrator()***:整合新流元素(可选地结合处理状态),并选择性地向下游发送元素。可根据条件提前终止处理,是转换逻辑的核心控制器。
  • ***combiner()***:可选函数,启用 Gatherer 的并行处理能力。合并两个状态以实现并行元素处理。若未定义或输入流未标记为并行,则 Gatherer 将顺序执行。
  • ***finisher()***:可选函数,在流中无更多元素可消费时调用。适用于缓冲或滑动窗口等有状态操作。

接下来查看内置 Gatherers 的示例。

2.2 fold()

fold() 以有序方式合并多个元素生成最终结果。相比 reduce() 的优势在于:结果仍可继续用于流操作。

代码示例:

@Test
void givenNumbers_whenFolded_thenSumIsEmitted() {
    Stream<Integer> numbers = Stream.of(1, 2, 3, 4, 5);
    Stream folded = numbers.gather(Gatherers.fold(() -> 0, Integer::sum));
    List<Integer> resultList = folded.toList();
    assertEquals(1, resultList.size());
    assertEquals(Integer.valueOf(15), resultList.getFirst());
}

我们初始化 fold() 方法的初始值为 0,并对输入流的所有数字求和。由于 gatherers 是中间操作,我们将结果收集到列表并验证预期输出。

2.3 mapConcurrent()

顾名思义,mapConcurrent() 在给定并发限制下,并行地对所有元素应用函数。它避免了手动管理线程池或使用 CallableFuture 的复杂性。

代码示例:

@Test
void givenWords_whenMappedConcurrently_thenUppercasedWordsAreEmitted() {
    Stream<String> words = Stream.of("a", "b", "c", "d");
    List<String> resultList = words.gather(Gatherers.mapConcurrent(2, String::toUpperCase)).toList();
    assertEquals(4, resultList.size());
    assertEquals(List.of("A", "B", "C", "D"),resultList);
}

我们设置 maxConcurrency2toUpperCase() 函数的最大并发度),并验证输出符合预期。

2.4 scan()

scan() 执行增量累积:从初始状态开始,评估当前状态并应用于当前元素,生成下游值。

代码示例:

@Test
void givenNumbers_whenScanned_thenRunningTotalsAreEmitted() {
    Stream<Integer> numbers = Stream.of(1, 2, 3, 4);
    List<Integer> resultList = numbers.gather(Gatherers.scan(() -> 0, Integer::sum)).toList();
    assertEquals(4, resultList.size());
    assertEquals(List.of(1, 3, 6, 10),resultList);
}

使用 scan() 计算输入流的累加和。提供初始值 0 后,所有输入值的累加和被逐步计算。

2.5 windowSliding()

名称即功能:实现了 滑动窗口算法若窗口大小大于流输入,则仅包含所有流元素的单个窗口。通常,它将输入元素分组到配置大小的滑动窗口中。

代码示例:

@Test
void givenNumbers_whenWindowedSliding_thenOverlappingWindowsAreEmitted() {
    List<List<Integer>> expectedOutput = List.of(List.of(1, 2, 3), List.of(2, 3, 4), List.of(3, 4, 5));
    Stream<Integer> numbers = Stream.of(1, 2, 3, 4, 5);
    List<List<Integer>> resultList = numbers.gather(Gatherers.windowSliding(3))
      .toList();
    assertEquals(3, resultList.size());
    assertEquals(expectedOutput,resultList);
}

如预期,我们得到输入元素的 m-to-n 映射,分组为配置大小的列表

3. 实战场景

前文介绍了内置的中间操作支持。现在我们探索如何为不同输入输出关系构建自定义 Gatherer

3.1 一对一映射

**Gatherer 的唯一必需函数是 *integrator()***。下面将 String 流转换为长度(一对一映射),仅定义 *integrator()*:

@Test
void givenStrings_whenUsingCustomGatherer_thenLengthsAreCalculated() {
    List<Integer> expectedOutput = List.of(5, 6, 3);
    Stream<String> inputStrings = Stream.of("apple", "banana", "cat");
    List<Object> resultList = inputStrings.gather(Gatherer.of((state, element, downstream) -> {
            downstream.push(element.length());
            return true;
        }))
      .toList();
    assertEquals(3, resultList.size());
    assertEquals(expectedOutput, resultList);
}

我们将 Gathererintegrator() 定义为 lambda 表达式,将字符串长度推送到下游。也可通过继承 Gatherer 接口实现自定义类。

3.2 一对多映射

输入 String 流,通过拆分句子生成单词组合。定义自定义 Gatherer 探索各函数:

public class SentenceSplitterGatherer implements Gatherer<String, List<String>,String> {

    @Override
    public Supplier<List<String>> initializer() {
        return ArrayList::new;
    }

    @Override
    public BinaryOperator<List<String>> combiner() {
        return (left, right) -> {
            left.addAll(right);
            return left;
        };
    }

    @Override
    public Integrator<List<String>, String, String> integrator() {
        return (state, element, downstream) -> {
            var words = element.split("\\s+");
            for (var word : words) {
                state.add(word);
                downstream.push(word);
            }
            return true;
        };
    }
}

SentenceSplitterGatherer 中:

  • initializer() 返回空 ArrayList 作为初始状态
  • combiner() 支持并行处理能力
  • integrator() 拆分字符串并更新状态及下游

验证自定义 Gatherer

@Test
void givenSentences_whenUsingCustomOneToManyGatherer_thenWordsAreExtracted() {
    List<String> expectedOutput = List.of("hello", "world", "java", "streams");
    Stream<String> sentences = Stream.of("hello world", "java streams");
    List<String> words = sentences.gather(new SentenceSplitterGatherer())
        .toList();
    assertEquals(expectedOutput, words);
}

3.3 多对一映射

定义自定义 Gatherer:初始化空 ArrayList,定义 Integer 流的求和逻辑,**最后在无上游元素时执行 *finisher()***:

public class NumericSumGatherer implements Gatherer<Integer, ArrayList<Integer>, Integer> {

    @Override
    public Supplier<ArrayList<Integer>> initializer() {
        return ArrayList::new;
    }

    @Override
    public Integrator<ArrayList<Integer>, Integer, Integer> integrator() {
        return new Integrator<>() {
            @Override
            public boolean integrate(ArrayList<Integer> state,
              Integer element, Downstream<? super Integer> downstream) {
                if (state.isEmpty()) {
                    state.add(element);
                } else {
                    state.addFirst(state.getFirst() + element);
                }
                return true;
            }
        };
    }

    @Override
    public BiConsumer<ArrayList<Integer>, Downstream<? super Integer>> finisher() {
        return (state, downstream) -> {
            if (!downstream.isRejecting() && !state.isEmpty()) {
                downstream.push(state.getFirst());
                state.clear();
            }
        };
    }
}

核心逻辑:累加流中所有元素。测试验证:

@Test
void givenNumbers_whenUsingCustomManyToOneGatherer_thenSumIsCalculated() {
    Stream<Integer> inputValues = Stream.of(1, 2, 3, 4, 5, 6);
    List<Integer> result = inputValues.gather(new NumericSumGatherer())
      .toList();
    Assertions.assertEquals(Integer.valueOf(21), result.getFirst());
}

3.4 多对多映射

前文展示了内置 windowSliding() 的工作原理。现在用自定义逻辑实现相同功能:

public class SlidingWindowGatherer implements Gatherer<Integer, Deque<Integer>, List<Integer>> {

    @Override
    public Supplier<Deque<Integer>> initializer() {
        return ArrayDeque::new;
    }

    @Override
    public Integrator<Deque<Integer>, Integer, List<Integer>> integrator() {
        return new Integrator<>() {
            @Override
            public boolean integrate(Deque<Integer> state,
                  Integer element, Downstream<? super List<Integer>> downstream) {
                state.addLast(element);
                if (state.size() == 3) {
                    downstream.push(new ArrayList<>(state));
                    state.removeFirst();
                }
                return true;
            }
        };
    }

    @Override
    public BiConsumer<Deque<Integer>, Downstream<? super List<Integer>>> finisher() {
        return (state, downstream) -> {};
    }
}

实现要点:

  • ✅ 初始化固定大小为 3 的空 Deque 作为滑动窗口
  • ✅ 整合时将新元素添加到窗口末尾
  • ✅ 窗口达 3 个元素时,推送副本到下游并移除最旧元素
  • ⚠️ 跳过 finisher(不发送部分窗口)

验证实现:

@Test
void givenNumbers_whenWindowedSliding_thenOverlappingWindowsAreEmitted() {
    List<List<Integer>> expectedOutput = List.of(List.of(1, 2, 3), List.of(2, 3, 4), List.of(3, 4, 5));
    Stream<Integer> numbers = Stream.of(1, 2, 3, 4, 5);
    List<List<Integer>> resultList = numbers.gather(new SlidingWindowGatherer())
      .toList();
    Assertions.assertEquals(3, resultList.size());
    Assertions.assertEquals(expectedOutput, resultList);
}

4. 总结

本文首先探讨了 Gatherer API 的特性及解决的问题:为中间 Stream 操作提供类似 collect() 对终端操作的能力

接着简要介绍了 API 的核心函数及部分内置 Gatherer

最后通过不同输入输出关系的自定义 Gatherer 实现,深入分析了各函数的细节。

完整代码见 GitHub 仓库


原始标题:Stream Gatherers in Java | Baeldung