2. 1. 什么是Flux?

Spring WebFlux是一个响应式Web框架,为构建异步非阻塞应用提供了强大功能。其核心特性之一就是处理Flux的能力。Flux是一个响应式数据流,可以发射零到多个元素。它可以从多种来源创建,比如数据库查询、网络调用或内存集合。

在响应式编程中,订阅(subscription)是一个关键概念,它表示数据源(发布者)和数据消费者(订阅者)之间的连接。订阅维护一个状态,反映订阅是否活跃。它可以用来取消订阅,从而停止Flux的数据发射并释放发布者持有的资源。常见的取消场景包括用户主动取消请求或发生超时等情况。

3. 2. 取消Flux的好处

在响应式Spring WebFlux中,取消正在进行的Flux对系统资源高效利用和防止内存泄漏至关重要。主要原因包括:

  • 背压(Backpressure):响应式编程使用背压来调节发布者和订阅者之间的数据流。如果订阅者处理速度跟不上发布者,未取消的订阅会持续产生数据,导致背压堆积,可能引发内存泄漏。⚠️
  • 资源管理:Flux可能占用内存、CPU和网络连接等系统资源。取消订阅能释放这些资源,使其可用于其他任务。✅
  • 性能提升提前终止订阅可避免不必要的处理,缩短响应时间,提升整体系统性能。✅

4. 3. Maven依赖

我们以传感器数据流为例,演示如何使用WebFlux提供的多种选项取消Flux。首先添加以下关键依赖:

  • spring-boot-starter-webflux:包含构建响应式Web应用所需的所有依赖,包括响应式编程的Reactor库和默认的Netty服务器。
  • reactor-spring:提供与Spring框架集成的Reactor项目模块。
  • reactor-test:为响应式流提供测试支持。

在POM中声明依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectreactor</groupId>
        <artifactId>reactor-spring</artifactId>
        <version>${reactor-spring.version}</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

5. 4. 在WebFlux中取消Flux

Spring WebFlux支持通过dispose()显式取消,或通过特定操作符隐式取消(这些操作符内部会调用Subscriptioncancel()方法):

  • takeUntil()
  • takeWhile()
  • take(long n)
  • take(Duration n)

这些操作符本质上都会调用订阅者onSubscribe()方法中传入的Subscription对象的cancel()方法。下面逐一分析。

5.1. 4.1 使用takeUntil()取消

假设传感器数据流在遇到值8时停止发射:

@Test
void givenOngoingFlux_whentakeUntil_thenFluxCancels() {
    Flux<Integer> sensorData = Flux.range(1, 10);
    List<Integer> result = new ArrayList<>();

    sensorData.takeUntil(reading -> reading == 8)
      .subscribe(result::add);
    assertThat(result).containsExactly(1, 2, 3, 4, 5, 6, 7, 8);
}

5.2. 4.2 使用takeWhile()取消

当传感器读数小于8时持续发射数据,使用takeWhile()操作符:

@Test
void givenOngoingFlux_whentakeWhile_thenFluxCancels() {
    List<Integer> result = new ArrayList<>();
    Flux<Integer> sensorData = Flux.range(1, 10)
      .takeWhile(reading -> reading < 8)
      .doOnNext(result::add);

    sensorData.subscribe();
    assertThat(result).containsExactly(1, 2, 3, 4, 5, 6, 7);
}

takeWhile()根据谓词判断:谓词为true时发射数据,一旦返回false立即取消订阅。这里用doOnNext()收集数据到列表。

5.3. 4.3 使用take(long n)取消

take()操作符可限制从无限序列中获取的元素数量。示例从1到Integer.MAX_VALUE的Flux中取前10个:

@Test
void givenOngoingFlux_whentake_thenFluxCancels() {
    Flux<Integer> sensorData = Flux.range(1, Integer.MAX_VALUE);
    List<Integer> result = new ArrayList<>();

    sensorData.take(10)
      .subscribe(result::add);
    Assertions.assertThat(result)
      .containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

订阅在发射10个元素后自动取消,结果列表验证了这一点

5.4. 4.4 使用take(Duration d)取消

当超过指定时间后不再需要数据时,可基于时长取消。示例创建每2秒发射一次的Flux,但只接收3秒内的数据:

@Test
void givenAnOnGoingFlux_whenTimeout_thenCancelsFlux() {
    Flux<Integer> sensorData = Flux.interval(Duration.ZERO, Duration.ofSeconds(2))
      .map(i -> i.intValue() + 10)
      .take(5);

    Flux<Integer> canceledByTimeout = sensorData.take(Duration.ofSeconds(3));

    StepVerifier.create(canceledByTimeout)
      .expectNext(10, 11)
      .expectComplete()
      .verify();
}

interval()每2秒发射一个值(从0开始),map()将其加10,take(5)限制最多5个元素。take(Duration.ofSeconds(3))创建新Flux,只接收3秒内的数据(即前两个值10和11)

使用StepVerifier验证行为:期望接收10和11后流完成。注意verify()内部会调用subscribe(),无需显式订阅。

5.5. 4.5 使用dispose()显式取消

通过Disposable接口的dispose()方法显式取消订阅:

@Test
void giveAnOnGoingFlux_whenDispose_thenCancelsFluxExplicitly() throws InterruptedException {
    Flux<Integer> flux = Flux.range(1, 10)
      .delayElements(Duration.ofSeconds(1));

    AtomicInteger count = new AtomicInteger(0);
    Disposable disposable = flux.subscribe(i -> {
        System.out.println("Received: " + i);
        count.incrementAndGet();
    }, e -> System.err.println("Error: " + e.getMessage())
    );

    Thread.sleep(5000);
    System.out.println("Will Dispose the Flux Next");
    disposable.dispose();
    if(disposable.isDisposed()) {
        System.out.println("Flux Disposed");
    }
    assertEquals(4, count.get());
}

线程休眠5秒后调用dispose()取消订阅。由于元素每秒发射一个,5秒内应接收4个元素(第5个元素在取消时尚未发射)。

6. 5. 取消后的清理

取消订阅不会自动释放关联资源,需手动清理。使用doOnCancel()doFinally()执行清理操作:

@Test
void givenAFluxIsCanceled_whenDoOnCancelAndDoFinally_thenMessagePrinted() throws InterruptedException {

    List<Integer> result = new ArrayList<>();
    PrintStream mockPrintStream = mock(PrintStream.class);
    System.setOut(mockPrintStream);

    Flux<Integer> sensorData = Flux.interval(Duration.ofMillis(100))
      .doOnCancel(() -> System.out.println("Flux Canceled"))
      .doFinally(signalType -> {
          if (signalType == SignalType.CANCEL) {
              System.out.println("Flux Completed due to Cancelation");
          } else {
              System.out.println("Flux Completed due to Completion or Error");
          }
      })
      .map(i -> ThreadLocalRandom.current().nextInt(1, 1001))
      .doOnNext(result::add);

    Disposable subscription = sensorData.subscribe();

    Thread.sleep(1000);
    subscription.dispose();

    ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
    Mockito.verify(mockPrintStream, times(2)).println(captor.capture());

    assertThat(captor.getAllValues()).contains("Flux Canceled", "Flux Completed due to Cancelation");
}

doOnCancel()在取消时触发,doFinally()在终止(无论取消、完成或错误)时触发。实际场景中可在此关闭连接等资源。测试验证了取消时打印了预期消息。

7. 6. 总结

本文介绍了WebFlux中取消Flux的多种方法:从Flux基础概念讲起,分析了取消的必要性,详细演示了显式和隐式取消技术,并说明了取消后的资源清理。掌握这些技巧能帮你构建更健壮的响应式应用,避免资源泄漏和性能问题。

完整代码示例可在GitHub获取。


原始标题:Cancel an Ongoing Flux in Spring WebFlux | Baeldung