1. 简介

本文将探讨使用 Spring 5 WebClient 限制每秒请求数的几种方法。虽然我们通常希望利用其非阻塞特性,但某些场景可能需要我们添加延迟。我们将结合 Project Reactor 的特性来控制向服务器发送的请求流。

2. 初始设置

典型的限流场景是避免压垮服务器。 此外,某些 Web 服务会限制每小时的请求数量,还有些服务会限制每个客户端的并发请求数。

2.1 编写简单 Web 服务

我们先创建一个返回随机数的 @RestController

@RestController
@RequestMapping("/random")
public class RandomController {

    @GetMapping
    Integer getRandom() {
        return new Random().nextInt(50);
    }
}

接下来我们将模拟耗时操作并限制并发请求数。

2.2 服务器限流

修改服务以模拟更真实的场景:

首先限制服务器能处理的并发请求数,超限时抛出异常。

其次添加响应延迟,模拟耗时操作。 虽然有更健壮的解决方案,但这里仅作演示:

public class Concurrency {

    public static final int MAX_CONCURRENT = 5;
    static final Map<String, AtomicInteger> CONCURRENT_REQUESTS = new HashMap<>();

    public static int protect(IntSupplier supplier) {
        try {
            if (CONCURRENT_REQUESTS.incrementAndGet() > MAX_CONCURRENT) {
                throw new UnsupportedOperationException("max concurrent requests reached");
            }

            TimeUnit.SECONDS.sleep(2);
            return supplier.getAsInt();
        } finally {
            CONCURRENT_REQUESTS.decrementAndGet();
        }
    }
}

最后修改接口使用该保护机制:

@GetMapping
Integer getRandom() {
    return Concurrency.protect(() -> new Random().nextInt(50));
}

现在当并发请求数超过 MAX_CONCURRENT 时,接口会拒绝处理并返回错误。

2.3 编写简单客户端

所有示例都遵循这个模式:生成 n 个请求的 Flux 并向服务发送 GET 请求:

Flux.range(1, n)
  .flatMap(i -> {
    // GET request
  });

为减少样板代码,封装可复用的请求方法:

public interface RandomConsumer {

    static <T> Mono<T> get(WebClient client) {
        return client.get()
          .retrieve()
          .bodyToMono(new ParameterizedTypeReference<T>() {});
    }
}

现在可以开始探索解决方案了。

3. 使用 zipWith(Flux.interval()) 延迟

第一个方案通过 zipWith() 添加固定延迟:

public class ZipWithInterval {

    public static Flux<Integer> fetch(
      WebClient client, int requests, int delay) {
        return Flux.range(1, requests)
          .zipWith(Flux.interval(Duration.ofMillis(delay)))
          .flatMap(i -> RandomConsumer.get(client));
    }
}

注意:此延迟在发送请求前生效。

4. 使用 Flux.delayElements() 延迟

Flux 提供了更直接的延迟方式:

public class DelayElements {

    public static Flux<Integer> fetch(
      WebClient client, int requests, int delay) {
        return Flux.range(1, requests)
          .delayElements(Duration.ofMillis(delay))
          .flatMap(i -> RandomConsumer.get(client));
    }
}

delayElements() 直接作用于 Subscriber.onNext() 信号。 即延迟 Flux.range() 的每个元素,导致 flatMap 中的函数延迟执行。例如 delay=1000 时,请求会延迟一秒启动。

4.1 方案调整

若延迟不足会触发错误:

@Test
void givenSmallDelay_whenDelayElements_thenExceptionThrown() {
    int delay = 100;

    int requests = 10;
    assertThrows(InternalServerError.class, () -> {
      DelayElements.fetch(client, requests, delay)
        .blockLast();
    });
}

原因: 每个请求间隔 100ms,但服务器处理耗时 2 秒,很快达到并发限制导致 500 错误。

增加延迟可规避限制,但会导致不必要的等待。 过度等待可能严重影响性能,接下来探讨更合适的方案。

5. 使用 flatMap() 控制并发

针对服务限制,最佳方案是限制并发请求数不超过 Concurrency.MAX_CONCURRENT通过 flatMap() 的并发参数实现:

public class LimitConcurrency {

    public static Flux<Integer> fetch(
      WebClient client, int requests, int concurrency) {
        return Flux.range(1, requests)
          .flatMap(i -> RandomConsumer.get(client), concurrency);
    }
}

该参数确保并发请求数不超过 concurrency,且避免不必要的延迟:

@Test
void givenLimitInsideServerRange_whenLimitedConcurrency_thenNoExceptionThrown() {
    int limit = Concurrency.MAX_CONCURRENT;

    int requests = 10;
    assertDoesNotThrow(() -> {
      LimitConcurrency.fetch(client, TOTAL_REQUESTS, limit)
        .blockLast();
    });
}

根据场景和偏好,还有其他方案值得探讨。

6. 使用 Resilience4j RateLimiter

Resilience4j 是处理应用容错的强大库。我们将用它限制区间内的并发请求数并添加超时。

添加依赖:

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-reactor</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-ratelimiter</artifactId>
    <version>1.7.1</version>
</dependency>

构建限流器:

public class Resilience4jRateLimit {

    public static Flux<Integer> fetch(
      WebClient client, int requests, int concurrency, int interval) {
        RateLimiter limiter = RateLimiter.of("my-rate-limiter", RateLimiterConfig.custom()
          .limitRefreshPeriod(Duration.ofMillis(interval))
          .limitForPeriod(concurrency)
          .timeoutDuration(Duration.ofMillis(interval * concurrency))
          .build());

        // ...
    }
}

通过 transformDeferred() 应用到 Flux:

return Flux.range(1, requests)
  .flatMap(i -> RandomConsumer.get(client)
    .transformDeferred(RateLimiterOperator.of(limiter))
  );

⚠️ 注意:若 interval 设置过低仍可能出问题,但该方案便于与其他操作共享限流配置。

7. 使用 Guava 实现精确限流

Guava 的通用限流器非常适合此场景。它采用令牌桶算法,仅在必要时阻塞,不像 delayElements() 每次都延迟。

添加依赖:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>33.2.1-jre</version>
</dependency>

使用方式:

public class GuavaRateLimit {

    public static Flux<Integer> fetch(
      WebClient client, int requests, int requestsPerSecond) {
        RateLimiter limiter = RateLimiter.create(requestsPerSecond);

        return Flux.range(1, requests)
          .flatMap(i -> {
            limiter.acquire();

            return RandomConsumer.get(client);
          });
    }
}

该方案简洁高效:不会造成不必要的阻塞。 例如,若某请求意外耗时较长,只要在 requestsPerSecond 范围内,下一个请求无需等待即可执行。

8. 总结

本文探讨了 WebClient 限流的多种方案。通过模拟受限服务,我们观察了不同方案对代码和测试的影响。结合 Project Reactor 和第三方库,我们实现了相同目标的不同实现。

完整代码可在 GitHub 获取。


原始标题:Limiting the Requests per Second With WebClient | Baeldung