1. 概述

本文将探讨如何为 CompletableFuture 对象添加重试逻辑。首先会介绍如何包装在 CompletableFuture 内部的任务实现重试,随后利用 CompletableFuture API 创建链式调用,在异步任务异常时自动重新执行。

2. 重试任务本身

实现任务重试最简单的方式是采用 装饰器模式,通过传统的面向对象方式实现。但更简洁的做法是利用高阶函数进行函数式编程。

核心思路是创建一个函数,接收 Supplier<T> 和最大重试次数作为参数,通过循环和异常捕获实现重试逻辑,最后返回新的 Supplier<T>

static <T> Supplier<T> retryFunction(Supplier<T> supplier, int maxRetries) {
    return () -> {
        int retries = 0;
    while (retries < maxRetries) {
        try {
            return supplier.get();
        } catch (Exception e) {
            retries++;
        }
        }
    throw new IllegalStateException(String.format("Task failed after %s attempts", maxRetries));
    };
}

✅ 可优化点:支持指定重试异常类型或添加重试延迟

基于此函数装饰器创建 CompletableFuture

static <T> CompletableFuture<T> retryTask(Supplier<T> supplier, int maxRetries) {
    Supplier<T> retryableSupplier = retryFunction(supplier, maxRetries);
    return CompletableFuture.supplyAsync(retryableSupplier);
}

测试验证:
准备一个测试方法,前四次抛异常,第五次返回结果:

AtomicInteger retriesCounter = new AtomicInteger(0);

@BeforeEach
void beforeEach() {
    retriesCounter.set(0);
}

int failFourTimesThenReturn(int returnValue) {
    int retryNr = retriesCounter.get();
    if (retryNr < 4) {
        retriesCounter.set(retryNr + 1);
    throw new RuntimeException();
    }
    return returnValue;
}

✅ 成功重试测试:

@Test
void whenRetryingTask_thenReturnsCorrectlyAfterFourInvocations() {
    Supplier<Integer> codeToRun = () -> failFourTimesThenReturn(100);

    CompletableFuture<Integer> result = retryTask(codeToRun, 10);

    assertThat(result.join()).isEqualTo(100);
    assertThat(retriesCounter).hasValue(4);
}

❌ 重试次数耗尽测试:

@Test
void whenRetryingTask_thenThrowsExceptionAfterThreeInvocations() {
    Supplier<Integer> codeToRun = () -> failFourTimesThenReturn(100);

    CompletableFuture<Integer> result = retryTask(codeToRun, 3);

    assertThatThrownBy(result::join)
      .isInstanceOf(CompletionException.class)
      .hasMessageContaining("IllegalStateException: Task failed after 3 attempts");
}

3. 重试 CompletableFuture 本身

CompletableFuture API 提供了内置的异常处理机制,可直接使用 exceptionally() 等方法替代自定义函数装饰器。

3.1. 不安全的重试

exceptionally() 方法允许指定异常时的回退函数。通过链式调用可多次重试:

static <T> CompletableFuture<T> retryTwice(Supplier<T> supplier) {
    return CompletableFuture.supplyAsync(supplier)
      .exceptionally(__ -> supplier.get())
      .exceptionally(__ -> supplier.get());
}

优化为支持动态重试次数:

static <T> CompletableFuture<T> retryUnsafe(Supplier<T> supplier, int maxRetries) {
    CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
    for (int i = 0; i < maxRetries; i++) {
        cf = cf.exceptionally(__ -> supplier.get());
    }
    return cf;
}

⚠️ 严重缺陷: 当初始任务快速完成时,后续重试会在主线程执行,破坏异步性!

验证代码(添加延迟和线程日志):

static <T> CompletableFuture<T> retryUnsafe(Supplier<T> supplier, int maxRetries) {
    CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);  
    sleep(100l); // 模拟处理延迟
    for (int i = 0; i < maxRetries; i++) {
        cf = cf.exceptionally(__ -> supplier.get());
    }
    return cf;
}

输出日志:

invocation: 0, thread: ForkJoinPool.commonPool-worker-1
invocation: 1, thread: main  // 糟糕!主线程阻塞
invocation: 2, thread: main
invocation: 3, thread: main
invocation: 4, thread: main

3.2. 异步重试

Java 12+ 提供的 exceptionallyAsync() 可确保所有重试都在异步线程执行:

static <T> CompletableFuture<T> retryExceptionallyAsync(Supplier<T> supplier, int maxRetries) {
   CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
   for (int i = 0; i < maxRetries; i++) {
      cf = cf.exceptionallyAsync(__ -> supplier.get());
   }
   return cf;
}

验证日志(所有任务都在线程池执行):

invocation: 0, thread: ForkJoinPool.commonPool-worker-1
invocation: 1, thread: ForkJoinPool.commonPool-worker-1
invocation: 2, thread: ForkJoinPool.commonPool-worker-1
invocation: 3, thread: ForkJoinPool.commonPool-worker-2
invocation: 4, thread: ForkJoinPool.commonPool-worker-2

3.3. 嵌套 CompletableFuture(Java 8 兼容方案)

对于 Java 8 环境,可通过嵌套 CompletableFuture 实现完全异步重试。 关键步骤:

  1. 将初始结果包装为 CompletableFuture<CompletableFuture<T>>
  2. 使用 exceptionally() 提供异步回退
  3. thenCompose() 扁平化结果
static <T> CompletableFuture<T> retryNesting(Supplier<T> supplier, int maxRetries) {
    CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
    for (int i = 0; i < maxRetries; i++) {
        cf = cf.thenApply(CompletableFuture::completedFuture)
      .exceptionally(__ -> CompletableFuture.supplyAsync(supplier))
      .thenCompose(Function.identity());
    }
    return cf;
}

4. 总结

我们探索了在 CompletableFuture 中实现任务重试的三种方案:

  1. 函数装饰器:直接重试任务本身,实现简单但缺乏灵活性
  2. 内置 API
    • exceptionally():存在线程阻塞风险 ❌
    • exceptionallyAsync()(Java 12+):完美异步重试 ✅
  3. 嵌套方案:Java 8 兼容的异步重试实现 ✅

💡 实际开发建议:

  • Java 12+ 直接用 exceptionallyAsync()
  • 老版本采用嵌套方案
  • 复杂场景考虑专业重试库(如 Resilience4j)

完整代码示例见 GitHub 仓库


原始标题:Retry Logic with CompletableFuture