1. 概述

本指南全面介绍 Java 8 并发 API 引入的 CompletableFuture 类的功能与典型应用场景。

2. Java 中的异步计算

异步计算通常难以处理。理想情况下,我们希望将计算视为一系列步骤,但在异步场景中,回调逻辑往往分散在代码各处或深度嵌套。当需要处理步骤中的错误时,情况会更复杂。

Java 5 引入的 Future 接口虽能表示异步计算结果,但缺乏组合计算或处理错误的能力

Java 8 推出了 CompletableFuture。它不仅实现了 Future 接口,还实现了 CompletionStage 接口。该接口定义了异步计算步骤的契约,支持与其他步骤组合。

CompletableFuture 既是基础组件也是框架,提供约 50 种方法用于组合、执行异步计算步骤和处理错误

庞大的 API 可能令人望而生畏,但它们主要适用于以下几类清晰的使用场景。

3. 将 CompletableFuture 作为简单 Future 使用

CompletableFuture 实现了 Future 接口,因此可将其作为增强版 Future 使用,并附加完成逻辑。

例如,可通过无参构造器创建实例表示未来结果,传递给消费者,并在未来某个时刻通过 complete 方法完成。消费者可用 get 方法阻塞当前线程直至结果就绪。

下例中,方法创建 CompletableFuture 实例,在另一个线程启动计算并立即返回 Future。计算完成后,通过 complete 方法提供结果:

public Future<String> calculateAsync() throws InterruptedException {
    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.complete("Hello");
        return null;
    });

    return completableFuture;
}

计算通过 Executor API 启动。这种创建和完成 CompletableFuture 的方式适用于任何并发机制或 API,包括原生线程。

注意 calculateAsync 方法返回 Future 实例。调用该方法后,获取 Future 实例,在需要阻塞获取结果时调用 get 方法。

get 方法会抛出受检异常:ExecutionException(封装计算期间发生的异常)和 InterruptedException(表示线程在活动前后被中断):

Future<String> completableFuture = calculateAsync();

// ... 

String result = completableFuture.get();
assertEquals("Hello", result);

若已知计算结果,可使用静态方法 completedFuture 传入结果。此时 Futureget 方法永不阻塞,立即返回结果:

Future<String> completableFuture = 
  CompletableFuture.completedFuture("Hello");

// ...

String result = completableFuture.get();
assertEquals("Hello", result);

另一种场景是取消 Future 执行

4. 封装计算逻辑的 CompletableFuture

上述代码允许选择任意并发执行机制,但若想跳过模板代码直接异步执行逻辑呢?

静态方法 runAsyncsupplyAsync 允许分别基于 RunnableSupplier 函数式接口创建 CompletableFuture 实例。

RunnableSupplier 是函数式接口,得益于 Java 8 新特性,可将其作为 lambda 表达式传递。

Runnable 是线程中使用的传统接口,不返回值。Supplier 是通用函数式接口,其唯一方法无参数且返回参数化类型的值。

这使我们能Supplier 实例作为执行计算并返回结果的 lambda 表达式,用法如下:

CompletableFuture<String> future
  = CompletableFuture.supplyAsync(() -> "Hello");

// ...

assertEquals("Hello", future.get());

5. 处理异步计算结果

处理计算结果的最通用方式是将其传递给函数。thenApply 方法正是为此设计:它接收 Function 实例,用其处理结果,并返回持有函数返回值的 Future

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
  .thenApply(s -> s + " World");

assertEquals("Hello World", future.get());

若无需在 Future 链中传递值,可使用 Consumer 函数式接口实例。其唯一方法接收参数且返回 void

CompletableFuture 提供了 thenAccept 方法:接收 Consumer 并传入计算结果。最终的 future.get() 调用返回 Void 类型实例:

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
  .thenAccept(s -> System.out.println("Computation returned: " + s));

future.get();

最后,若既不需要计算值也不想在链尾返回值,可将 Runnable lambda 传递给 thenRun 方法。下例在调用 future.get() 后简单打印一行:

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
  .thenRun(() -> System.out.println("Computation finished."));

future.get();

6. 组合 Future

CompletableFuture API 最强大的功能是将多个 CompletableFuture 实例组合成计算步骤链

组合结果本身也是 CompletableFuture,支持进一步链式组合。这种模式在函数式语言中很常见,称为单子设计模式

**下例使用 thenCompose 方法顺序链式组合两个 Future**。注意该方法接收返回 CompletableFuture 实例的函数,函数参数是前一步的计算结果,允许在下一个 CompletableFuture 的 lambda 中使用该值:

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

assertEquals("Hello World", completableFuture.get());

thenComposethenApply 共同实现了单子模式的基础构件。它们与 Java 8 中的 StreamOptional 类的 mapflatMap 方法密切相关。

两方法均接收函数并应用于计算结果,但 thenCompose(即 flatMap接收返回同类型对象的函数。这种函数结构允许将类实例作为基础构件组合。

若要执行两个独立的 Future 并处理其结果,可使用 thenCombine 方法:接收一个 Future 和一个双参数 Function 处理两个结果:

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCombine(CompletableFuture.supplyAsync(
      () -> " World"), (s1, s2) -> s1 + s2));

assertEquals("Hello World", completableFuture.get());

更简单的场景是:只需处理两个 Future 的结果,无需在链中传递值。此时可使用 thenAcceptBoth 方法:

CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
  .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
    (s1, s2) -> System.out.println(s1 + s2));

7. thenApply()thenCompose() 的区别

前文展示了 thenApply()thenCompose() 的示例。两 API 均用于链式调用 CompletableFuture,但用法不同。

7.1 thenApply()

此方法用于处理前次调用的结果。关键点在于返回类型是所有调用的组合结果。

因此当需要转换 CompletableFuture 调用的结果时,此方法很有用:

CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);

7.2 thenCompose()

thenCompose()thenApply() 类似,均返回新的 CompletionStage。但 thenCompose() 将前一阶段作为参数。它会扁平化处理并直接返回结果的 Future,而非像 thenApply() 那样返回嵌套的 Future

CompletableFuture<Integer> computeAnother(Integer i){
    return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);

若目标是链式调用 CompletableFuture 方法,优先使用 thenCompose()

注意:两方法的区别类似于 [map()flatMap() 的区别`](/java-difference-map-and-flatmap)。

8. 并行运行多个 Future

当需要并行执行多个 Future 时,通常希望等待所有任务完成,再处理组合结果。

静态方法 CompletableFuture.allOf 允许等待所有提供的 Future 完成(可变参数):

CompletableFuture<String> future1  
  = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2  
  = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3  
  = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<Void> combinedFuture 
  = CompletableFuture.allOf(future1, future2, future3);

// ...

combinedFuture.get();

assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());

注意 CompletableFuture.allOf() 返回 CompletableFuture<Void>。此方法的局限是不返回所有 Future 的组合结果,需手动获取结果。幸运的是,CompletableFuture.join() 方法和 Java 8 Streams API 可简化操作:

String combined = Stream.of(future1, future2, future3)
  .map(CompletableFuture::join)
  .collect(Collectors.joining(" "));

assertEquals("Hello Beautiful World", combined);

CompletableFuture.join() 方法与 get 类似,但在 Future 未正常完成时抛出非受检异常。这使其可在 Stream.map() 方法中作为方法引用使用。

9. 错误处理

在异步计算步骤链中处理错误时,需采用类似 throw/catch 的机制。

CompletableFuture 类通过特殊的 handle 方法处理异常,而非语法块捕获。该方法接收两个参数:计算结果(成功完成时)和抛出的异常(某步骤未正常完成时)。

下例使用 handle 方法在异步计算问候语出错时(因未提供名称)提供默认值:

String name = null;

// ...

CompletableFuture<String> completableFuture  
  =  CompletableFuture.supplyAsync(() -> {
      if (name == null) {
          throw new RuntimeException("Computation error!");
      }
      return "Hello, " + name;
  }).handle((s, t) -> s != null ? s : "Hello, Stranger!");

assertEquals("Hello, Stranger!", completableFuture.get());

另一种场景:想像首例那样手动完成 Future,但也能用异常完成。此时可用 completeExceptionally 方法。下例中的 completableFuture.get() 抛出 ExecutionException,其原因为 RuntimeException

CompletableFuture<String> completableFuture = new CompletableFuture<>();

// ...

completableFuture.completeExceptionally(
  new RuntimeException("Calculation failed!"));

// ...

completableFuture.get(); // ExecutionException

上例中,可通过 handle 方法异步处理异常,但使用 get 方法时,可采用更典型的同步异常处理方式。

10. Async 方法

CompletableFuture 类的流式 API 中,大多数方法都有带 Async 后缀的两种变体。这些方法通常用于在另一个线程中执行相应步骤

Async 后缀的方法使用调用线程执行下一阶段。带 Async 但无 Executor 参数的方法使用通用的 fork/join 池实现(通过 ForkJoinPool.commonPool() 访问),前提是并行度 > 1。带 Executor 参数的 Async 方法使用传入的 Executor 执行步骤。

下例修改为用 Function 实例处理计算结果。唯一可见差异是 thenApplyAsync 方法,但底层将函数应用包装到 ForkJoinTask 实例中(详见 Fork/Join 框架指南)。这能进一步并行化计算,更高效利用系统资源:

CompletableFuture<String> completableFuture  
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
  .thenApplyAsync(s -> s + " World");

assertEquals("Hello World", future.get());

11. JDK 9 CompletableFuture API

Java 9 通过以下改进增强了 CompletableFuture API:

✅ 新增工厂方法
✅ 支持延迟和超时
✅ 改进子类化支持

新增实例 API:

Executor defaultExecutor()
CompletableFuture<U> newIncompleteFuture()
CompletableFuture<T> copy()
CompletionStage<T> minimalCompletionStage()
CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor)
CompletableFuture<T> completeAsync(Supplier<? extends T> supplier)
CompletableFuture<T> orTimeout(long timeout, TimeUnit unit)
CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit)

新增静态工具方法:

Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
Executor delayedExecutor(long delay, TimeUnit unit)
<U> CompletionStage<U> completedStage(U value)
<U> CompletionStage<U> failedStage(Throwable ex)
<U> CompletableFuture<U> failedFuture(Throwable ex)

为解决超时问题,Java 9 新增两个函数:

orTimeout()
completeOnTimeout()

详情可阅读:Java 9 CompletableFuture API 改进

12. 总结

本文介绍了 CompletableFuture 类的方法和典型使用场景。

文章源码可在 GitHub 获取。