1. 简介
Reactor Core 是一个实现响应式编程模型的 Java 8 库。它构建在Reactive Streams规范之上,该规范是构建响应式应用的标准。
对于非响应式 Java 开发背景的程序员来说,转向响应式编程的学习曲线可能相当陡峭。当将其与 Java 8 的 Stream API 比较时,这种挑战会更加明显,因为它们容易被误认为是相同的高级抽象。
本文将尝试揭开这个范式的神秘面纱。我们将通过 Reactor 逐步深入,直到构建出如何编写响应式代码的完整图景,为后续系列中的高级文章奠定基础。
2. Reactive Streams 规范
在研究 Reactor 之前,我们先了解 Reactive Streams 规范。这是 Reactor 实现的标准,也是该库的基础。
本质上,Reactive Streams 是异步流处理的规范。
换句话说,这是一个事件被异步产生和消费的系统。想象一下金融应用每秒接收数千个股票更新流,并需要及时响应这些更新。
其主要目标之一是解决背压问题。如果生产者向消费者发送事件的速度超过了消费者的处理能力,最终消费者会被事件淹没,耗尽系统资源。
背压意味着消费者应该能够告诉生产者发送多少数据以防止这种情况,这正是规范所规定的内容。
3. Maven 依赖
开始前,让我们添加 reactor-core 和 logback-classic 的 Maven 依赖:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.4.4</version>
</dependency>
我们还添加了 Logback 依赖。这是因为我们将记录 Reactor 的输出以更好地理解数据流。
4. 生成数据流
要让应用具备响应式能力,首先要能生成数据流。这可以是前面提到的股票更新示例。没有这些数据,我们就无从响应,因此这是逻辑上的第一步。
Reactive Core 提供了两种数据类型来实现这一点。
4.1. Flux
第一种方式是使用 *Flux*,它是一个可以发射 0..n 个元素的流。让我们创建一个简单的 Flux:
Flux<Integer> just = Flux.just(1, 2, 3, 4);
这里我们创建了一个包含四个元素的静态流。
4.2. Mono
第二种方式是使用 *Mono*,它是一个 0..1 个元素的流。让我们实例化一个 Mono:
Mono<Integer> just = Mono.just(1);
这看起来和 Flux 几乎完全相同,只是这次我们被限制为最多一个元素。
4.3. 为什么不只用 Flux?
在进一步实验前,值得强调为什么我们需要这两种数据类型。
首先要注意的是,Flux 和 Mono 都是 Reactive Streams 规范中 Publisher 接口的实现。两个类都符合规范要求,我们可以使用该接口替代它们:
Publisher<String> just = Mono.just("foo");
但了解这种基数(cardinality)是有用的。因为某些操作只对其中一种类型有意义,而且它更具表现力(想象仓库中的 findOne() 方法)。
5. 订阅数据流
现在我们了解了如何生成数据流的高级概念,需要订阅它才能使元素开始发射。
5.1. 收集元素
使用 subscribe() 方法收集流中的所有元素:
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.subscribe(elements::add);
assertThat(elements).containsExactly(1, 2, 3, 4);
数据在我们订阅前不会开始流动。注意我们还添加了日志记录,这在分析幕后运行情况时很有帮助。
5.2. 元素流动过程
通过日志记录,我们可以可视化数据在流中的流动过程:
20:25:19.550 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(1)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(2)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(3)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(4)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onComplete()
首先,所有操作都在主线程上运行。我们暂时不深入讨论并发问题,因为本文稍后会专门探讨。不过这确实简化了问题,因为我们可以按顺序处理所有内容。
现在让我们逐个分析记录的序列:
- onSubscribe() – 订阅流时调用
- request(unbounded) – 调用 subscribe 时,幕后会创建一个 *Subscription*。该订阅向流请求元素。这里默认为 unbounded(无限制),表示请求所有可用元素
- onNext() – 每个元素都会调用此方法
- onComplete() – 接收最后一个元素后调用。实际上还有 *onError()*,在出现异常时调用,但本例中没有
这是 Reactive Streams 规范中 Subscriber 接口定义的流程。实际上,我们在 onSubscribe() 调用中实例化的就是这个接口。subscribe() 是个有用的辅助方法,但为了更好地理解幕后情况,让我们直接提供一个 Subscriber 实现:
Flux.just(1, 2, 3, 4)
.log()
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
elements.add(integer);
}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
});
可以看到,上述流程中的每个阶段都映射到 Subscriber 实现的一个方法。Flux 只是提供了辅助方法来减少这种冗长代码。
5.3. 与 Java 8 Streams 的对比
这看起来可能类似于 Java 8 Stream 的 collect 操作:
List<Integer> collected = Stream.of(1, 2, 3, 4)
.collect(toList());
但实际并非如此。
核心区别在于响应式是推模型,而 Java 8 Streams 是拉模型。在响应式方法中,事件是随着到来被推送给订阅者的。
另一个需要注意的是,Streams 的终端操作符确实是终端的——它会拉取所有数据并返回结果。而在响应式中,我们可能有来自外部资源的无限流,可以临时添加和移除多个订阅者。我们还可以执行流合并、流节流和应用背压等操作,这些将在下文介绍。
6. 背压
接下来需要考虑的是背压。在之前的示例中,订阅者告诉生产者一次性推送所有元素。这可能会让订阅者不堪重负,耗尽其资源。
背压是指下游可以告诉上游减少数据发送量,防止自身被淹没。
我们可以修改 Subscriber 实现来应用背压。通过 request() 告诉上游每次只发送两个元素:
Flux.just(1, 2, 3, 4)
.log()
.subscribe(new Subscriber<Integer>() {
private Subscription s;
int onNextAmount;
@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(2);
}
@Override
public void onNext(Integer integer) {
elements.add(integer);
onNextAmount++;
if (onNextAmount % 2 == 0) {
s.request(2);
}
}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
});
再次运行代码,会看到 request(2) 被调用,接着是两次 onNext() 调用,然后再次 *request(2)*:
23:31:15.395 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
23:31:15.397 [main] INFO reactor.Flux.Array.1 - | request(2)
23:31:15.397 [main] INFO reactor.Flux.Array.1 - | onNext(1)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(2)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(3)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(4)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onComplete()
本质上,这是响应式拉取背压。我们请求上游只推送特定数量的元素,并且只在准备好时才请求。
想象我们正在接收 Twitter 的推文流,那么由上游决定如何处理。如果推文不断到来但下游没有请求,上游可以选择:
- ✅ 丢弃项目
- ✅ 存储到缓冲区
- ✅ 采用其他策略
7. 操作数据流
我们还可以对流中的数据执行操作,根据需要响应事件。
7.1. 转换流中的数据
一个简单操作是应用转换。这里我们将流中的所有数字翻倍:
Flux.just(1, 2, 3, 4)
.log()
.map(i -> {
LOGGER.debug("{}:{}", i, Thread.currentThread());
return i * 2;
})
.subscribe(elements::add);
map() 会在 onNext() 被调用时应用。
7.2. 合并两个流
通过合并另一个流可以让事情更有趣。使用 zip() 函数尝试:
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.zipWith(Flux.range(0, Integer.MAX_VALUE),
(one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two))
.subscribe(elements::add);
assertThat(elements).containsExactly(
"First Flux: 2, Second Flux: 0",
"First Flux: 4, Second Flux: 1",
"First Flux: 6, Second Flux: 2",
"First Flux: 8, Second Flux: 3");
这里我们创建了另一个 Flux,它持续递增 1,并与原始流一起流式传输。通过检查日志可以看到它们如何协同工作:
20:04:38.064 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:04:38.065 [main] INFO reactor.Flux.Array.1 - | onNext(1)
20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onNext(0)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(2)
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(1)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(3)
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(2)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(4)
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(3)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onComplete()
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | cancel()
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | cancel()
注意现在每个 Flux 都有一个订阅。onNext() 调用也是交替进行的,因此流中每个元素的索引在应用 zip() 函数时会匹配。
8. 热流
目前我们主要关注冷流——这些是静态的、固定长度的流,易于处理。响应式更现实的用例可能是无限发生的事件。
例如,我们可能需要持续响应鼠标移动流或 Twitter 信息流。这类流称为热流,因为它们始终运行,可以在任何时间点订阅,但会错过数据起始部分。
8.1. 创建 ConnectableFlux
创建热流的一种方法是将冷流转换为热流。创建一个永远运行的 Flux,将结果输出到控制台,模拟来自外部资源的无限数据流:
ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
while(true) {
fluxSink.next(System.currentTimeMillis());
}
})
.publish();
通过调用 *publish()*,我们得到一个 *ConnectableFlux*。这意味着调用 subscribe() 不会使其开始发射,允许我们添加多个订阅:
publish.subscribe(System.out::println);
publish.subscribe(System.out::println);
如果运行这段代码,什么也不会发生。直到调用 connect(),Flux 才会开始发射:
publish.connect();
8.2. 节流
如果运行代码,控制台会被日志淹没。这模拟了数据过多传递给消费者的场景。让我们尝试通过节流解决:
ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
while(true) {
fluxSink.next(System.currentTimeMillis());
}
})
.sample(ofSeconds(2))
.publish();
这里我们引入了间隔为 2 秒的 sample() 方法。现在值只会每 2 秒推送给订阅者一次,控制台输出会少得多。
当然,还有多种减少下游数据量的策略,如窗口和缓冲,但本文暂不涉及。
9. 并发
以上所有示例都在主线程上运行。但我们可以控制代码运行的线程。Scheduler 接口提供了异步代码的抽象,并提供了多种实现。让我们尝试在不同于主线程的线程上订阅:
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.subscribe(elements::add);
Parallel 调度器会使订阅在另一个线程上运行,通过日志可以证明。我们看到第一个条目来自 main 线程,而 Flux 在名为 parallel-1 的线程中运行:
20:03:27.505 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:03:27.529 [parallel-1] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | request(unbounded)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(1)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(2)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(3)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(4)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onComplete()
并发比这更有趣,值得我们在另一篇文章中深入探讨。
10. 总结
本文对 Reactive Core 进行了端到端的高级概述。我们解释了如何发布和订阅流、应用背压、操作流以及异步处理数据。这应该为我们编写响应式应用奠定基础。
本系列后续文章将涵盖更高级的并发和其他响应式概念。还有另一篇介绍Reactor 与 Spring的文章。
我们的应用源代码可在 GitHub 上获取。