1. 引言
在开发软件功能时,一个常见需求是从不同数据源获取信息并聚合到响应中。在微服务架构中,这些数据源通常是外部 REST 接口。
本文将介绍如何使用 Java 的 CompletableFuture
高效并行调用多个外部 REST 接口,避免串行调用导致的性能瓶颈。
2. 为什么在 REST 调用中使用并行
假设需要更新对象的多个字段,每个字段值来自不同的 REST 接口调用。传统做法是串行调用每个接口:
graph LR
A[开始] --> B[调用接口1]
B --> C[等待响应]
C --> D[调用接口2]
D --> E[等待响应]
E --> F[返回结果]
⚠️ 这种方式的问题:
- 总响应时间 = 所有接口调用时间之和
- 例如两个各需 5 秒的接口,总耗时至少 10 秒
✅ 并行调用的优势:
- 总响应时间 = 最慢接口的耗时
- 例如 7 秒和 5 秒的接口,总耗时仅需 7 秒
- 显著提升服务响应速度和可扩展性
3. 使用 CompletableFuture 实现并行
CompletableFuture
是组合并行任务、处理错误的利器。下面演示如何为输入列表中的每个对象并行执行三个 REST 调用。
3.1 创建演示应用
首先定义需要更新的 POJO 类:
public class Purchase {
String orderDescription;
String paymentDescription;
String buyerName;
String orderId;
String paymentId;
String userId;
// 全参构造器、getter 和 setter
}
创建 REST 调用执行器类:
@Component
public class PurchaseRestCallsAsyncExecutor {
RestTemplate restTemplate;
static final String BASE_URL = "https://internal-api.com";
// 全参构造器
}
定义三个 REST 接口调用方法:
public String getOrderDescription(String orderId) {
ResponseEntity<String> result = restTemplate.getForEntity(
String.format("%s/orders/%s", BASE_URL, orderId), String.class);
return result.getBody();
}
public String getPaymentDescription(String paymentId) {
ResponseEntity<String> result = restTemplate.getForEntity(
String.format("%s/payments/%s", BASE_URL, paymentId), String.class);
return result.getBody();
}
public String getUserName(String userId) {
ResponseEntity<String> result = restTemplate.getForEntity(
String.format("%s/users/%s", BASE_URL, userId), String.class);
return result.getBody();
}
3.2 使用 CompletableFuture 进行多个 REST 调用
核心方法:构建并执行三个并行任务:
public void updatePurchase(Purchase purchase) {
CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
.thenAccept(purchase::setOrderDescription),
CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
.thenAccept(purchase::setPaymentDescription),
CompletableFuture.supplyAsync(() -> getUserName(purchase.getUserId()))
.thenAccept(purchase::setBuyerName)
).join();
}
关键点解析:
supplyAsync()
异步执行 REST 调用thenAccept()
处理结果并更新对象字段allOf()
组合所有任务join()
触发并行执行并等待所有结果
⚠️ 性能优化要点:
join()
是阻塞操作,只在最后调用一次- 避免在任务中间阻塞,减少线程阻塞次数
- 未指定线程池时默认使用
ForkJoinPool.commonPool()
💡 最佳实践:建议自定义 ExecutorService
传递给 supplyAsync()
,以便更好地控制线程池参数。
3.3 为列表中的每个元素执行多个 REST 调用
批量处理列表数据:
public void updatePurchases(List<Purchase> purchases) {
purchases.forEach(this::updatePurchase);
}
执行流程:
- 遍历
Purchase
列表 - 为每个对象创建独立的
CompletableFuture
- 每个对象内部并行执行三个 REST 调用
- 所有对象并行处理
4. 错误处理
分布式系统中,服务不可用或网络故障是常态。必须妥善处理这些异常情况。
4.1 使用 handle() 优雅处理错误
handle()
方法可捕获前序步骤的异常:
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
实战示例:
public void updatePurchaseHandlingExceptions(Purchase purchase) {
CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
.thenAccept(purchase::setPaymentDescription)
.handle((result, exception) -> {
if (exception != null) {
// 处理异常
return null;
}
return result;
})
).join();
}
处理逻辑:
thenAccept()
返回Void
类型exception
参数捕获所有异常- 通过
if
判断进行异常处理 - 无异常时返回原结果
4.2 处理 REST 调用超时
为任务添加超时控制:
public void updatePurchaseHandlingExceptions(Purchase purchase) {
CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
.thenAccept(purchase::setOrderDescription)
.orTimeout(5, TimeUnit.SECONDS) // 5秒超时
.handle((result, exception) -> {
if (exception instanceof TimeoutException) {
// 处理超时异常
return null;
}
return result;
})
).join();
}
关键改进:
orTimeout(5, TimeUnit.SECONDS)
设置超时- 超时后抛出
TimeoutException
- 在
handle()
中单独处理超时异常
✅ 超时控制的重要性:
- 避免线程永久阻塞
- 防止线程长期处于
RUNNING
状态 - 提升应用健壮性
5. 总结
在分布式系统中,组合多个 REST 接口调用是常见需求。本文通过实战演示了:
- ✅ 使用
CompletableFuture
实现并行 REST 调用 - ✅ 批量处理列表数据时的并行优化
- ✅ 通过
handle()
优雅处理异常 - ✅ 使用
orTimeout()
防止任务无限等待
核心优势:
- 显著降低响应时间(最慢接口耗时 vs 接口耗时总和)
- 提升服务吞吐量和可扩展性
- 增强系统健壮性(完善的异常和超时处理)
完整源码可在 GitHub 获取。