2. Stream 创建

Stream 实例可以通过多种数据源创建,创建后不会修改原始数据源,因此可以从单一数据源创建多个 Stream 实例。

2.1 空 Stream

使用 empty() 方法创建空 Stream:

Stream<String> streamEmpty = Stream.empty();

常用于避免返回 null,例如:

public Stream<String> streamOf(List<String> list) {
    return list == null || list.isEmpty() ? Stream.empty() : list.stream();
}

2.2 Collection 转 Stream

任何 Collection 类型(List, Set 等)都能直接创建 Stream:

Collection<String> collection = Arrays.asList("a", "b", "c");
Stream<String> streamOfCollection = collection.stream();

2.3 数组转 Stream

数组也是常见的数据源:

Stream<String> streamOfArray = Stream.of("a", "b", "c");

也可以从现有数组或部分数组创建:

String[] arr = new String[]{"a", "b", "c"};
Stream<String> streamOfArrayFull = Arrays.stream(arr);
Stream<String> streamOfArrayPart = Arrays.stream(arr, 1, 3); // 截取索引 1-3

2.4 Stream.builder()

使用 builder()必须显式指定类型,否则 build() 会返回 Stream<Object>

Stream<String> streamBuilder =
  Stream.<String>builder().add("a").add("b").add("c").build();

2.5 Stream.generate()

generate() 接收 Supplier<T> 生成元素。由于生成的是无限流,必须通过 limit() 限制大小

Stream<String> streamGenerated =
  Stream.generate(() -> "element").limit(10); // 生成10个"element"

2.6 Stream.iterate()

通过 iterate() 创建无限流:

Stream<Integer> streamIterated = Stream.iterate(40, n -> n + 2).limit(20);

首元素是 40,后续元素通过函数 n -> n + 2 生成(如第二个元素是 42)。

2.7 基本类型 Stream

Java 8 为三种基本类型提供专用接口:IntStream, LongStream, DoubleStream,避免自动装箱开销:

IntStream intStream = IntStream.range(1, 3); // [1, 2]
LongStream longStream = LongStream.rangeClosed(1, 3); // [1, 2, 3]
  • range(start, end):左闭右开区间
  • rangeClosed(start, end):闭区间

Random 类也支持生成基本类型流:

Random random = new Random();
DoubleStream doubleStream = random.doubles(3); // 生成3个随机double

2.8 字符串 Stream

通过 String.chars() 创建字符流(返回 IntStream):

IntStream streamOfChars = "abc".chars();

按正则分割字符串:

Stream<String> streamOfString =
  Pattern.compile(", ").splitAsStream("a, b, c");

2.9 文件 Stream

Java NIO 的 Files 类通过 lines() 方法读取文本文件每行:

Path path = Paths.get("C:\\file.txt");
Stream<String> streamOfStrings = Files.lines(path);
Stream<String> streamWithCharset = 
  Files.lines(path, Charset.forName("UTF-8")); // 指定编码

3. Stream 引用

Stream 实例在执行中间操作时可被引用,但一旦调用终端操作就会失效。看个踩坑例子:

Stream<String> stream = 
  Stream.of("a", "b", "c").filter(element -> element.contains("b"));
Optional<String> anyElement = stream.findAny(); // 正常

但再次引用会抛 IllegalStateException

Optional<String> firstElement = stream.findFirst(); // 报错!

关键点:Stream 不可复用。正确做法是收集到集合:

List<String> elements =
  Stream.of("a", "b", "c").filter(element -> element.contains("b"))
    .collect(Collectors.toList());
Optional<String> anyElement = elements.stream().findAny();
Optional<String> firstElement = elements.stream().findFirst();

4. Stream 流水线

Stream 流水线由三部分组成:数据源中间操作终端操作

中间操作返回新流,例如 skip() 跳过元素:

Stream<String> onceModifiedStream =
  Stream.of("abcd", "bbcd", "cbcd").skip(1); // 跳过首元素

可链式调用多个中间操作:

Stream<String> twiceModifiedStream =
  stream.skip(1).map(element -> element.substring(0, 3)); // 跳过+截取

终端操作触发实际计算(如 count()),每个流只能调用一次终端操作

List<String> list = Arrays.asList("abc1", "abc2", "abc3");
long size = list.stream().skip(1)
  .map(element -> element.substring(0, 3)).sorted().count();

5. 惰性求值

中间操作是惰性的:仅在终端操作需要时执行。看个例子:

private long counter;
private void wasCalled() { counter++; }

List<String> list = Arrays.asList("abc1", "abc2", "abc3");
counter = 0;
Stream<String> stream = list.stream().filter(element -> {
    wasCalled();
    return element.contains("2");
});

此时 counter 仍为 0,因为缺少终端操作。添加终端操作后:

Optional<String> stream = list.stream().filter(element -> {
    log.info("filter() was called");
    return element.contains("2");
}).map(element -> {
    log.info("map() was called");
    return element.toUpperCase();
}).findFirst();

日志显示:

  • filter() 被调用2次(跳过不满足的元素)
  • map() 被调用1次(仅对满足条件的元素)

惰性求值优化了性能,避免不必要的计算。

6. 执行顺序

操作顺序对性能影响极大。看个反面例子:

long size = list.stream().map(element -> {
    wasCalled(); // 昂贵操作
    return element.substring(0, 3);
}).skip(2).count(); // 后过滤

wasCalled() 被执行3次,但最终只有1个元素。调整顺序后:

long size = list.stream().skip(2).map(element -> {
    wasCalled(); // 仅执行1次
    return element.substring(0, 3);
}).count();

黄金法则:将减少流大小的操作(如 skip(), filter(), distinct())放在流水线前端。

7. Stream 归约

终端操作 reduce()collect() 可自定义归约逻辑。

7.1 reduce() 方法

三种重载形式:

  1. 单参数:返回 Optional<T>

    OptionalInt reduced =
      IntStream.range(1, 4).reduce((a, b) -> a + b); // 6 (1+2+3)
    
  2. 双参数(初始值 + 累加器)

    int reducedTwoParams =
      IntStream.range(1, 4).reduce(10, (a, b) -> a + b); // 16 (10+1+2+3)
    
  3. 三参数(初始值 + 累加器 + 合并器)

    int reducedParams = Stream.of(1, 2, 3)
      .reduce(10, (a, b) -> a + b, (a, b) -> {
         log.info("combiner was called");
         return a + b;
      }); // 16,串行流不调用合并器
    

在并行流中合并器才会生效:

int reducedParallel = Arrays.asList(1, 2, 3).parallelStream()
    .reduce(10, (a, b) -> a + b, (a, b) -> {
       log.info("combiner was called"); // 调用2次
       return a + b;
    }); // 36 (并行计算结果)

7.2 collect() 方法

使用 Collector 定义归约逻辑。以下用 Product 类演示:

List<Product> productList = Arrays.asList(new Product(23, "potatoes"),
  new Product(14, "orange"), new Product(13, "lemon"),
  new Product(23, "bread"), new Product(13, "sugar"));

常用收集器

  1. 转换为 Collection

    List<String> collectorCollection = 
      productList.stream().map(Product::getName).collect(Collectors.toList());
    
  2. 拼接字符串

    String listToString = productList.stream().map(Product::getName)
      .collect(Collectors.joining(", ", "[", "]")); // [potatoes, orange, lemon...]
    
  3. 数值统计

    double averagePrice = productList.stream()
      .collect(Collectors.averagingInt(Product::getPrice)); // 平均值
    int summingPrice = productList.stream()
      .collect(Collectors.summingInt(Product::getPrice)); // 总和
    
  4. 完整统计信息

    IntSummaryStatistics statistics = productList.stream()
      .collect(Collectors.summarizingInt(Product::getPrice));
    // 输出: IntSummaryStatistics{count=5, sum=86, min=13, average=17.2, max=23}
    
  5. 分组

    Map<Integer, List<Product>> collectorMapOfLists = productList.stream()
      .collect(Collectors.groupingBy(Product::getPrice)); // 按价格分组
    
  6. 分区(按条件分两组)

    Map<Boolean, List<Product>> mapPartioned = productList.stream()
      .collect(Collectors.partitioningBy(element -> element.getPrice() > 15));
    
  7. 自定义转换

    Set<Product> unmodifiableSet = productList.stream()
      .collect(Collectors.collectingAndThen(Collectors.toSet(),
      Collections::unmodifiableSet)); // 转为不可变Set
    
  8. 自定义收集器

    Collector<Product, ?, LinkedList<Product>> toLinkedList =
      Collector.of(LinkedList::new, LinkedList::add, 
        (first, second) -> { 
           first.addAll(second); 
           return first; 
        });
    LinkedList<Product> linkedListOfProducts =
      productList.stream().collect(toLinkedList);
    

8. 并行流

Java 8 通过 parallelStream() 简单实现并行处理:

Stream<Product> streamOfCollection = productList.parallelStream();
boolean isParallel = streamOfCollection.isParallel(); // true
boolean bigPrice = streamOfCollection
  .map(product -> product.getPrice() * 12)
  .anyMatch(price -> price > 200);

Collection 数据源用 parallel()

IntStream intStreamParallel = IntStream.range(1, 150).parallel();

注意事项

  1. 底层使用 ForkJoinPool,无法自定义线程池(但有变通方案)
  2. 避免阻塞操作
  3. 确保任务耗时相近,否则拖慢整体性能

切换回串行流:

IntStream intStreamSequential = intStreamParallel.sequential();

9. 总结

Stream API 是处理元素序列的强大工具,合理使用可:

  • 减少模板代码
  • 提升可读性
  • 优化应用性能

关键提醒:生产环境切勿保留未消费的 Stream,会导致内存泄漏。完整示例代码见 GitHub