1. 概述
现代 Web 服务常常需要调用其他外部服务来完成业务逻辑。如果处理不当,这些依赖可能导致响应延迟升高,请求堆积,资源耗尽。尤其当某个下游服务变慢时,整个系统可能被拖垮。
这时候,非阻塞(non-blocking)的异步编程模型就派上用场了。
本文将演示如何在 Play Framework 应用中发起多个异步 HTTP 请求。借助 Java 的非阻塞 HTTP 能力,我们可以在不阻塞主线程的前提下,高效地与外部资源交互。
我们将重点使用 Play 提供的 WebService Library (WS) 来实现这些功能。
2. Play WebService (WS) 库简介
✅ WS 是一个强大的库,支持通过 Java Action
发起异步 HTTP 调用。
它的核心特点是:发送请求后立即返回,不阻塞当前线程。我们通过提供一个回调函数(即 Consumer
接口的实现)来处理请求完成后的结果。
这种模式与 JavaScript 中的回调、Promise
和 async/await
有异曲同工之妙。
来看一个简单的 Consumer
示例,它仅将响应信息记录到日志:
ws.url(url)
.thenAccept(r ->
log.debug("Thread#" + Thread.currentThread().getId()
+ " Request complete: Response code = " + r.getStatus()
+ " | Response: " + r.getBody()
+ " | Current Time:" + System.currentTimeMillis()))
这里的 Consumer
只是打印日志,实际场景中它可能将结果存入数据库、更新缓存等。
深入底层你会发现,WS 库本质上是对 Java 标准库中的 AsyncHttpClient
进行了封装和配置。这个库本身不依赖 Play,但 Play 为其提供了更友好的 API 和集成。
3. 准备示例项目
为了实践这些功能,我们将创建一个简单的测试项目:一个骨架 Web 应用用于接收请求,再通过 WS 库发起调用。
3.1. 创建骨架 Web 应用
首先,使用 sbt new
命令生成项目骨架:
sbt new playframework/play-java-seed.g8
进入新生成的目录后,编辑 build.sbt
文件,添加 WS 库依赖:
libraryDependencies += javaWs
接着启动服务:
$ sbt run
...
--- (Running the application, auto-reloading is enabled) ---
[info] p.c.s.AkkaHttpServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000
服务启动后,访问 http://localhost:9000 应能看到 Play 的欢迎页面,说明一切正常。
3.2. 搭建测试环境
我们将使用单元测试类 HomeControllerTest
来验证功能。
首先,继承 WithServer
类,它能自动管理测试服务器的生命周期:
public class HomeControllerTest extends WithServer {
✅ WithServer
会在测试开始前启动骨架服务(使用随机端口),并在测试结束后自动关闭。
接下来,通过 Guice 的 GuiceApplicationBuilder
创建测试用的 Application 实例:
@Override
protected Application provideApplication() {
return new GuiceApplicationBuilder().build();
}
最后,在 @Before
方法中获取测试服务器的端口,并构建用于测试的 URL:
@Override
@Before
public void setup() {
OptionalInt optHttpsPort = testServer.getRunningHttpsPort();
if (optHttpsPort.isPresent()) {
port = optHttpsPort.getAsInt();
url = "https://localhost:" + port;
} else {
port = testServer.getRunningHttpPort()
.getAsInt();
url = "http://localhost:" + port;
}
}
至此,测试环境准备就绪,我们可以专注于编写请求测试逻辑。
4. 构建 WSRequest 请求
本节介绍如何发起常见的请求类型:GET、POST 以及用于文件上传的 multipart 请求。
4.1. 初始化 WSRequest
对象
发起请求前,需要先获取 WSClient
实例。
在真实项目中,通常通过依赖注入获取已配置好的客户端:
@Autowired
WSClient ws;
在测试类中,我们使用 Play 测试框架提供的 WSTestClient
:
WSClient ws = play.test.WSTestClient.newClient(port);
获取客户端后,调用 url
方法初始化 WSRequest
对象:
ws.url(url)
⚠️ url
方法是必需的,但仅此不足以发起请求。 通常还需要进一步配置:
ws.url(url)
.addHeader("key", "value")
.addQueryParameter("num", "" + num);
如上所示,添加 Header 和查询参数非常直观。
配置完成后,即可调用具体方法(如 get()
、post()
)发起请求。
4.2. 发起 GET 请求
发起 GET 请求只需调用 get
方法:
ws.url(url)
...
.get();
由于是非阻塞调用,代码会立即继续执行下一行。
✅ get
方法返回一个 CompletionStage
实例(属于 CompletableFuture
API)。
当 HTTP 请求完成后,该阶段会将响应封装成 WSResponse
对象。
⚠️ 如果没有提供后续处理逻辑(如 thenAccept
),这个结果就会被丢弃——这就是所谓的“发后即忘”(fire-and-forget)模式。
4.3. 提交表单(POST)
提交表单与 GET 请求类似,主要区别是调用 post
方法并传入请求体:
ws.url(url)
...
.setContentType("application/x-www-form-urlencoded")
.post("key1=value1&key2=value2");
✅ post
方法需要一个参数作为请求体。 这个参数可以是:
- 纯文本字符串
- JSON 或 XML 文档
BodyWritable
对象Source
流
4.4. 提交 Multipart/Form-Data
Multipart 请求需要同时发送表单字段和文件数据。
✅ 在框架中,我们使用 post
方法配合 Source
来实现:
Source<ByteString, ?> file = FileIO.fromPath(Paths.get("hello.txt"));
FilePart<Source<ByteString, ?>> filePart =
new FilePart<>("fileParam", "myfile.txt", "text/plain", file);
DataPart data = new DataPart("key", "value");
ws.url(url)
...
.post(Source.from(Arrays.asList(filePart, data)));
虽然配置稍复杂,但整体模式与其他请求保持一致。
5. 处理异步响应
到目前为止,我们只演示了“发后即忘”的请求。接下来探讨如何真正处理响应数据。
主要有两种方式:
- ❌ 阻塞主线程,等待
CompletableFuture
完成 - ✅ 提供
Consumer
或Function
异步消费结果
5.1. 通过 CompletableFuture
阻塞处理
尽管推荐非阻塞,但有时为了强一致性,可能需要阻塞等待结果:
WSResponse response = ws.url(url)
.get()
.toCompletableFuture()
.get();
⚠️ 这种方式会阻塞当前线程直到响应返回,可能影响吞吐量,应谨慎使用。
5.2. 异步处理响应
✅ 推荐方式:提供一个 Consumer
或 Function
,由异步框架在响应就绪时自动执行。
例如,为请求添加日志:
ws.url(url)
.addHeader("key", "value")
.addQueryParameter("num", "" + 1)
.get()
.thenAccept(r ->
log.debug("Thread#" + Thread.currentThread().getId()
+ " Request complete: Response code = " + r.getStatus()
+ " | Response: " + r.getBody()
+ " | Current Time:" + System.currentTimeMillis()));
日志输出如下:
[debug] c.HomeControllerTest - Thread#30 Request complete: Response code = 200 | Response: {
"Result" : "ok",
"Params" : {
"num" : [ "1" ]
},
"Headers" : {
"accept" : [ "*/*" ],
"host" : [ "localhost:19001" ],
"key" : [ "value" ],
"user-agent" : [ "AHC/2.1" ]
}
} | Current Time:1579303109613
注意:
- 使用
thenAccept
(接受Consumer
)适合无需返回值的场景(如日志)。 - 使用
thenApply
(接受Function
)适合需要将结果传递给下一阶段的场景。
这遵循了标准的 Java 函数式接口 规范。
5.3. 处理大响应体
前述方法适用于小响应体。若需处理数百 MB 的数据,直接加载到内存可能导致 OutOfMemoryError
。
⚠️ get
和 post
等方法会将整个响应体加载到内存中。
✅ 解决方案:使用 Akka Streams 流式处理响应,避免内存溢出。
例如,将响应体直接写入文件:
ws.url(url)
.stream()
.thenAccept(
response -> {
try {
OutputStream outputStream = Files.newOutputStream(path);
Sink<ByteString, CompletionStage<Done>> outputWriter =
Sink.foreach(bytes -> outputStream.write(bytes.toArray()));
response.getBodyAsSource().runWith(outputWriter, materializer);
} catch (IOException e) {
log.error("An error happened while opening the output stream", e);
}
});
关键点:
- 使用
stream()
方法而非get()
。 - 返回的
WSResponse
提供getBodyAsSource()
,返回Source<ByteString, ?>
。 - 通过 Akka
Sink
定义如何处理数据流(本例为写入文件)。
5.4. 超时控制
为防止慢服务拖垮系统,可设置请求超时:
ws.url(url)
.setRequestTimeout(Duration.of(1, SECONDS));
⚠️ 但这只解决了“等待响应”的超时。如果响应已到,但处理逻辑(Consumer
)本身很慢(如大量计算、DB 操作),仍可能阻塞。
✅ 解决方案:使用 futures.timeout
包装整个异步链:
CompletionStage<Result> f = futures.timeout(
ws.url(url)
.get()
.thenApply(result -> {
try {
Thread.sleep(10000L); // 模拟慢处理
return Results.ok();
} catch (InterruptedException e) {
return Results.status(SERVICE_UNAVAILABLE);
}
}), 1L, TimeUnit.SECONDS);
这样,无论网络请求还是后续处理,总耗时超过 1 秒都会触发 TimeoutException
。
5.5. 异常处理
异步链可能成功也可能失败。使用 handleAsync
可统一处理两种情况:
CompletionStage<Object> res = f.handleAsync((result, e) -> {
if (e != null) {
log.error("Exception thrown", e);
return e.getCause();
} else {
return result;
}
});
该方法接收 (result, exception)
两个参数,返回值会成为新的 CompletionStage
结果。
可通过断言验证超时异常:
Class<?> clazz = res.toCompletableFuture().get().getClass();
assertEquals(TimeoutException.class, clazz);
运行测试会看到日志:
[error] c.HomeControllerTest - Exception thrown
java.util.concurrent.TimeoutException: Timeout after 1 second
...
6. 请求过滤器(Request Filters)
有时需要在请求发出前执行一些逻辑(如审计、日志)。
虽然可以直接修改 WSRequest
,但更优雅的方式是使用 WSRequestFilter
。
✅ 过滤器在请求触发前附加到请求逻辑上。
可自定义实现 WSRequestFilter
,也可使用现成的:
ws.url(url)
...
.setRequestFilter(new AhcCurlRequestLogger())
...
.get();
AhcCurlRequestLogger
会以 curl
命令格式输出日志:
[info] p.l.w.a.AhcCurlRequestLogger - curl \
--verbose \
--request GET \
--header 'key: value' \
'http://localhost:19001'
可通过修改 logback.xml
调整日志级别。
7. 响应缓存
✅ WSClient
支持响应缓存,对重复请求或下游服务不稳定时非常有用。
7.1. 添加缓存依赖
在 build.sbt
中添加:
libraryDependencies += ehcache
这会集成 Ehcache 作为缓存层。你也可以选用任何符合 JSR-107 规范的实现。
7.2. 启用启发式缓存
⚠️ 默认情况下,如果远程服务未返回缓存头(如 Cache-Control
),Play WS 不会缓存响应。
✅ 解决方案:在 application.conf
中启用启发式缓存:
play.ws.cache.heuristics.enabled=true
启用后,系统会根据启发式规则(如响应码、内容类型)决定是否缓存,不再依赖服务端的缓存配置。
8. 高级调优
根据下游服务特性,可能需要调整客户端行为。可在 application.conf
中配置:
play.ws.followRedirects=false
play.ws.useragent=MyPlayApplication
play.ws.compressionEnabled=true
# 建立连接超时(秒)
play.ws.timeout.connection=30
# 连接建立后等待数据超时(秒)
play.ws.timeout.idle=30
# 整个请求最大超时(秒)
play.ws.timeout.request=300
✅ 也可直接配置底层的 AsyncHttpClient
。
所有可用配置项详见 AhcConfig
源码。
9. 总结
本文深入探讨了 Play WS 库的核心功能:
- 项目配置与基础请求发起
- 同步与异步响应处理
- 大数据流的流式处理
- 超时与异常处理
- 请求过滤与响应缓存
- 客户端高级调优
掌握这些技巧,能让你的 Play 应用在面对外部依赖时更加健壮和高效。
文中所有示例代码均可在 GitHub 上获取:https://github.com/eugenp/tutorials/tree/master/web-modules/play-modules/async-http