1. 简介
本文将探讨使用 Spring 5 WebClient 限制每秒请求数的几种方法。虽然我们通常希望利用其非阻塞特性,但某些场景可能需要我们添加延迟。我们将结合 Project Reactor 的特性来控制向服务器发送的请求流。
2. 初始设置
典型的限流场景是避免压垮服务器。 此外,某些 Web 服务会限制每小时的请求数量,还有些服务会限制每个客户端的并发请求数。
2.1 编写简单 Web 服务
我们先创建一个返回随机数的 @RestController
:
@RestController
@RequestMapping("/random")
public class RandomController {
@GetMapping
Integer getRandom() {
return new Random().nextInt(50);
}
}
接下来我们将模拟耗时操作并限制并发请求数。
2.2 服务器限流
修改服务以模拟更真实的场景:
首先限制服务器能处理的并发请求数,超限时抛出异常。
其次添加响应延迟,模拟耗时操作。 虽然有更健壮的解决方案,但这里仅作演示:
public class Concurrency {
public static final int MAX_CONCURRENT = 5;
static final Map<String, AtomicInteger> CONCURRENT_REQUESTS = new HashMap<>();
public static int protect(IntSupplier supplier) {
try {
if (CONCURRENT_REQUESTS.incrementAndGet() > MAX_CONCURRENT) {
throw new UnsupportedOperationException("max concurrent requests reached");
}
TimeUnit.SECONDS.sleep(2);
return supplier.getAsInt();
} finally {
CONCURRENT_REQUESTS.decrementAndGet();
}
}
}
最后修改接口使用该保护机制:
@GetMapping
Integer getRandom() {
return Concurrency.protect(() -> new Random().nextInt(50));
}
现在当并发请求数超过 MAX_CONCURRENT 时,接口会拒绝处理并返回错误。
2.3 编写简单客户端
所有示例都遵循这个模式:生成 n 个请求的 Flux 并向服务发送 GET 请求:
Flux.range(1, n)
.flatMap(i -> {
// GET request
});
为减少样板代码,封装可复用的请求方法:
public interface RandomConsumer {
static <T> Mono<T> get(WebClient client) {
return client.get()
.retrieve()
.bodyToMono(new ParameterizedTypeReference<T>() {});
}
}
现在可以开始探索解决方案了。
3. 使用 zipWith(Flux.interval()) 延迟
第一个方案通过 zipWith()
添加固定延迟:
public class ZipWithInterval {
public static Flux<Integer> fetch(
WebClient client, int requests, int delay) {
return Flux.range(1, requests)
.zipWith(Flux.interval(Duration.ofMillis(delay)))
.flatMap(i -> RandomConsumer.get(client));
}
}
注意:此延迟在发送请求前生效。
4. 使用 Flux.delayElements() 延迟
Flux 提供了更直接的延迟方式:
public class DelayElements {
public static Flux<Integer> fetch(
WebClient client, int requests, int delay) {
return Flux.range(1, requests)
.delayElements(Duration.ofMillis(delay))
.flatMap(i -> RandomConsumer.get(client));
}
}
delayElements() 直接作用于 Subscriber.onNext() 信号。 即延迟 Flux.range() 的每个元素,导致 flatMap 中的函数延迟执行。例如 delay=1000 时,请求会延迟一秒启动。
4.1 方案调整
若延迟不足会触发错误:
@Test
void givenSmallDelay_whenDelayElements_thenExceptionThrown() {
int delay = 100;
int requests = 10;
assertThrows(InternalServerError.class, () -> {
DelayElements.fetch(client, requests, delay)
.blockLast();
});
}
原因: 每个请求间隔 100ms,但服务器处理耗时 2 秒,很快达到并发限制导致 500 错误。
增加延迟可规避限制,但会导致不必要的等待。 过度等待可能严重影响性能,接下来探讨更合适的方案。
5. 使用 flatMap() 控制并发
针对服务限制,最佳方案是限制并发请求数不超过 Concurrency.MAX_CONCURRENT
。通过 flatMap() 的并发参数实现:
public class LimitConcurrency {
public static Flux<Integer> fetch(
WebClient client, int requests, int concurrency) {
return Flux.range(1, requests)
.flatMap(i -> RandomConsumer.get(client), concurrency);
}
}
该参数确保并发请求数不超过 concurrency,且避免不必要的延迟:
@Test
void givenLimitInsideServerRange_whenLimitedConcurrency_thenNoExceptionThrown() {
int limit = Concurrency.MAX_CONCURRENT;
int requests = 10;
assertDoesNotThrow(() -> {
LimitConcurrency.fetch(client, TOTAL_REQUESTS, limit)
.blockLast();
});
}
根据场景和偏好,还有其他方案值得探讨。
6. 使用 Resilience4j RateLimiter
Resilience4j 是处理应用容错的强大库。我们将用它限制区间内的并发请求数并添加超时。
添加依赖:
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-reactor</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-ratelimiter</artifactId>
<version>1.7.1</version>
</dependency>
构建限流器:
public class Resilience4jRateLimit {
public static Flux<Integer> fetch(
WebClient client, int requests, int concurrency, int interval) {
RateLimiter limiter = RateLimiter.of("my-rate-limiter", RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofMillis(interval))
.limitForPeriod(concurrency)
.timeoutDuration(Duration.ofMillis(interval * concurrency))
.build());
// ...
}
}
通过 transformDeferred() 应用到 Flux:
return Flux.range(1, requests)
.flatMap(i -> RandomConsumer.get(client)
.transformDeferred(RateLimiterOperator.of(limiter))
);
⚠️ 注意:若 interval 设置过低仍可能出问题,但该方案便于与其他操作共享限流配置。
7. 使用 Guava 实现精确限流
Guava 的通用限流器非常适合此场景。它采用令牌桶算法,仅在必要时阻塞,不像 delayElements() 每次都延迟。
添加依赖:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.2.1-jre</version>
</dependency>
使用方式:
public class GuavaRateLimit {
public static Flux<Integer> fetch(
WebClient client, int requests, int requestsPerSecond) {
RateLimiter limiter = RateLimiter.create(requestsPerSecond);
return Flux.range(1, requests)
.flatMap(i -> {
limiter.acquire();
return RandomConsumer.get(client);
});
}
}
该方案简洁高效:不会造成不必要的阻塞。 例如,若某请求意外耗时较长,只要在 requestsPerSecond 范围内,下一个请求无需等待即可执行。
8. 总结
本文探讨了 WebClient 限流的多种方案。通过模拟受限服务,我们观察了不同方案对代码和测试的影响。结合 Project Reactor 和第三方库,我们实现了相同目标的不同实现。
完整代码可在 GitHub 获取。