1. 概述
Stream API 提供了丰富的中间操作、归约操作和终端操作,且支持并行处理。
其中,归约操作(reduction)可以从一个元素序列中生成一个单一的结果,通过反复应用某种合并操作来实现。
在本篇文章中,我们将深入探讨通用的 Stream.reduce() 方法,并结合具体示例进行讲解。
2. 核心概念:Identity、Accumulator 和 Combiner
在深入使用 Stream.reduce()
之前,我们先拆解其构成要素,以便更好地理解每个部分的作用。
- Identity(初始值):归约操作的初始值,同时也是流为空时的默认返回值。
- Accumulator(累加器):接受两个参数的函数:一个是当前的归约中间结果,另一个是流中的下一个元素。
- Combiner(组合器):用于在并行归约或累加器参数类型不匹配时,合并多个中间结果。
3. 使用 Stream.reduce()
为了更好地理解 identity、accumulator 和 combiner 的作用,我们来看几个基础示例:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
int result = numbers
.stream()
.reduce(0, (subtotal, element) -> subtotal + element);
assertThat(result).isEqualTo(21);
在这个例子中,整数 0 是 identity,它作为归约操作的初始值,并在流为空时作为默认返回值。
而下面这个 lambda 表达式:
subtotal, element -> subtotal + element
就是 accumulator(累加器),它将当前部分和与下一个元素相加。
我们可以使用方法引用来简化代码:
int result = numbers.stream().reduce(0, Integer::sum);
assertThat(result).isEqualTo(21);
当然,我们也可以对其他类型的流使用 reduce()
。例如,对字符串列表进行拼接:
List<String> letters = Arrays.asList("a", "b", "c", "d", "e");
String result = letters
.stream()
.reduce("", (partialString, element) -> partialString + element);
assertThat(result).isEqualTo("abcde");
或者使用方法引用:
String result = letters.stream().reduce("", String::concat);
assertThat(result).isEqualTo("abcde");
再比如,将字符串全部转为大写后拼接:
String result = letters
.stream()
.reduce(
"", (partialString, element) -> partialString.toUpperCase() + element.toUpperCase());
assertThat(result).isEqualTo("ABCDE");
在并行流中使用 reduce()
:
List<Integer> ages = Arrays.asList(25, 30, 45, 28, 32);
int computedAges = ages.parallelStream().reduce(0, (a, b) -> a + b, Integer::sum);
当流并行执行时,Java 运行时会将流拆分为多个子流。此时,需要一个 combiner 来合并子流的结果。在上面的示例中,Integer::sum
就是 combiner。
⚠️ 但下面这段代码是编译不通过的:
List<User> users = Arrays.asList(new User("John", 30), new User("Julie", 35));
int computedAges =
users.stream().reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge());
这里的问题是,累加器的参数类型是 Integer
和 User
,但累加器的逻辑是两个 Integer
相加,编译器无法推断出 user
参数的类型。
我们可以通过添加 combiner 来解决:
int result = users.stream()
.reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum);
assertThat(result).isEqualTo(65);
✅ 简单总结:如果使用的是顺序流,且累加器的参数类型和实现一致,那么 combiner 是不需要的。
4. 并行归约
前面我们提到,可以在并行流中使用 reduce()
。
在使用并行流时,需要确保 reduce()
操作满足以下条件:
- 结合性(Associative):操作结果不受元素顺序影响
- 无干扰性(Non-interfering):操作不会修改数据源
- 无状态且确定性(Stateless and Deterministic):操作没有状态,相同输入产生相同输出
满足这些条件可以避免不可预测的结果。
并行流的性能通常优于顺序流,但仅在操作开销较大或流元素较多时才体现出优势。
我们使用 JMH 来做一个基准测试,比较顺序流和并行流的性能:
@State(Scope.Thread)
private final List<User> userList = createUsers();
@Benchmark
public Integer executeReduceOnParallelizedStream() {
return this.userList
.parallelStream()
.reduce(
0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum);
}
@Benchmark
public Integer executeReduceOnSequentialStream() {
return this.userList
.stream()
.reduce(
0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum);
}
基准测试结果如下:
Benchmark Mode Cnt Score Error Units
JMHStreamReduceBenchMark.executeReduceOnParallelizedStream avgt 5 0,007 ± 0,001 s/op
JMHStreamReduceBenchMark.executeReduceOnSequentialStream avgt 5 0,010 ± 0,001 s/op
✅ 并行流在大数据量和复杂操作下表现更优。
5. 归约中的异常处理
之前的示例中 reduce()
没有抛出异常,但这并不意味着它不会。
例如,将流中的所有元素除以某个数再求和:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
int divider = 2;
int result = numbers.stream().reduce(0, a / divider + b / divider);
如果 divider
为 0,就会抛出 ArithmeticException
异常。
可以通过 try/catch
捕获异常:
public static int divideListElements(List<Integer> values, int divider) {
return values.stream()
.reduce(0, (a, b) -> {
try {
return a / divider + b / divider;
} catch (ArithmeticException e) {
LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero");
}
return 0;
});
}
但这样写会污染 lambda 表达式。我们可以将异常处理逻辑抽离到一个独立的方法中:
private static int divide(int value, int factor) {
int result = 0;
try {
result = value / factor;
} catch (ArithmeticException e) {
LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero");
}
return result;
}
然后简化主方法:
public static int divideListElements(List<Integer> values, int divider) {
return values.stream().reduce(0, (a, b) -> divide(a, divider) + divide(b, divider));
}
单元测试示例:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21);
List<Integer> numbersWithZero = Arrays.asList(0, 1, 2, 3, 4, 5, 6);
assertThat(NumberUtils.divideListElements(numbersWithZero, 1)).isEqualTo(21);
List<Integer> numbersDividedByZero = Arrays.asList(1, 2, 3, 4, 5, 6);
assertThat(NumberUtils.divideListElements(numbersDividedByZero, 0)).isEqualTo(0);
6. 复杂自定义对象的归约
我们也可以对包含非基本类型字段的自定义对象使用 Stream.reduce()
,只需提供合适的 identity、accumulator 和 combiner。
假设我们的 User
对象属于一个评论网站,每个用户都有一个 Rating
,而 Rating
又由多个 Review
构成。
首先定义 Review
类:
public class Review {
private int points;
private String review;
// constructor, getters and setters
}
再定义 Rating
类:
public class Rating {
double points;
List<Review> reviews = new ArrayList<>();
public void add(Review review) {
reviews.add(review);
computeRating();
}
private double computeRating() {
double totalPoints =
reviews.stream().map(Review::getPoints).reduce(0, Integer::sum);
this.points = totalPoints / reviews.size();
return this.points;
}
public static Rating average(Rating r1, Rating r2) {
Rating combined = new Rating();
combined.reviews = new ArrayList<>(r1.reviews);
combined.reviews.addAll(r2.reviews);
combined.computeRating();
return combined;
}
}
创建用户数据:
User john = new User("John", 30);
john.getRating().add(new Review(5, ""));
john.getRating().add(new Review(3, "not bad"));
User julie = new User("Julie", 35);
john.getRating().add(new Review(4, "great!"));
john.getRating().add(new Review(2, "terrible experience"));
john.getRating().add(new Review(4, ""));
List<User> users = Arrays.asList(john, julie);
使用 reduce()
计算平均评分:
Rating averageRating = users.stream()
.reduce(new Rating(),
(rating, user) -> Rating.average(rating, user.getRating()),
Rating::average);
验证结果:
assertThat(averageRating.getPoints()).isEqualTo(3.6);
7. 总结
本文我们学习了如何使用 Stream.reduce()
方法进行归约操作,包括:
- 顺序流与并行流下的使用方式
- 如何处理归约过程中可能抛出的异常
- 如何在自定义对象上使用
reduce()
如需获取本文中的完整代码示例,请访问 GitHub。