1. 概述
本文将探讨如何为 CompletableFuture
对象添加重试逻辑。首先会介绍如何包装在 CompletableFuture
内部的任务实现重试,随后利用 CompletableFuture
API 创建链式调用,在异步任务异常时自动重新执行。
2. 重试任务本身
实现任务重试最简单的方式是采用 装饰器模式,通过传统的面向对象方式实现。但更简洁的做法是利用高阶函数进行函数式编程。
核心思路是创建一个函数,接收 Supplier<T>
和最大重试次数作为参数,通过循环和异常捕获实现重试逻辑,最后返回新的 Supplier<T>
:
static <T> Supplier<T> retryFunction(Supplier<T> supplier, int maxRetries) {
return () -> {
int retries = 0;
while (retries < maxRetries) {
try {
return supplier.get();
} catch (Exception e) {
retries++;
}
}
throw new IllegalStateException(String.format("Task failed after %s attempts", maxRetries));
};
}
✅ 可优化点:支持指定重试异常类型或添加重试延迟
基于此函数装饰器创建 CompletableFuture
:
static <T> CompletableFuture<T> retryTask(Supplier<T> supplier, int maxRetries) {
Supplier<T> retryableSupplier = retryFunction(supplier, maxRetries);
return CompletableFuture.supplyAsync(retryableSupplier);
}
测试验证:
准备一个测试方法,前四次抛异常,第五次返回结果:
AtomicInteger retriesCounter = new AtomicInteger(0);
@BeforeEach
void beforeEach() {
retriesCounter.set(0);
}
int failFourTimesThenReturn(int returnValue) {
int retryNr = retriesCounter.get();
if (retryNr < 4) {
retriesCounter.set(retryNr + 1);
throw new RuntimeException();
}
return returnValue;
}
✅ 成功重试测试:
@Test
void whenRetryingTask_thenReturnsCorrectlyAfterFourInvocations() {
Supplier<Integer> codeToRun = () -> failFourTimesThenReturn(100);
CompletableFuture<Integer> result = retryTask(codeToRun, 10);
assertThat(result.join()).isEqualTo(100);
assertThat(retriesCounter).hasValue(4);
}
❌ 重试次数耗尽测试:
@Test
void whenRetryingTask_thenThrowsExceptionAfterThreeInvocations() {
Supplier<Integer> codeToRun = () -> failFourTimesThenReturn(100);
CompletableFuture<Integer> result = retryTask(codeToRun, 3);
assertThatThrownBy(result::join)
.isInstanceOf(CompletionException.class)
.hasMessageContaining("IllegalStateException: Task failed after 3 attempts");
}
3. 重试 CompletableFuture 本身
CompletableFuture
API 提供了内置的异常处理机制,可直接使用 exceptionally()
等方法替代自定义函数装饰器。
3.1. 不安全的重试
exceptionally()
方法允许指定异常时的回退函数。通过链式调用可多次重试:
static <T> CompletableFuture<T> retryTwice(Supplier<T> supplier) {
return CompletableFuture.supplyAsync(supplier)
.exceptionally(__ -> supplier.get())
.exceptionally(__ -> supplier.get());
}
优化为支持动态重试次数:
static <T> CompletableFuture<T> retryUnsafe(Supplier<T> supplier, int maxRetries) {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
for (int i = 0; i < maxRetries; i++) {
cf = cf.exceptionally(__ -> supplier.get());
}
return cf;
}
⚠️ 严重缺陷: 当初始任务快速完成时,后续重试会在主线程执行,破坏异步性!
验证代码(添加延迟和线程日志):
static <T> CompletableFuture<T> retryUnsafe(Supplier<T> supplier, int maxRetries) {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
sleep(100l); // 模拟处理延迟
for (int i = 0; i < maxRetries; i++) {
cf = cf.exceptionally(__ -> supplier.get());
}
return cf;
}
输出日志:
invocation: 0, thread: ForkJoinPool.commonPool-worker-1
invocation: 1, thread: main // 糟糕!主线程阻塞
invocation: 2, thread: main
invocation: 3, thread: main
invocation: 4, thread: main
3.2. 异步重试
Java 12+ 提供的 exceptionallyAsync()
可确保所有重试都在异步线程执行:
static <T> CompletableFuture<T> retryExceptionallyAsync(Supplier<T> supplier, int maxRetries) {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
for (int i = 0; i < maxRetries; i++) {
cf = cf.exceptionallyAsync(__ -> supplier.get());
}
return cf;
}
验证日志(所有任务都在线程池执行):
invocation: 0, thread: ForkJoinPool.commonPool-worker-1
invocation: 1, thread: ForkJoinPool.commonPool-worker-1
invocation: 2, thread: ForkJoinPool.commonPool-worker-1
invocation: 3, thread: ForkJoinPool.commonPool-worker-2
invocation: 4, thread: ForkJoinPool.commonPool-worker-2
3.3. 嵌套 CompletableFuture(Java 8 兼容方案)
对于 Java 8 环境,可通过嵌套 CompletableFuture
实现完全异步重试。 关键步骤:
- 将初始结果包装为
CompletableFuture<CompletableFuture<T>>
- 使用
exceptionally()
提供异步回退 - 用
thenCompose()
扁平化结果
static <T> CompletableFuture<T> retryNesting(Supplier<T> supplier, int maxRetries) {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
for (int i = 0; i < maxRetries; i++) {
cf = cf.thenApply(CompletableFuture::completedFuture)
.exceptionally(__ -> CompletableFuture.supplyAsync(supplier))
.thenCompose(Function.identity());
}
return cf;
}
4. 总结
我们探索了在 CompletableFuture
中实现任务重试的三种方案:
- 函数装饰器:直接重试任务本身,实现简单但缺乏灵活性
- 内置 API:
exceptionally()
:存在线程阻塞风险 ❌exceptionallyAsync()
(Java 12+):完美异步重试 ✅
- 嵌套方案:Java 8 兼容的异步重试实现 ✅
💡 实际开发建议:
- Java 12+ 直接用
exceptionallyAsync()
- 老版本采用嵌套方案
- 复杂场景考虑专业重试库(如 Resilience4j)
完整代码示例见 GitHub 仓库。