1. 引言

本文将深入对比 Java Stream 和 Project Reactor 中的 Flux.fromIterable(),从相似性到核心差异进行全面剖析。作为有经验的开发者,你一定对这两个工具不陌生,但理解它们的本质区别能帮你避免踩坑,选择更适合场景的解决方案。

2. Java Stream 概述

Stream 是 Java 8 引入的函数式数据处理 API,本质是同步的、拉取式的数据流,仅在触发终端操作时才执行处理。核心特点包括:

  • ✅ 支持顺序流和并行流处理
  • ✅ 提供 map()filter() 等函数式操作
  • ✅ 惰性执行(终端操作触发时才计算)
  • ✅ 同步且基于拉取模型

3. Flux.fromIterable() 概述

Flux.fromIterable() 是 Project Reactor 的工厂方法,用于将 Iterable(如 ListSet)转换为响应式流 Flux专为异步非阻塞场景设计,核心特点包括

  • ✅ 支持异步非阻塞处理
  • ✅ 提供 map()filter()flatMap() 等响应式操作
  • ✅ 基于推送模型(数据就绪时自动推送)

4. Stream 与 Flux.fromIterable() 的相似性

虽然设计理念不同,但两者在数据处理上有共通之处:

4.1. 函数式编程风格

两者都支持函数式操作链,能以声明式方式构建数据处理管道。下面看过滤偶数并平方的示例:

// Stream 实现
@Test
void givenList_whenProcessedWithStream_thenReturnDoubledEvenNumbers() {
    List<Integer> numbers = List.of(1, 2, 3, 4, 5);

    List<Integer> doubledEvenNumbers = numbers.stream()
      .filter(n -> n % 2 == 0)
      .map(n -> n * n)
      .toList();
    assertEquals(List.of(4, 16), doubledEvenNumbers);
}
// Flux 实现
@Test
void givenList_whenProcessedWithFlux_thenReturnDoubledEvenNumbers() {
    List<Integer> numbers = List.of(1, 2, 3, 4, 5);
    Flux<Integer> fluxPipeline = Flux.fromIterable(numbers)
      .filter(n -> n % 2 == 0)
      .map(n -> n * 2);

    StepVerifier.create(fluxPipeline)
      .expectNext(4, 16);
}

4.2. 惰性求值

两者都采用惰性计算——只有真正需要结果时才执行操作

// Stream 的惰性验证
@Test
void givenList_whenNoTerminalOperator_thenNoResponse() {
    List<Integer> numbers = List.of(1, 2, 3, 4, 5);
    Function<Integer, Integer> mockMapper = mock(Function.class);
    Stream<Integer> streamPipeline = numbers.stream()
      .map(mockMapper);
    verifyNoInteractions(mockMapper); // 无终端操作时不执行

    List<Integer> mappedList = streamPipeline.toList(); // 触发执行
    verify(mockMapper, times(5)); // 验证执行次数
}
// Flux 的惰性验证
@Test
void givenList_whenFluxNotSubscribed_thenNoResponse() {
    Function<Integer, Integer> mockMapper = mock(Function.class);
    when(mockMapper.apply(anyInt())).thenAnswer(i -> i.getArgument(0));
    List<Integer> numbers = List.of(1, 2, 3, 4, 5);
    Flux<Integer> flux = Flux.fromIterable(numbers)
      .map(mockMapper);

    verifyNoInteractions(mockMapper); // 无订阅时不执行

    StepVerifier.create(flux) // 订阅触发执行
      .expectNextCount(5)
      .verifyComplete();

    verify(mockMapper, times(5)).apply(anyInt()); // 验证执行次数
}

5. Stream 与 Flux.fromIterable() 的核心差异

5.1. 同步 vs 异步

Stream 本质同步:所有操作在调用终端操作的线程上顺序执行。处理大数据集或耗时任务时会阻塞线程,除非显式使用 parallelStream()(但仍是阻塞的)。

Flux 原生支持异步:默认同步(类似顺序流),但通过 subscribeOn()publishOn() 等操作符可轻松实现非阻塞异步处理:

// Flux 异步行为演示
@Test
void givenList_whenProcessingTakesLongerThanEmission_thenEmittedBeforeProcessing() {
    VirtualTimeScheduler.set(VirtualTimeScheduler.create());

    List<Integer> numbers = List.of(1, 2, 3, 4, 5);
    Flux<Integer> sourceFlux = Flux.fromIterable(numbers)
      .delayElements(Duration.ofMillis(500)); // 每500ms发射一个元素

    Flux<Integer> processedFlux = sourceFlux.flatMap(n ->
      Flux.just(n * n)
        .delayElements(Duration.ofSeconds(1)) // 处理延迟1秒
    );

    StepVerifier.withVirtualTime(() -> Flux.merge(sourceFlux, processedFlux))
      .expectSubscription()
      .expectNoEvent(Duration.ofMillis(500))
      .thenAwait(Duration.ofMillis(500 * 5)) // 2500ms后所有元素发射完成
      .expectNextCount(7) // 已发射5个元素+处理完成2个
      .thenAwait(Duration.ofMillis(5000)) // 再等5秒
      .expectNextCount(3) // 剩余3个处理完成
      .verifyComplete();
}

⚠️ 关键点:发射器不会等待处理器完成,两者独立运行。

5.2. 异常处理机制

Stream 遇到异常立即终止:处理过程中抛出异常会导致整个管道停止,后续元素不再处理:

@Test
void givenList_whenDividedByZeroInStream_thenThrowException() {
    List<Integer> numbers = List.of(1, 2, 0, 4, 5);
    assertThrows(ArithmeticException.class, () -> numbers.stream()
      .map(n -> 10 / n) // 遇到0时抛异常
      .toList()); // 4和5不会被处理
}

Flux 将错误视为数据:通过独立错误通道传播异常,提供 onErrorResume() 等操作符优雅处理:

@Test
void givenList_whenDividedByZeroInFlux_thenReturnFallbackValue() {
    List<Integer> numbers = List.of(1, 2, 0, 4, 5);
    Flux<Integer> flux = Flux.fromIterable(numbers)
      .map(n -> 10 / n)
      .onErrorResume(e -> Flux.just(-1)); // 异常时返回-1

    StepVerifier.create(flux)
      .expectNext(10, 5, -1) // 正常处理1,2,异常时返回-1
      .verifyComplete();
}

✅ 优势:Flux 能优雅处理网络故障、数据库错误等现实场景。

5.3. 单次使用 vs 多订阅

Stream 只能消费一次:触发终端操作后流即关闭,无法重用:

@Test
void givenStream_whenReused_thenThrowException() {
    List<Integer> numbers = List.of(1, 2, 3, 4, 5);
    Stream<Integer> doubleStream = numbers.stream()
      .map(n -> n * 2);

    assertEquals(List.of(2, 4, 6, 8, 10), doubleStream.toList()); // 首次使用正常

    assertThrows(IllegalStateException.class, doubleStream::toList); // 重用抛异常
}

Flux 支持多订阅者:同一数据源可被多个消费者独立处理:

@Test
void givenFlux_whenMultipleSubscribers_thenEachReceivesData() {
    List<Integer> numbers = List.of(1, 2, 3, 4, 5);
    Flux<Integer> flux = Flux.fromIterable(numbers).map(n -> n * 2);
    
    // 第一个订阅者
    StepVerifier.create(flux)
      .expectNext(2, 4, 6, 8, 10)
      .verifyComplete();

    // 第二个订阅者(无异常)
    StepVerifier.create(flux)
      .expectNext(2, 4, 6, 8, 10)
      .verifyComplete();
}

5.4. 性能特征

Stream 优化内存处理

  • ✅ 单次遍历完成所有转换
  • ✅ 通过 parallelStream() 利用多核并行计算
  • ✅ 纯计算任务性能更优

Flux 面向响应式场景

  • ❌ 每个元素需包装成响应式信号(onNext/onError/onComplete)
  • ❌ 非阻塞模型在纯计算场景有额外开销
  • ✅ I/O 密集型任务表现更佳

6. 功能对比:Stream vs Flux.fromIterable()

特性 Java Stream Flux.fromIterable()
执行模型 拉取式(消费者主动请求数据) 推送式(生产者异步推送数据)
处理风格 函数式管道 函数式响应式事件驱动
同步/异步 同步(可用parallelStream并行) 异步(非阻塞多线程)
异常处理 无内置支持(需try-catch) 错误作为数据通过独立通道传播
多订阅者支持 ❌ 不支持(消费后即关闭) ✅ 支持多订阅者
适用场景 快速CPU密集型内存计算 异步非阻塞、错误恢复、批处理

7. 结论

Stream 和 Flux.fromIterable() 虽然表面相似,但设计哲学截然不同:

  • Stream 是同步内存处理的利器,简单粗暴地解决纯计算问题
  • Flux 则为异步响应式而生,在错误处理和多订阅场景下表现更佳

选择建议:

  • ✅ 优先用 Stream:处理纯内存数据,不需要异步或错误恢复
  • ✅ 优先用 Flux:涉及网络/数据库I/O,需要弹性错误处理或多订阅者场景

理解这些差异,能让你在实际项目中避开性能陷阱,用对工具。记住:没有银弹,只有适合场景的武器。


原始标题:Comparing Java Stream and Flux.fromIterable() | Baeldung