1. 概述
本指南全面介绍 Java 8 并发 API 引入的 CompletableFuture
类的功能与典型应用场景。
2. Java 中的异步计算
异步计算通常难以处理。理想情况下,我们希望将计算视为一系列步骤,但在异步场景中,回调逻辑往往分散在代码各处或深度嵌套。当需要处理步骤中的错误时,情况会更复杂。
Java 5 引入的 Future
接口虽能表示异步计算结果,但缺乏组合计算或处理错误的能力。
Java 8 推出了 CompletableFuture
类。它不仅实现了 Future
接口,还实现了 CompletionStage
接口。该接口定义了异步计算步骤的契约,支持与其他步骤组合。
CompletableFuture
既是基础组件也是框架,提供约 50 种方法用于组合、执行异步计算步骤和处理错误。
庞大的 API 可能令人望而生畏,但它们主要适用于以下几类清晰的使用场景。
3. 将 CompletableFuture
作为简单 Future
使用
CompletableFuture
实现了 Future
接口,因此可将其作为增强版 Future
使用,并附加完成逻辑。
例如,可通过无参构造器创建实例表示未来结果,传递给消费者,并在未来某个时刻通过 complete
方法完成。消费者可用 get
方法阻塞当前线程直至结果就绪。
下例中,方法创建 CompletableFuture
实例,在另一个线程启动计算并立即返回 Future
。计算完成后,通过 complete
方法提供结果:
public Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});
return completableFuture;
}
计算通过 Executor
API 启动。这种创建和完成 CompletableFuture
的方式适用于任何并发机制或 API,包括原生线程。
注意 calculateAsync
方法返回 Future
实例。调用该方法后,获取 Future
实例,在需要阻塞获取结果时调用 get
方法。
get
方法会抛出受检异常:ExecutionException
(封装计算期间发生的异常)和 InterruptedException
(表示线程在活动前后被中断):
Future<String> completableFuture = calculateAsync();
// ...
String result = completableFuture.get();
assertEquals("Hello", result);
若已知计算结果,可使用静态方法 completedFuture
传入结果。此时 Future
的 get
方法永不阻塞,立即返回结果:
Future<String> completableFuture =
CompletableFuture.completedFuture("Hello");
// ...
String result = completableFuture.get();
assertEquals("Hello", result);
另一种场景是取消 Future
执行。
4. 封装计算逻辑的 CompletableFuture
上述代码允许选择任意并发执行机制,但若想跳过模板代码直接异步执行逻辑呢?
静态方法 runAsync
和 supplyAsync
允许分别基于 Runnable
和 Supplier
函数式接口创建 CompletableFuture
实例。
Runnable
和 Supplier
是函数式接口,得益于 Java 8 新特性,可将其作为 lambda 表达式传递。
Runnable
是线程中使用的传统接口,不返回值。Supplier
是通用函数式接口,其唯一方法无参数且返回参数化类型的值。
这使我们能将 Supplier
实例作为执行计算并返回结果的 lambda 表达式,用法如下:
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "Hello");
// ...
assertEquals("Hello", future.get());
5. 处理异步计算结果
处理计算结果的最通用方式是将其传递给函数。thenApply
方法正是为此设计:它接收 Function
实例,用其处理结果,并返回持有函数返回值的 Future
:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture
.thenApply(s -> s + " World");
assertEquals("Hello World", future.get());
若无需在 Future
链中传递值,可使用 Consumer
函数式接口实例。其唯一方法接收参数且返回 void
。
CompletableFuture
提供了 thenAccept
方法:接收 Consumer
并传入计算结果。最终的 future.get()
调用返回 Void
类型实例:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture
.thenAccept(s -> System.out.println("Computation returned: " + s));
future.get();
最后,若既不需要计算值也不想在链尾返回值,可将 Runnable
lambda 传递给 thenRun
方法。下例在调用 future.get()
后简单打印一行:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture
.thenRun(() -> System.out.println("Computation finished."));
future.get();
6. 组合 Future
CompletableFuture
API 最强大的功能是将多个 CompletableFuture
实例组合成计算步骤链。
组合结果本身也是 CompletableFuture
,支持进一步链式组合。这种模式在函数式语言中很常见,称为单子设计模式。
**下例使用 thenCompose
方法顺序链式组合两个 Future
**。注意该方法接收返回 CompletableFuture
实例的函数,函数参数是前一步的计算结果,允许在下一个 CompletableFuture
的 lambda 中使用该值:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
assertEquals("Hello World", completableFuture.get());
thenCompose
与 thenApply
共同实现了单子模式的基础构件。它们与 Java 8 中的 Stream
和 Optional
类的 map
和 flatMap
方法密切相关。
两方法均接收函数并应用于计算结果,但 thenCompose
(即 flatMap
)接收返回同类型对象的函数。这种函数结构允许将类实例作为基础构件组合。
若要执行两个独立的 Future
并处理其结果,可使用 thenCombine
方法:接收一个 Future
和一个双参数 Function
处理两个结果:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(
() -> " World"), (s1, s2) -> s1 + s2));
assertEquals("Hello World", completableFuture.get());
更简单的场景是:只需处理两个 Future
的结果,无需在链中传递值。此时可使用 thenAcceptBoth
方法:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
(s1, s2) -> System.out.println(s1 + s2));
7. thenApply()
与 thenCompose()
的区别
前文展示了 thenApply()
和 thenCompose()
的示例。两 API 均用于链式调用 CompletableFuture
,但用法不同。
7.1 thenApply()
此方法用于处理前次调用的结果。关键点在于返回类型是所有调用的组合结果。
因此当需要转换 CompletableFuture
调用的结果时,此方法很有用:
CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);
7.2 thenCompose()
thenCompose()
与 thenApply()
类似,均返回新的 CompletionStage
。但 thenCompose()
将前一阶段作为参数。它会扁平化处理并直接返回结果的 Future
,而非像 thenApply()
那样返回嵌套的 Future
:
CompletableFuture<Integer> computeAnother(Integer i){
return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);
若目标是链式调用 CompletableFuture
方法,优先使用 thenCompose()
。
注意:两方法的区别类似于 [map()
与 flatMap()
的区别`](/java-difference-map-and-flatmap)。
8. 并行运行多个 Future
当需要并行执行多个 Future
时,通常希望等待所有任务完成,再处理组合结果。
静态方法 CompletableFuture.allOf
允许等待所有提供的 Future
完成(可变参数):
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3
= CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2, future3);
// ...
combinedFuture.get();
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());
注意 CompletableFuture.allOf()
返回 CompletableFuture<Void>
。此方法的局限是不返回所有 Future
的组合结果,需手动获取结果。幸运的是,CompletableFuture.join()
方法和 Java 8 Streams API 可简化操作:
String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));
assertEquals("Hello Beautiful World", combined);
CompletableFuture.join()
方法与 get
类似,但在 Future
未正常完成时抛出非受检异常。这使其可在 Stream.map()
方法中作为方法引用使用。
9. 错误处理
在异步计算步骤链中处理错误时,需采用类似 throw/catch
的机制。
CompletableFuture
类通过特殊的 handle
方法处理异常,而非语法块捕获。该方法接收两个参数:计算结果(成功完成时)和抛出的异常(某步骤未正常完成时)。
下例使用 handle
方法在异步计算问候语出错时(因未提供名称)提供默认值:
String name = null;
// ...
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> {
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
assertEquals("Hello, Stranger!", completableFuture.get());
另一种场景:想像首例那样手动完成 Future
,但也能用异常完成。此时可用 completeExceptionally
方法。下例中的 completableFuture.get()
抛出 ExecutionException
,其原因为 RuntimeException
:
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// ...
completableFuture.completeExceptionally(
new RuntimeException("Calculation failed!"));
// ...
completableFuture.get(); // ExecutionException
上例中,可通过 handle
方法异步处理异常,但使用 get
方法时,可采用更典型的同步异常处理方式。
10. Async 方法
CompletableFuture
类的流式 API 中,大多数方法都有带 Async
后缀的两种变体。这些方法通常用于在另一个线程中执行相应步骤。
无 Async
后缀的方法使用调用线程执行下一阶段。带 Async
但无 Executor
参数的方法使用通用的 fork/join
池实现(通过 ForkJoinPool.commonPool()
访问),前提是并行度 > 1。带 Executor
参数的 Async
方法使用传入的 Executor
执行步骤。
下例修改为用 Function
实例处理计算结果。唯一可见差异是 thenApplyAsync
方法,但底层将函数应用包装到 ForkJoinTask
实例中(详见 Fork/Join 框架指南
)。这能进一步并行化计算,更高效利用系统资源:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture
.thenApplyAsync(s -> s + " World");
assertEquals("Hello World", future.get());
11. JDK 9 CompletableFuture
API
Java 9 通过以下改进增强了 CompletableFuture
API:
✅ 新增工厂方法
✅ 支持延迟和超时
✅ 改进子类化支持
新增实例 API:
✅ Executor defaultExecutor()
✅ CompletableFuture<U> newIncompleteFuture()
✅ CompletableFuture<T> copy()
✅ CompletionStage<T> minimalCompletionStage()
✅ CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor)
✅ CompletableFuture<T> completeAsync(Supplier<? extends T> supplier)
✅ CompletableFuture<T> orTimeout(long timeout, TimeUnit unit)
✅ CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit)
新增静态工具方法:
✅ Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
✅ Executor delayedExecutor(long delay, TimeUnit unit)
✅ <U> CompletionStage<U> completedStage(U value)
✅ <U> CompletionStage<U> failedStage(Throwable ex)
✅ <U> CompletableFuture<U> failedFuture(Throwable ex)
为解决超时问题,Java 9 新增两个函数:
✅ orTimeout()
✅ completeOnTimeout()
详情可阅读:Java 9 CompletableFuture API 改进。
12. 总结
本文介绍了 CompletableFuture
类的方法和典型使用场景。
文章源码可在 GitHub 获取。