2. Stream 创建
Stream 实例可以通过多种数据源创建,创建后不会修改原始数据源,因此可以从单一数据源创建多个 Stream 实例。
2.1 空 Stream
使用 empty()
方法创建空 Stream:
Stream<String> streamEmpty = Stream.empty();
常用于避免返回 null
,例如:
public Stream<String> streamOf(List<String> list) {
return list == null || list.isEmpty() ? Stream.empty() : list.stream();
}
2.2 Collection 转 Stream
任何 Collection
类型(List
, Set
等)都能直接创建 Stream:
Collection<String> collection = Arrays.asList("a", "b", "c");
Stream<String> streamOfCollection = collection.stream();
2.3 数组转 Stream
数组也是常见的数据源:
Stream<String> streamOfArray = Stream.of("a", "b", "c");
也可以从现有数组或部分数组创建:
String[] arr = new String[]{"a", "b", "c"};
Stream<String> streamOfArrayFull = Arrays.stream(arr);
Stream<String> streamOfArrayPart = Arrays.stream(arr, 1, 3); // 截取索引 1-3
2.4 Stream.builder()
使用 builder()
时必须显式指定类型,否则 build()
会返回 Stream<Object>
:
Stream<String> streamBuilder =
Stream.<String>builder().add("a").add("b").add("c").build();
2.5 Stream.generate()
generate()
接收 Supplier<T>
生成元素。由于生成的是无限流,必须通过 limit()
限制大小:
Stream<String> streamGenerated =
Stream.generate(() -> "element").limit(10); // 生成10个"element"
2.6 Stream.iterate()
通过 iterate()
创建无限流:
Stream<Integer> streamIterated = Stream.iterate(40, n -> n + 2).limit(20);
首元素是 40
,后续元素通过函数 n -> n + 2
生成(如第二个元素是 42
)。
2.7 基本类型 Stream
Java 8 为三种基本类型提供专用接口:IntStream
, LongStream
, DoubleStream
,避免自动装箱开销:
IntStream intStream = IntStream.range(1, 3); // [1, 2]
LongStream longStream = LongStream.rangeClosed(1, 3); // [1, 2, 3]
range(start, end)
:左闭右开区间rangeClosed(start, end)
:闭区间
Random
类也支持生成基本类型流:
Random random = new Random();
DoubleStream doubleStream = random.doubles(3); // 生成3个随机double
2.8 字符串 Stream
通过 String.chars()
创建字符流(返回 IntStream
):
IntStream streamOfChars = "abc".chars();
按正则分割字符串:
Stream<String> streamOfString =
Pattern.compile(", ").splitAsStream("a, b, c");
2.9 文件 Stream
Java NIO 的 Files
类通过 lines()
方法读取文本文件每行:
Path path = Paths.get("C:\\file.txt");
Stream<String> streamOfStrings = Files.lines(path);
Stream<String> streamWithCharset =
Files.lines(path, Charset.forName("UTF-8")); // 指定编码
3. Stream 引用
Stream 实例在执行中间操作时可被引用,但一旦调用终端操作就会失效。看个踩坑例子:
Stream<String> stream =
Stream.of("a", "b", "c").filter(element -> element.contains("b"));
Optional<String> anyElement = stream.findAny(); // 正常
但再次引用会抛 IllegalStateException
:
Optional<String> firstElement = stream.findFirst(); // 报错!
关键点:Stream 不可复用。正确做法是收集到集合:
List<String> elements =
Stream.of("a", "b", "c").filter(element -> element.contains("b"))
.collect(Collectors.toList());
Optional<String> anyElement = elements.stream().findAny();
Optional<String> firstElement = elements.stream().findFirst();
4. Stream 流水线
Stream 流水线由三部分组成:数据源、中间操作和终端操作。
中间操作返回新流,例如 skip()
跳过元素:
Stream<String> onceModifiedStream =
Stream.of("abcd", "bbcd", "cbcd").skip(1); // 跳过首元素
可链式调用多个中间操作:
Stream<String> twiceModifiedStream =
stream.skip(1).map(element -> element.substring(0, 3)); // 跳过+截取
终端操作触发实际计算(如 count()
),每个流只能调用一次终端操作:
List<String> list = Arrays.asList("abc1", "abc2", "abc3");
long size = list.stream().skip(1)
.map(element -> element.substring(0, 3)).sorted().count();
5. 惰性求值
中间操作是惰性的:仅在终端操作需要时执行。看个例子:
private long counter;
private void wasCalled() { counter++; }
List<String> list = Arrays.asList("abc1", "abc2", "abc3");
counter = 0;
Stream<String> stream = list.stream().filter(element -> {
wasCalled();
return element.contains("2");
});
此时 counter
仍为 0
,因为缺少终端操作。添加终端操作后:
Optional<String> stream = list.stream().filter(element -> {
log.info("filter() was called");
return element.contains("2");
}).map(element -> {
log.info("map() was called");
return element.toUpperCase();
}).findFirst();
日志显示:
filter()
被调用2次(跳过不满足的元素)map()
被调用1次(仅对满足条件的元素)
惰性求值优化了性能,避免不必要的计算。
6. 执行顺序
操作顺序对性能影响极大。看个反面例子:
long size = list.stream().map(element -> {
wasCalled(); // 昂贵操作
return element.substring(0, 3);
}).skip(2).count(); // 后过滤
wasCalled()
被执行3次,但最终只有1个元素。调整顺序后:
long size = list.stream().skip(2).map(element -> {
wasCalled(); // 仅执行1次
return element.substring(0, 3);
}).count();
黄金法则:将减少流大小的操作(如 skip()
, filter()
, distinct()
)放在流水线前端。
7. Stream 归约
终端操作 reduce()
和 collect()
可自定义归约逻辑。
7.1 reduce() 方法
三种重载形式:
单参数:返回
Optional<T>
OptionalInt reduced = IntStream.range(1, 4).reduce((a, b) -> a + b); // 6 (1+2+3)
双参数(初始值 + 累加器)
int reducedTwoParams = IntStream.range(1, 4).reduce(10, (a, b) -> a + b); // 16 (10+1+2+3)
三参数(初始值 + 累加器 + 合并器)
int reducedParams = Stream.of(1, 2, 3) .reduce(10, (a, b) -> a + b, (a, b) -> { log.info("combiner was called"); return a + b; }); // 16,串行流不调用合并器
在并行流中合并器才会生效:
int reducedParallel = Arrays.asList(1, 2, 3).parallelStream()
.reduce(10, (a, b) -> a + b, (a, b) -> {
log.info("combiner was called"); // 调用2次
return a + b;
}); // 36 (并行计算结果)
7.2 collect() 方法
使用 Collector
定义归约逻辑。以下用 Product
类演示:
List<Product> productList = Arrays.asList(new Product(23, "potatoes"),
new Product(14, "orange"), new Product(13, "lemon"),
new Product(23, "bread"), new Product(13, "sugar"));
常用收集器:
转换为
Collection
List<String> collectorCollection = productList.stream().map(Product::getName).collect(Collectors.toList());
拼接字符串
String listToString = productList.stream().map(Product::getName) .collect(Collectors.joining(", ", "[", "]")); // [potatoes, orange, lemon...]
数值统计
double averagePrice = productList.stream() .collect(Collectors.averagingInt(Product::getPrice)); // 平均值 int summingPrice = productList.stream() .collect(Collectors.summingInt(Product::getPrice)); // 总和
完整统计信息
IntSummaryStatistics statistics = productList.stream() .collect(Collectors.summarizingInt(Product::getPrice)); // 输出: IntSummaryStatistics{count=5, sum=86, min=13, average=17.2, max=23}
分组
Map<Integer, List<Product>> collectorMapOfLists = productList.stream() .collect(Collectors.groupingBy(Product::getPrice)); // 按价格分组
分区(按条件分两组)
Map<Boolean, List<Product>> mapPartioned = productList.stream() .collect(Collectors.partitioningBy(element -> element.getPrice() > 15));
自定义转换
Set<Product> unmodifiableSet = productList.stream() .collect(Collectors.collectingAndThen(Collectors.toSet(), Collections::unmodifiableSet)); // 转为不可变Set
自定义收集器
Collector<Product, ?, LinkedList<Product>> toLinkedList = Collector.of(LinkedList::new, LinkedList::add, (first, second) -> { first.addAll(second); return first; }); LinkedList<Product> linkedListOfProducts = productList.stream().collect(toLinkedList);
8. 并行流
Java 8 通过 parallelStream()
简单实现并行处理:
Stream<Product> streamOfCollection = productList.parallelStream();
boolean isParallel = streamOfCollection.isParallel(); // true
boolean bigPrice = streamOfCollection
.map(product -> product.getPrice() * 12)
.anyMatch(price -> price > 200);
非 Collection
数据源用 parallel()
:
IntStream intStreamParallel = IntStream.range(1, 150).parallel();
注意事项:
- 底层使用
ForkJoinPool
,无法自定义线程池(但有变通方案) - 避免阻塞操作
- 确保任务耗时相近,否则拖慢整体性能
切换回串行流:
IntStream intStreamSequential = intStreamParallel.sequential();
9. 总结
Stream API 是处理元素序列的强大工具,合理使用可:
- 减少模板代码
- 提升可读性
- 优化应用性能
关键提醒:生产环境切勿保留未消费的 Stream,会导致内存泄漏。完整示例代码见 GitHub。