1. 引言

在开发软件功能时,一个常见需求是从不同数据源获取信息并聚合到响应中。在微服务架构中,这些数据源通常是外部 REST 接口。

本文将介绍如何使用 Java 的 CompletableFuture 高效并行调用多个外部 REST 接口,避免串行调用导致的性能瓶颈。

2. 为什么在 REST 调用中使用并行

假设需要更新对象的多个字段,每个字段值来自不同的 REST 接口调用。传统做法是串行调用每个接口:

graph LR
    A[开始] --> B[调用接口1]
    B --> C[等待响应]
    C --> D[调用接口2]
    D --> E[等待响应]
    E --> F[返回结果]

⚠️ 这种方式的问题:

  • 总响应时间 = 所有接口调用时间之和
  • 例如两个各需 5 秒的接口,总耗时至少 10 秒

✅ 并行调用的优势:

  • 总响应时间 = 最慢接口的耗时
  • 例如 7 秒和 5 秒的接口,总耗时仅需 7 秒
  • 显著提升服务响应速度和可扩展性

3. 使用 CompletableFuture 实现并行

CompletableFuture 是组合并行任务、处理错误的利器。下面演示如何为输入列表中的每个对象并行执行三个 REST 调用。

3.1 创建演示应用

首先定义需要更新的 POJO 类:

public class Purchase {
    String orderDescription;
    String paymentDescription;
    String buyerName;
    String orderId;
    String paymentId;
    String userId;

    // 全参构造器、getter 和 setter
}

创建 REST 调用执行器类:

@Component
public class PurchaseRestCallsAsyncExecutor {
    RestTemplate restTemplate;
    static final String BASE_URL = "https://internal-api.com";

    // 全参构造器
}

定义三个 REST 接口调用方法:

public String getOrderDescription(String orderId) {
    ResponseEntity<String> result = restTemplate.getForEntity(
        String.format("%s/orders/%s", BASE_URL, orderId), String.class);
    return result.getBody();
}

public String getPaymentDescription(String paymentId) {
    ResponseEntity<String> result = restTemplate.getForEntity(
        String.format("%s/payments/%s", BASE_URL, paymentId), String.class);
    return result.getBody();
}

public String getUserName(String userId) {
    ResponseEntity<String> result = restTemplate.getForEntity(
        String.format("%s/users/%s", BASE_URL, userId), String.class);
    return result.getBody();
}

3.2 使用 CompletableFuture 进行多个 REST 调用

核心方法:构建并执行三个并行任务:

public void updatePurchase(Purchase purchase) {
    CompletableFuture.allOf(
        CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
            .thenAccept(purchase::setOrderDescription),
        CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
            .thenAccept(purchase::setPaymentDescription),
        CompletableFuture.supplyAsync(() -> getUserName(purchase.getUserId()))
            .thenAccept(purchase::setBuyerName)
    ).join();
}

关键点解析:

  • supplyAsync() 异步执行 REST 调用
  • thenAccept() 处理结果并更新对象字段
  • allOf() 组合所有任务
  • join() 触发并行执行并等待所有结果

⚠️ 性能优化要点:

  • join() 是阻塞操作,只在最后调用一次
  • 避免在任务中间阻塞,减少线程阻塞次数
  • 未指定线程池时默认使用 ForkJoinPool.commonPool()

💡 最佳实践:建议自定义 ExecutorService 传递给 supplyAsync(),以便更好地控制线程池参数。

3.3 为列表中的每个元素执行多个 REST 调用

批量处理列表数据:

public void updatePurchases(List<Purchase> purchases) {
    purchases.forEach(this::updatePurchase);
}

执行流程:

  1. 遍历 Purchase 列表
  2. 为每个对象创建独立的 CompletableFuture
  3. 每个对象内部并行执行三个 REST 调用
  4. 所有对象并行处理

4. 错误处理

分布式系统中,服务不可用或网络故障是常态。必须妥善处理这些异常情况。

4.1 使用 handle() 优雅处理错误

handle() 方法可捕获前序步骤的异常:

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)

实战示例:

public void updatePurchaseHandlingExceptions(Purchase purchase) {
    CompletableFuture.allOf(
        CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
            .thenAccept(purchase::setPaymentDescription)
            .handle((result, exception) -> {
                if (exception != null) {
                    // 处理异常
                    return null;
                }
                return result;
            })
    ).join();
}

处理逻辑:

  • thenAccept() 返回 Void 类型
  • exception 参数捕获所有异常
  • 通过 if 判断进行异常处理
  • 无异常时返回原结果

4.2 处理 REST 调用超时

为任务添加超时控制:

public void updatePurchaseHandlingExceptions(Purchase purchase) {
    CompletableFuture.allOf(
        CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
            .thenAccept(purchase::setOrderDescription)
            .orTimeout(5, TimeUnit.SECONDS)  // 5秒超时
            .handle((result, exception) -> {
                if (exception instanceof TimeoutException) {
                    // 处理超时异常
                    return null;
                }
                return result;
            })
    ).join();
}

关键改进:

  • orTimeout(5, TimeUnit.SECONDS) 设置超时
  • 超时后抛出 TimeoutException
  • handle() 中单独处理超时异常

✅ 超时控制的重要性:

  • 避免线程永久阻塞
  • 防止线程长期处于 RUNNING 状态
  • 提升应用健壮性

5. 总结

在分布式系统中,组合多个 REST 接口调用是常见需求。本文通过实战演示了:

  1. ✅ 使用 CompletableFuture 实现并行 REST 调用
  2. ✅ 批量处理列表数据时的并行优化
  3. ✅ 通过 handle() 优雅处理异常
  4. ✅ 使用 orTimeout() 防止任务无限等待

核心优势:

  • 显著降低响应时间(最慢接口耗时 vs 接口耗时总和)
  • 提升服务吞吐量和可扩展性
  • 增强系统健壮性(完善的异常和超时处理)

完整源码可在 GitHub 获取。


原始标题:How to Make Multiple REST Calls in CompletableFuture | Baeldung