1. 引言
在分布式系统中,调用外部服务时保持低延迟是关键任务。本教程将演示如何结合使用OpenFeign和CompletableFuture
来:
- 并行执行多个HTTP请求
- 处理外部服务错误
- 设置网络和线程超时
2. 搭建演示应用
为演示并行请求功能,我们将创建一个电商购买场景:
- 根据客户国家获取可用支付方式
- 生成购买报告(不包含支付方式信息)
首先添加Spring Cloud OpenFeign依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
3. 创建外部服务客户端
使用@FeignClient
创建两个指向localhost:8083
的客户端:
支付方式客户端:
@FeignClient(name = "paymentMethodClient", url = "http://localhost:8083")
public interface PaymentMethodClient {
@RequestMapping(method = RequestMethod.GET, value = "/payment_methods")
String getAvailablePaymentMethods(@RequestParam(name = "site_id") String siteId);
}
报告生成客户端:
@FeignClient(name = "reportClient", url = "http://localhost:8083")
public interface ReportClient {
@RequestMapping(method = RequestMethod.POST, value = "/reports")
void sendReport(@RequestBody String reportRequest);
}
4. 创建并行请求执行器
4.1 实现并行执行器
顺序调用两个客户端会导致响应时间叠加(总时间≈请求1时间+请求2时间)。由于两个请求相互独立,我们可以并行执行:
@Service
public class PurchaseService {
private final PaymentMethodClient paymentMethodClient;
private final ReportClient reportClient;
// 全参构造器
public String executePurchase(String siteId) throws ExecutionException, InterruptedException {
CompletableFuture<String> paymentMethodsFuture = CompletableFuture.supplyAsync(() ->
paymentMethodClient.getAvailablePaymentMethods(siteId));
CompletableFuture.runAsync(() -> reportClient.sendReport("Purchase Order Report"));
return String.format("Purchase executed with payment method %s", paymentMethodsFuture.get());
}
}
关键点:
- ✅
supplyAsync()
:用于有返回值的任务(获取支付方式) - ✅
runAsync()
:用于无返回值的任务(生成报告) - ⚠️
runAsync()
立即启动新线程,而supplyAsync()
可能受线程池调度影响
使用WireMock进行集成测试:
@BeforeEach
public void startWireMockServer() {
wireMockServer = new WireMockServer(8083);
configureFor("localhost", 8083);
wireMockServer.start();
stubFor(post(urlEqualTo("/reports"))
.willReturn(aResponse().withStatus(HttpStatus.OK.value())));
}
@AfterEach
public void stopWireMockServer() {
wireMockServer.stop();
}
@Test
void givenRestCalls_whenBothReturnsOk_thenReturnCorrectResult() throws ExecutionException, InterruptedException {
stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
.willReturn(aResponse().withStatus(HttpStatus.OK.value()).withBody("credit_card")));
String result = purchaseService.executePurchase("BR");
assertNotNull(result);
assertEquals("Purchase executed with payment method credit_card", result);
}
4.2 使用exceptionally()处理外部API错误
当支付方式接口返回404(无可用支付方式)时,可通过exceptionally()
返回默认值:
CompletableFuture <String> paymentMethodsFuture = CompletableFuture.supplyAsync(() -> paymentMethodClient.getAvailablePaymentMethods(siteId))
.exceptionally(ex -> {
if (ex.getCause() instanceof FeignException &&
((FeignException) ex.getCause()).status() == 404) {
return "cash";
}
throw new RuntimeException(ex); // 其他异常继续抛出
});
测试用例:
@Test
void givenRestCalls_whenPurchaseReturns404_thenReturnDefault() throws ExecutionException, InterruptedException {
stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
.willReturn(aResponse().withStatus(HttpStatus.NOT_FOUND.value())));
String result = purchaseService.executePurchase("BR");
assertNotNull(result);
assertEquals("Purchase executed with payment method cash", result);
}
5. 为并行任务和网络请求添加超时
5.1 添加Feign客户端网络超时
网络超时针对单个请求,在TCP层面切断连接。在application.properties
中配置:
feign.client.config.paymentMethodClient.readTimeout=200
feign.client.config.paymentMethodClient.connectTimeout=100
超时类型:
connectTimeout
:TCP握手超时(毫秒)readTimeout
:数据读取超时(毫秒)
在异常处理器中捕获网络超时:
.exceptionally(ex -> {
if (ex.getCause() instanceof RetryableException) {
throw new RuntimeException("REST call network timeout!");
}
// 其他异常处理...
})
测试网络超时场景:
@Test
void givenRestCalls_whenPurchaseRequestWebTimeout_thenReturnDefault() {
stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
.willReturn(aResponse().withFixedDelay(250))); // 模拟250ms延迟
Throwable error = assertThrows(ExecutionException.class, () -> purchaseService.executePurchase("BR"));
assertEquals("java.lang.RuntimeException: REST call network timeout!", error.getMessage());
}
5.2 添加线程超时
线程超时控制整个CompletableFuture
生命周期(包括重试尝试):
CompletableFuture<String> paymentMethodsFuture = CompletableFuture.supplyAsync(() -> paymentMethodClient.getAvailablePaymentMethods(siteId))
.orTimeout(400, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
if (ex instanceof TimeoutException) {
throw new RuntimeException("Thread timeout!", ex);
}
// 其他异常处理...
});
测试线程超时场景:
@Test
void givenRestCalls_whenPurchaseCompletableFutureTimeout_thenThrowNewException() {
stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
.willReturn(aResponse().withFixedDelay(450))); // 超过线程超时阈值
Throwable error = assertThrows(ExecutionException.class, () -> purchaseService.executePurchase("BR"));
assertEquals("java.lang.RuntimeException: Thread timeout!", error.getMessage());
}
6. 总结
通过本教程,我们掌握了:
- ✅ 使用
CompletableFuture
与Feign Client实现并行HTTP调用 - ✅ 通过
exceptionally()
优雅处理404和超时异常 - ✅ 区分网络超时(单次请求)和线程超时(整个任务生命周期)
- ⚠️ 超时配置的常见踩坑点:
- 网络超时仅影响单次请求
- 线程超时包含重试总耗时
- 需在异常处理器中区分不同超时类型
这种并行调用模式特别适合处理独立的外部依赖,能有效降低总响应时间。