1. 概述

在本教程中,我们将基于 Project Reactor 的基础 知识,学习几种创建 Flux 序列的技术。

2. Maven 依赖

首先,我们引入两个必要的依赖项:*reactor-core* 和 reactor-test

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.6.0</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.6.0</version>
    <scope>test</scope>
</dependency>

3. 同步生成序列

创建 Flux 最简单的方式是使用 Flux#generate 方法。该方法依赖一个生成函数来产生序列中的元素。

我们先定义一个类来演示 generate 方法的使用方式:

public class SequenceGenerator {
    // 后续方法将在此类中定义
}

3.1. 使用新状态的生成器

来看一个使用 Reactor 生成 斐波那契数列 的例子:

public Flux<Integer> generateFibonacciWithTuples() {
    return Flux.generate(
            () -> Tuples.of(0, 1),
            (state, sink) -> {
                sink.next(state.getT1());
                return Tuples.of(state.getT2(), state.getT1() + state.getT2());
            }
    );
}

这个 generate 方法接收两个函数参数:一个 Callable 和一个 BiFunction

  • Callable 用于初始化生成器状态,这里我们使用一个 Tuples 来保存初始值 01
  • BiFunction 是生成器函数,它接收一个 SynchronousSink,在每次调用时通过 sink.next() 发出一个元素,并返回下一个状态。

顾名思义,SynchronousSink 是同步的。⚠️注意:每次调用生成器时,不能多次调用 sink.next()

我们使用 StepVerifier 验证生成的序列:

@Test
public void whenGeneratingNumbersWithTuplesState_thenFibonacciSequenceIsProduced() {
    SequenceGenerator sequenceGenerator = new SequenceGenerator();
    Flux<Integer> fibonacciFlux = sequenceGenerator.generateFibonacciWithTuples().take(5);

    StepVerifier.create(fibonacciFlux)
      .expectNext(0, 1, 1, 2, 3)
      .expectComplete()
      .verify();
}

在这个例子中,订阅者只请求 5 个元素,因此序列在数字 3 处结束。

注意:每次调用生成器都会返回一个新的状态对象供下一轮使用,但这不是必须的。我们也可以复用同一个状态实例。

3.2. 使用可变状态的生成器

假设我们希望复用状态对象来生成斐波那契数列。首先定义一个状态类:

public class FibonacciState {
    private int former;
    private int latter;

    // 构造函数、getter 和 setter 省略
}

我们将使用这个类的实例来维护生成器的状态。两个字段 formerlatter 分别表示序列中的前一个和当前数字。

修改后的生成器如下:

public Flux<Integer> generateFibonacciWithCustomClass(int limit) {
    return Flux.generate(
      () -> new FibonacciState(0, 1),
      (state, sink) -> {
        sink.next(state.getFormer());
        if (state.getLatter() > limit) {
            sink.complete();
        }
        int temp = state.getFormer();
        state.setFormer(state.getLatter());
        state.setLatter(temp + state.getLatter());
        return state;
    });
}

和之前的例子类似,这个 generate 变体 接收状态提供者和生成器函数。

不同的是,这里的状态对象是复用的。⚠️为了避免无限序列,我们在这里设置了当值超过限制时调用 sink.complete()

测试代码如下:

@Test
public void whenGeneratingNumbersWithCustomClass_thenFibonacciSequenceIsProduced() {
    SequenceGenerator sequenceGenerator = new SequenceGenerator();

    StepVerifier.create(sequenceGenerator.generateFibonacciWithCustomClass(10))
      .expectNext(0, 1, 1, 2, 3, 5, 8)
      .expectComplete()
      .verify();
}

3.3. 无状态变体

generate 方法还有一个 仅接收一个 Consumer 参数的变体。它只适用于生成预定义的序列,功能有限,这里不再详述。

4. 异步生成序列

同步生成并不是唯一方式,我们还可以使用 createpush 操作符,以异步方式生成多个元素。

4.1. 使用 create 方法

使用 create 方法,我们可以在多个线程中生成元素。下面的例子展示了如何从两个不同源收集元素:

public class SequenceCreator {
    public Consumer<List<Integer>> consumer;

    public Flux<Integer> createNumberSequence() {
        return Flux.create(sink -> SequenceCreator.this.consumer = items -> items.forEach(sink::next));
    }
}

generate 不同的是:

  • create 方法不维护状态。
  • 它通过外部源提供元素,而不是自己生成。
  • 它使用 FluxSink 而不是 SynchronousSink,✅可以多次调用 next()

我们使用一个模拟的 consumer 字段来接收数据源,实际场景可能是某个可观察的 API。

测试代码如下:

@Test
public void whenCreatingNumbers_thenSequenceIsProducedAsynchronously() throws InterruptedException {
    SequenceGenerator sequenceGenerator = new SequenceGenerator();
    List<Integer> sequence1 = sequenceGenerator.generateFibonacciWithTuples().take(3).collectList().block();
    List<Integer> sequence2 = sequenceGenerator.generateFibonacciWithTuples().take(4).collectList().block();

    SequenceCreator sequenceCreator = new SequenceCreator();
    Thread producingThread1 = new Thread(
      () -> sequenceCreator.consumer.accept(sequence1)
    );
    Thread producingThread2 = new Thread(
      () -> sequenceCreator.consumer.accept(sequence2)
    );

    List<Integer> consolidated = new ArrayList<>();
    sequenceCreator.createNumberSequence().subscribe(consolidated::add);

    producingThread1.start();
    producingThread2.start();
    producingThread1.join();
    producingThread2.join();

    assertThat(consolidated).containsExactlyInAnyOrder(0, 1, 1, 0, 1, 1, 2);
}

由于是异步操作,元素的顺序是不确定的。

create 方法还有一个 接收 OverflowStrategy 参数的变体,用于处理背压。默认情况下,会缓存所有元素。

4.2. 使用 push 方法

除了 createFlux 还提供了 push 方法。它与 create 类似,但只允许一个线程发出信号。

如果我们把前面例子中的 create 替换为 push,代码仍可编译,但在多线程环境下可能会抛出异常。❌*因此,如果确定只有一个线程生产数据,才使用 push*

5. 处理序列

前面的方法都是静态的,用于从给定源创建序列。Flux 还提供了一个实例方法 handle,用于处理发布者产生的序列。

handle 操作符可以对序列进行处理,甚至过滤元素。✅它结合了 mapfilter 的功能。

示例代码如下:

public class SequenceHandler {
    public Flux<Integer> handleIntegerSequence(Flux<Integer> sequence) {
        return sequence.handle((number, sink) -> {
            if (number % 2 == 0) {
                sink.next(number / 2);
            }
        });
    }
}

在这个例子中,只有偶数会被处理,奇数会被忽略。

⚠️和 generate 一样,handle 使用 SynchronousSink,每次只能发出一个元素。

测试代码如下:

@Test
public void whenHandlingNumbers_thenSequenceIsMappedAndFiltered() {
    SequenceHandler sequenceHandler = new SequenceHandler();
    SequenceGenerator sequenceGenerator = new SequenceGenerator();
    Flux<Integer> sequence = sequenceGenerator.generateFibonacciWithTuples().take(10);

    StepVerifier.create(sequenceHandler.handleIntegerSequence(sequence))
      .expectNext(0, 1, 4, 17)
      .expectComplete()
      .verify();
}

前 10 个斐波那契数中,偶数为 0, 2, 8, 34,因此 expectNext 的参数是它们除以 2 后的结果。

6. 总结

本文介绍了使用 Flux API 以编程方式生成序列的各种方法,重点讲解了 generatecreate 操作符。

示例代码可在 GitHub 获取。这是一个 Maven 项目,可直接运行。


原始标题:Creating Sequences with Project Reactor | Baeldung