1. 概述
在本教程中,我们将基于 Project Reactor 的基础 知识,学习几种创建 Flux 序列的技术。
2. Maven 依赖
首先,我们引入两个必要的依赖项:*reactor-core* 和 reactor-test:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.6.0</version>
<scope>test</scope>
</dependency>
3. 同步生成序列
创建 Flux 最简单的方式是使用 Flux#generate 方法。该方法依赖一个生成函数来产生序列中的元素。
我们先定义一个类来演示 generate 方法的使用方式:
public class SequenceGenerator {
// 后续方法将在此类中定义
}
3.1. 使用新状态的生成器
来看一个使用 Reactor 生成 斐波那契数列 的例子:
public Flux<Integer> generateFibonacciWithTuples() {
return Flux.generate(
() -> Tuples.of(0, 1),
(state, sink) -> {
sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
}
);
}
这个 generate 方法接收两个函数参数:一个 Callable 和一个 BiFunction:
- Callable 用于初始化生成器状态,这里我们使用一个 Tuples 来保存初始值 0 和 1。
- BiFunction 是生成器函数,它接收一个 SynchronousSink,在每次调用时通过
sink.next()
发出一个元素,并返回下一个状态。
顾名思义,SynchronousSink 是同步的。⚠️注意:每次调用生成器时,不能多次调用 sink.next()
。
我们使用 StepVerifier 验证生成的序列:
@Test
public void whenGeneratingNumbersWithTuplesState_thenFibonacciSequenceIsProduced() {
SequenceGenerator sequenceGenerator = new SequenceGenerator();
Flux<Integer> fibonacciFlux = sequenceGenerator.generateFibonacciWithTuples().take(5);
StepVerifier.create(fibonacciFlux)
.expectNext(0, 1, 1, 2, 3)
.expectComplete()
.verify();
}
在这个例子中,订阅者只请求 5 个元素,因此序列在数字 3 处结束。
✅注意:每次调用生成器都会返回一个新的状态对象供下一轮使用,但这不是必须的。我们也可以复用同一个状态实例。
3.2. 使用可变状态的生成器
假设我们希望复用状态对象来生成斐波那契数列。首先定义一个状态类:
public class FibonacciState {
private int former;
private int latter;
// 构造函数、getter 和 setter 省略
}
我们将使用这个类的实例来维护生成器的状态。两个字段 former 和 latter 分别表示序列中的前一个和当前数字。
修改后的生成器如下:
public Flux<Integer> generateFibonacciWithCustomClass(int limit) {
return Flux.generate(
() -> new FibonacciState(0, 1),
(state, sink) -> {
sink.next(state.getFormer());
if (state.getLatter() > limit) {
sink.complete();
}
int temp = state.getFormer();
state.setFormer(state.getLatter());
state.setLatter(temp + state.getLatter());
return state;
});
}
和之前的例子类似,这个 generate 变体 接收状态提供者和生成器函数。
不同的是,这里的状态对象是复用的。⚠️为了避免无限序列,我们在这里设置了当值超过限制时调用 sink.complete()
。
测试代码如下:
@Test
public void whenGeneratingNumbersWithCustomClass_thenFibonacciSequenceIsProduced() {
SequenceGenerator sequenceGenerator = new SequenceGenerator();
StepVerifier.create(sequenceGenerator.generateFibonacciWithCustomClass(10))
.expectNext(0, 1, 1, 2, 3, 5, 8)
.expectComplete()
.verify();
}
3.3. 无状态变体
generate 方法还有一个 仅接收一个 Consumer
4. 异步生成序列
同步生成并不是唯一方式,我们还可以使用 create 和 push 操作符,以异步方式生成多个元素。
4.1. 使用 create 方法
使用 create 方法,我们可以在多个线程中生成元素。下面的例子展示了如何从两个不同源收集元素:
public class SequenceCreator {
public Consumer<List<Integer>> consumer;
public Flux<Integer> createNumberSequence() {
return Flux.create(sink -> SequenceCreator.this.consumer = items -> items.forEach(sink::next));
}
}
与 generate 不同的是:
- create 方法不维护状态。
- 它通过外部源提供元素,而不是自己生成。
- 它使用 FluxSink 而不是 SynchronousSink,✅可以多次调用
next()
。
我们使用一个模拟的 consumer
字段来接收数据源,实际场景可能是某个可观察的 API。
测试代码如下:
@Test
public void whenCreatingNumbers_thenSequenceIsProducedAsynchronously() throws InterruptedException {
SequenceGenerator sequenceGenerator = new SequenceGenerator();
List<Integer> sequence1 = sequenceGenerator.generateFibonacciWithTuples().take(3).collectList().block();
List<Integer> sequence2 = sequenceGenerator.generateFibonacciWithTuples().take(4).collectList().block();
SequenceCreator sequenceCreator = new SequenceCreator();
Thread producingThread1 = new Thread(
() -> sequenceCreator.consumer.accept(sequence1)
);
Thread producingThread2 = new Thread(
() -> sequenceCreator.consumer.accept(sequence2)
);
List<Integer> consolidated = new ArrayList<>();
sequenceCreator.createNumberSequence().subscribe(consolidated::add);
producingThread1.start();
producingThread2.start();
producingThread1.join();
producingThread2.join();
assertThat(consolidated).containsExactlyInAnyOrder(0, 1, 1, 0, 1, 1, 2);
}
由于是异步操作,元素的顺序是不确定的。
create 方法还有一个 接收 OverflowStrategy 参数的变体,用于处理背压。默认情况下,会缓存所有元素。
4.2. 使用 push 方法
除了 create,Flux 还提供了 push 方法。它与 create 类似,但只允许一个线程发出信号。
如果我们把前面例子中的 create 替换为 push,代码仍可编译,但在多线程环境下可能会抛出异常。❌*因此,如果确定只有一个线程生产数据,才使用 push。*
5. 处理序列
前面的方法都是静态的,用于从给定源创建序列。而 Flux 还提供了一个实例方法 handle,用于处理发布者产生的序列。
handle 操作符可以对序列进行处理,甚至过滤元素。✅它结合了 map 和 filter 的功能。
示例代码如下:
public class SequenceHandler {
public Flux<Integer> handleIntegerSequence(Flux<Integer> sequence) {
return sequence.handle((number, sink) -> {
if (number % 2 == 0) {
sink.next(number / 2);
}
});
}
}
在这个例子中,只有偶数会被处理,奇数会被忽略。
⚠️和 generate 一样,handle 使用 SynchronousSink,每次只能发出一个元素。
测试代码如下:
@Test
public void whenHandlingNumbers_thenSequenceIsMappedAndFiltered() {
SequenceHandler sequenceHandler = new SequenceHandler();
SequenceGenerator sequenceGenerator = new SequenceGenerator();
Flux<Integer> sequence = sequenceGenerator.generateFibonacciWithTuples().take(10);
StepVerifier.create(sequenceHandler.handleIntegerSequence(sequence))
.expectNext(0, 1, 4, 17)
.expectComplete()
.verify();
}
前 10 个斐波那契数中,偶数为 0, 2, 8, 34,因此 expectNext
的参数是它们除以 2 后的结果。
6. 总结
本文介绍了使用 Flux API 以编程方式生成序列的各种方法,重点讲解了 generate 和 create 操作符。
示例代码可在 GitHub 获取。这是一个 Maven 项目,可直接运行。