2. 1. 什么是Flux?
Spring WebFlux是一个响应式Web框架,为构建异步非阻塞应用提供了强大功能。其核心特性之一就是处理Flux的能力。Flux是一个响应式数据流,可以发射零到多个元素。它可以从多种来源创建,比如数据库查询、网络调用或内存集合。
在响应式编程中,订阅(subscription)是一个关键概念,它表示数据源(发布者)和数据消费者(订阅者)之间的连接。订阅维护一个状态,反映订阅是否活跃。它可以用来取消订阅,从而停止Flux的数据发射并释放发布者持有的资源。常见的取消场景包括用户主动取消请求或发生超时等情况。
3. 2. 取消Flux的好处
在响应式Spring WebFlux中,取消正在进行的Flux对系统资源高效利用和防止内存泄漏至关重要。主要原因包括:
- 背压(Backpressure):响应式编程使用背压来调节发布者和订阅者之间的数据流。如果订阅者处理速度跟不上发布者,未取消的订阅会持续产生数据,导致背压堆积,可能引发内存泄漏。⚠️
- 资源管理:Flux可能占用内存、CPU和网络连接等系统资源。取消订阅能释放这些资源,使其可用于其他任务。✅
- 性能提升:提前终止订阅可避免不必要的处理,缩短响应时间,提升整体系统性能。✅
4. 3. Maven依赖
我们以传感器数据流为例,演示如何使用WebFlux提供的多种选项取消Flux。首先添加以下关键依赖:
-
spring-boot-starter-webflux
:包含构建响应式Web应用所需的所有依赖,包括响应式编程的Reactor库和默认的Netty服务器。 -
reactor-spring
:提供与Spring框架集成的Reactor项目模块。 -
reactor-test
:为响应式流提供测试支持。
在POM中声明依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-spring</artifactId>
<version>${reactor-spring.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
5. 4. 在WebFlux中取消Flux
Spring WebFlux支持通过dispose()
显式取消,或通过特定操作符隐式取消(这些操作符内部会调用Subscription
的cancel()
方法):
-
takeUntil()
-
takeWhile()
-
take(long n)
-
take(Duration n)
这些操作符本质上都会调用订阅者onSubscribe()
方法中传入的Subscription
对象的cancel()
方法。下面逐一分析。
5.1. 4.1 使用takeUntil()
取消
假设传感器数据流在遇到值8时停止发射:
@Test
void givenOngoingFlux_whentakeUntil_thenFluxCancels() {
Flux<Integer> sensorData = Flux.range(1, 10);
List<Integer> result = new ArrayList<>();
sensorData.takeUntil(reading -> reading == 8)
.subscribe(result::add);
assertThat(result).containsExactly(1, 2, 3, 4, 5, 6, 7, 8);
}
5.2. 4.2 使用takeWhile()
取消
当传感器读数小于8时持续发射数据,使用takeWhile()
操作符:
@Test
void givenOngoingFlux_whentakeWhile_thenFluxCancels() {
List<Integer> result = new ArrayList<>();
Flux<Integer> sensorData = Flux.range(1, 10)
.takeWhile(reading -> reading < 8)
.doOnNext(result::add);
sensorData.subscribe();
assertThat(result).containsExactly(1, 2, 3, 4, 5, 6, 7);
}
takeWhile()
根据谓词判断:谓词为true时发射数据,一旦返回false立即取消订阅。这里用doOnNext()
收集数据到列表。
5.3. 4.3 使用take(long n)
取消
take()
操作符可限制从无限序列中获取的元素数量。示例从1到Integer.MAX_VALUE的Flux中取前10个:
@Test
void givenOngoingFlux_whentake_thenFluxCancels() {
Flux<Integer> sensorData = Flux.range(1, Integer.MAX_VALUE);
List<Integer> result = new ArrayList<>();
sensorData.take(10)
.subscribe(result::add);
Assertions.assertThat(result)
.containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
订阅在发射10个元素后自动取消,结果列表验证了这一点。
5.4. 4.4 使用take(Duration d)
取消
当超过指定时间后不再需要数据时,可基于时长取消。示例创建每2秒发射一次的Flux,但只接收3秒内的数据:
@Test
void givenAnOnGoingFlux_whenTimeout_thenCancelsFlux() {
Flux<Integer> sensorData = Flux.interval(Duration.ZERO, Duration.ofSeconds(2))
.map(i -> i.intValue() + 10)
.take(5);
Flux<Integer> canceledByTimeout = sensorData.take(Duration.ofSeconds(3));
StepVerifier.create(canceledByTimeout)
.expectNext(10, 11)
.expectComplete()
.verify();
}
interval()
每2秒发射一个值(从0开始),map()
将其加10,take(5)
限制最多5个元素。take(Duration.ofSeconds(3))
创建新Flux,只接收3秒内的数据(即前两个值10和11)。
使用StepVerifier
验证行为:期望接收10和11后流完成。注意verify()
内部会调用subscribe()
,无需显式订阅。
5.5. 4.5 使用dispose()
显式取消
通过Disposable
接口的dispose()
方法显式取消订阅:
@Test
void giveAnOnGoingFlux_whenDispose_thenCancelsFluxExplicitly() throws InterruptedException {
Flux<Integer> flux = Flux.range(1, 10)
.delayElements(Duration.ofSeconds(1));
AtomicInteger count = new AtomicInteger(0);
Disposable disposable = flux.subscribe(i -> {
System.out.println("Received: " + i);
count.incrementAndGet();
}, e -> System.err.println("Error: " + e.getMessage())
);
Thread.sleep(5000);
System.out.println("Will Dispose the Flux Next");
disposable.dispose();
if(disposable.isDisposed()) {
System.out.println("Flux Disposed");
}
assertEquals(4, count.get());
}
线程休眠5秒后调用dispose()
取消订阅。由于元素每秒发射一个,5秒内应接收4个元素(第5个元素在取消时尚未发射)。
6. 5. 取消后的清理
取消订阅不会自动释放关联资源,需手动清理。使用doOnCancel()
和doFinally()
执行清理操作:
@Test
void givenAFluxIsCanceled_whenDoOnCancelAndDoFinally_thenMessagePrinted() throws InterruptedException {
List<Integer> result = new ArrayList<>();
PrintStream mockPrintStream = mock(PrintStream.class);
System.setOut(mockPrintStream);
Flux<Integer> sensorData = Flux.interval(Duration.ofMillis(100))
.doOnCancel(() -> System.out.println("Flux Canceled"))
.doFinally(signalType -> {
if (signalType == SignalType.CANCEL) {
System.out.println("Flux Completed due to Cancelation");
} else {
System.out.println("Flux Completed due to Completion or Error");
}
})
.map(i -> ThreadLocalRandom.current().nextInt(1, 1001))
.doOnNext(result::add);
Disposable subscription = sensorData.subscribe();
Thread.sleep(1000);
subscription.dispose();
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
Mockito.verify(mockPrintStream, times(2)).println(captor.capture());
assertThat(captor.getAllValues()).contains("Flux Canceled", "Flux Completed due to Cancelation");
}
doOnCancel()
在取消时触发,doFinally()
在终止(无论取消、完成或错误)时触发。实际场景中可在此关闭连接等资源。测试验证了取消时打印了预期消息。
7. 6. 总结
本文介绍了WebFlux中取消Flux的多种方法:从Flux基础概念讲起,分析了取消的必要性,详细演示了显式和隐式取消技术,并说明了取消后的资源清理。掌握这些技巧能帮你构建更健壮的响应式应用,避免资源泄漏和性能问题。
完整代码示例可在GitHub获取。