1. 概述

Stream API 提供了丰富的中间操作、归约操作和终端操作,且支持并行处理。

其中,归约操作(reduction)可以从一个元素序列中生成一个单一的结果,通过反复应用某种合并操作来实现。
在本篇文章中,我们将深入探讨通用的 Stream.reduce() 方法,并结合具体示例进行讲解。

2. 核心概念:Identity、Accumulator 和 Combiner

在深入使用 Stream.reduce() 之前,我们先拆解其构成要素,以便更好地理解每个部分的作用。

  • Identity(初始值):归约操作的初始值,同时也是流为空时的默认返回值。
  • Accumulator(累加器):接受两个参数的函数:一个是当前的归约中间结果,另一个是流中的下一个元素。
  • Combiner(组合器):用于在并行归约或累加器参数类型不匹配时,合并多个中间结果。

3. 使用 Stream.reduce()

为了更好地理解 identity、accumulator 和 combiner 的作用,我们来看几个基础示例:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
int result = numbers
  .stream()
  .reduce(0, (subtotal, element) -> subtotal + element);
assertThat(result).isEqualTo(21);

在这个例子中,整数 0 是 identity,它作为归约操作的初始值,并在流为空时作为默认返回值。

而下面这个 lambda 表达式:

subtotal, element -> subtotal + element

就是 accumulator(累加器),它将当前部分和与下一个元素相加。

我们可以使用方法引用来简化代码:

int result = numbers.stream().reduce(0, Integer::sum);
assertThat(result).isEqualTo(21);

当然,我们也可以对其他类型的流使用 reduce()。例如,对字符串列表进行拼接:

List<String> letters = Arrays.asList("a", "b", "c", "d", "e");
String result = letters
  .stream()
  .reduce("", (partialString, element) -> partialString + element);
assertThat(result).isEqualTo("abcde");

或者使用方法引用:

String result = letters.stream().reduce("", String::concat);
assertThat(result).isEqualTo("abcde");

再比如,将字符串全部转为大写后拼接:

String result = letters
  .stream()
  .reduce(
    "", (partialString, element) -> partialString.toUpperCase() + element.toUpperCase());
assertThat(result).isEqualTo("ABCDE");

在并行流中使用 reduce()

List<Integer> ages = Arrays.asList(25, 30, 45, 28, 32);
int computedAges = ages.parallelStream().reduce(0, (a, b) -> a + b, Integer::sum);

当流并行执行时,Java 运行时会将流拆分为多个子流。此时,需要一个 combiner 来合并子流的结果。在上面的示例中,Integer::sum 就是 combiner。

⚠️ 但下面这段代码是编译不通过的:

List<User> users = Arrays.asList(new User("John", 30), new User("Julie", 35));
int computedAges = 
  users.stream().reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge());

这里的问题是,累加器的参数类型是 IntegerUser,但累加器的逻辑是两个 Integer 相加,编译器无法推断出 user 参数的类型。

我们可以通过添加 combiner 来解决:

int result = users.stream()
  .reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum);
assertThat(result).isEqualTo(65);

✅ 简单总结:如果使用的是顺序流,且累加器的参数类型和实现一致,那么 combiner 是不需要的

4. 并行归约

前面我们提到,可以在并行流中使用 reduce()

在使用并行流时,需要确保 reduce() 操作满足以下条件:

  • 结合性(Associative):操作结果不受元素顺序影响
  • 无干扰性(Non-interfering):操作不会修改数据源
  • 无状态且确定性(Stateless and Deterministic):操作没有状态,相同输入产生相同输出

满足这些条件可以避免不可预测的结果。

并行流的性能通常优于顺序流,但仅在操作开销较大或流元素较多时才体现出优势。

我们使用 JMH 来做一个基准测试,比较顺序流和并行流的性能:

@State(Scope.Thread)
private final List<User> userList = createUsers();

@Benchmark
public Integer executeReduceOnParallelizedStream() {
    return this.userList
      .parallelStream()
      .reduce(
        0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum);
}

@Benchmark
public Integer executeReduceOnSequentialStream() {
    return this.userList
      .stream()
      .reduce(
        0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum);
}

基准测试结果如下

Benchmark                                                   Mode  Cnt  Score    Error  Units
JMHStreamReduceBenchMark.executeReduceOnParallelizedStream  avgt    5  0,007 ±  0,001   s/op
JMHStreamReduceBenchMark.executeReduceOnSequentialStream    avgt    5  0,010 ±  0,001   s/op

✅ 并行流在大数据量和复杂操作下表现更优。

5. 归约中的异常处理

之前的示例中 reduce() 没有抛出异常,但这并不意味着它不会。

例如,将流中的所有元素除以某个数再求和:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
int divider = 2;
int result = numbers.stream().reduce(0, a / divider + b / divider);

如果 divider 为 0,就会抛出 ArithmeticException 异常。

可以通过 try/catch 捕获异常:

public static int divideListElements(List<Integer> values, int divider) {
    return values.stream()
      .reduce(0, (a, b) -> {
          try {
              return a / divider + b / divider;
          } catch (ArithmeticException e) {
              LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero");
          }
          return 0;
      });
}

但这样写会污染 lambda 表达式。我们可以将异常处理逻辑抽离到一个独立的方法中:

private static int divide(int value, int factor) {
    int result = 0;
    try {
        result = value / factor;
    } catch (ArithmeticException e) {
        LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero");
    }
    return result;
}

然后简化主方法:

public static int divideListElements(List<Integer> values, int divider) {
    return values.stream().reduce(0, (a, b) -> divide(a, divider) + divide(b, divider));
}

单元测试示例:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21);

List<Integer> numbersWithZero = Arrays.asList(0, 1, 2, 3, 4, 5, 6);
assertThat(NumberUtils.divideListElements(numbersWithZero, 1)).isEqualTo(21);

List<Integer> numbersDividedByZero = Arrays.asList(1, 2, 3, 4, 5, 6);
assertThat(NumberUtils.divideListElements(numbersDividedByZero, 0)).isEqualTo(0);

6. 复杂自定义对象的归约

我们也可以对包含非基本类型字段的自定义对象使用 Stream.reduce(),只需提供合适的 identity、accumulator 和 combiner。

假设我们的 User 对象属于一个评论网站,每个用户都有一个 Rating,而 Rating 又由多个 Review 构成。

首先定义 Review 类:

public class Review {

    private int points;
    private String review;

    // constructor, getters and setters
}

再定义 Rating 类:

public class Rating {

    double points;
    List<Review> reviews = new ArrayList<>();

    public void add(Review review) {
        reviews.add(review);
        computeRating();
    }

    private double computeRating() {
        double totalPoints = 
          reviews.stream().map(Review::getPoints).reduce(0, Integer::sum);
        this.points = totalPoints / reviews.size();
        return this.points;
    }

    public static Rating average(Rating r1, Rating r2) {
        Rating combined = new Rating();
        combined.reviews = new ArrayList<>(r1.reviews);
        combined.reviews.addAll(r2.reviews);
        combined.computeRating();
        return combined;
    }

}

创建用户数据:

User john = new User("John", 30);
john.getRating().add(new Review(5, ""));
john.getRating().add(new Review(3, "not bad"));
User julie = new User("Julie", 35);
john.getRating().add(new Review(4, "great!"));
john.getRating().add(new Review(2, "terrible experience"));
john.getRating().add(new Review(4, ""));
List<User> users = Arrays.asList(john, julie);

使用 reduce() 计算平均评分:

Rating averageRating = users.stream()
  .reduce(new Rating(), 
    (rating, user) -> Rating.average(rating, user.getRating()), 
    Rating::average);

验证结果:

assertThat(averageRating.getPoints()).isEqualTo(3.6);

7. 总结

本文我们学习了如何使用 Stream.reduce() 方法进行归约操作,包括:

  • 顺序流与并行流下的使用方式
  • 如何处理归约过程中可能抛出的异常
  • 如何在自定义对象上使用 reduce()

如需获取本文中的完整代码示例,请访问 GitHub


原始标题:Guide to Stream.reduce()