1. 概述

现代 Web 服务常常需要调用其他外部服务来完成业务逻辑。如果处理不当,这些依赖可能导致响应延迟升高,请求堆积,资源耗尽。尤其当某个下游服务变慢时,整个系统可能被拖垮。

这时候,非阻塞(non-blocking)的异步编程模型就派上用场了。

本文将演示如何在 Play Framework 应用中发起多个异步 HTTP 请求。借助 Java 的非阻塞 HTTP 能力,我们可以在不阻塞主线程的前提下,高效地与外部资源交互。

我们将重点使用 Play 提供的 WebService Library (WS) 来实现这些功能。

2. Play WebService (WS) 库简介

WS 是一个强大的库,支持通过 Java Action 发起异步 HTTP 调用。

它的核心特点是:发送请求后立即返回,不阻塞当前线程。我们通过提供一个回调函数(即 Consumer 接口的实现)来处理请求完成后的结果。

这种模式与 JavaScript 中的回调、Promiseasync/await 有异曲同工之妙。

来看一个简单的 Consumer 示例,它仅将响应信息记录到日志:

ws.url(url)
  .thenAccept(r -> 
    log.debug("Thread#" + Thread.currentThread().getId() 
      + " Request complete: Response code = " + r.getStatus() 
      + " | Response: " + r.getBody() 
      + " | Current Time:" + System.currentTimeMillis()))

这里的 Consumer 只是打印日志,实际场景中它可能将结果存入数据库、更新缓存等。

深入底层你会发现,WS 库本质上是对 Java 标准库中的 AsyncHttpClient 进行了封装和配置。这个库本身不依赖 Play,但 Play 为其提供了更友好的 API 和集成。

3. 准备示例项目

为了实践这些功能,我们将创建一个简单的测试项目:一个骨架 Web 应用用于接收请求,再通过 WS 库发起调用。

3.1. 创建骨架 Web 应用

首先,使用 sbt new 命令生成项目骨架:

sbt new playframework/play-java-seed.g8

进入新生成的目录后,编辑 build.sbt 文件,添加 WS 库依赖:

libraryDependencies += javaWs

接着启动服务:

$ sbt run
...
--- (Running the application, auto-reloading is enabled) ---

[info] p.c.s.AkkaHttpServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000

服务启动后,访问 http://localhost:9000 应能看到 Play 的欢迎页面,说明一切正常。

3.2. 搭建测试环境

我们将使用单元测试类 HomeControllerTest 来验证功能。

首先,继承 WithServer 类,它能自动管理测试服务器的生命周期:

public class HomeControllerTest extends WithServer {

WithServer 会在测试开始前启动骨架服务(使用随机端口),并在测试结束后自动关闭。

接下来,通过 Guice 的 GuiceApplicationBuilder 创建测试用的 Application 实例:

@Override
protected Application provideApplication() {
    return new GuiceApplicationBuilder().build();
}

最后,在 @Before 方法中获取测试服务器的端口,并构建用于测试的 URL:

@Override
@Before
public void setup() {
    OptionalInt optHttpsPort = testServer.getRunningHttpsPort();
    if (optHttpsPort.isPresent()) {
        port = optHttpsPort.getAsInt();
        url = "https://localhost:" + port;
    } else {
        port = testServer.getRunningHttpPort()
          .getAsInt();
        url = "http://localhost:" + port;
    }
}

至此,测试环境准备就绪,我们可以专注于编写请求测试逻辑。

4. 构建 WSRequest 请求

本节介绍如何发起常见的请求类型:GET、POST 以及用于文件上传的 multipart 请求。

4.1. 初始化 WSRequest 对象

发起请求前,需要先获取 WSClient 实例。

在真实项目中,通常通过依赖注入获取已配置好的客户端:

@Autowired
WSClient ws;

在测试类中,我们使用 Play 测试框架提供的 WSTestClient

WSClient ws = play.test.WSTestClient.newClient(port);

获取客户端后,调用 url 方法初始化 WSRequest 对象:

ws.url(url)

⚠️ url 方法是必需的,但仅此不足以发起请求。 通常还需要进一步配置:

ws.url(url)
  .addHeader("key", "value")
  .addQueryParameter("num", "" + num);

如上所示,添加 Header 和查询参数非常直观。

配置完成后,即可调用具体方法(如 get()post())发起请求。

4.2. 发起 GET 请求

发起 GET 请求只需调用 get 方法:

ws.url(url)
  ...
  .get();

由于是非阻塞调用,代码会立即继续执行下一行。

get 方法返回一个 CompletionStage 实例(属于 CompletableFuture API)。

当 HTTP 请求完成后,该阶段会将响应封装成 WSResponse 对象。

⚠️ 如果没有提供后续处理逻辑(如 thenAccept),这个结果就会被丢弃——这就是所谓的“发后即忘”(fire-and-forget)模式。

4.3. 提交表单(POST)

提交表单与 GET 请求类似,主要区别是调用 post 方法并传入请求体:

ws.url(url)
  ...
  .setContentType("application/x-www-form-urlencoded")
  .post("key1=value1&key2=value2");

post 方法需要一个参数作为请求体。 这个参数可以是:

  • 纯文本字符串
  • JSON 或 XML 文档
  • BodyWritable 对象
  • Source

4.4. 提交 Multipart/Form-Data

Multipart 请求需要同时发送表单字段和文件数据。

在框架中,我们使用 post 方法配合 Source 来实现:

Source<ByteString, ?> file = FileIO.fromPath(Paths.get("hello.txt"));
FilePart<Source<ByteString, ?>> filePart = 
  new FilePart<>("fileParam", "myfile.txt", "text/plain", file);
DataPart data = new DataPart("key", "value");

ws.url(url)
...
  .post(Source.from(Arrays.asList(filePart, data)));

虽然配置稍复杂,但整体模式与其他请求保持一致。

5. 处理异步响应

到目前为止,我们只演示了“发后即忘”的请求。接下来探讨如何真正处理响应数据。

主要有两种方式:

  • ❌ 阻塞主线程,等待 CompletableFuture 完成
  • ✅ 提供 ConsumerFunction 异步消费结果

5.1. 通过 CompletableFuture 阻塞处理

尽管推荐非阻塞,但有时为了强一致性,可能需要阻塞等待结果:

WSResponse response = ws.url(url)
  .get()
  .toCompletableFuture()
  .get();

⚠️ 这种方式会阻塞当前线程直到响应返回,可能影响吞吐量,应谨慎使用。

5.2. 异步处理响应

推荐方式:提供一个 ConsumerFunction,由异步框架在响应就绪时自动执行。

例如,为请求添加日志:

ws.url(url)
  .addHeader("key", "value")
  .addQueryParameter("num", "" + 1)
  .get()
  .thenAccept(r -> 
    log.debug("Thread#" + Thread.currentThread().getId() 
      + " Request complete: Response code = " + r.getStatus() 
      + " | Response: " + r.getBody() 
      + " | Current Time:" + System.currentTimeMillis()));

日志输出如下:

[debug] c.HomeControllerTest - Thread#30 Request complete: Response code = 200 | Response: {
  "Result" : "ok",
  "Params" : {
    "num" : [ "1" ]
  },
  "Headers" : {
    "accept" : [ "*/*" ],
    "host" : [ "localhost:19001" ],
    "key" : [ "value" ],
    "user-agent" : [ "AHC/2.1" ]
  }
} | Current Time:1579303109613

注意:

  • 使用 thenAccept(接受 Consumer)适合无需返回值的场景(如日志)。
  • 使用 thenApply(接受 Function)适合需要将结果传递给下一阶段的场景。

这遵循了标准的 Java 函数式接口 规范。

5.3. 处理大响应体

前述方法适用于小响应体。若需处理数百 MB 的数据,直接加载到内存可能导致 OutOfMemoryError

⚠️ getpost 等方法会将整个响应体加载到内存中。

解决方案:使用 Akka Streams 流式处理响应,避免内存溢出。

例如,将响应体直接写入文件:

ws.url(url)
  .stream()
  .thenAccept(
    response -> {
        try {
            OutputStream outputStream = Files.newOutputStream(path);
            Sink<ByteString, CompletionStage<Done>> outputWriter =
              Sink.foreach(bytes -> outputStream.write(bytes.toArray()));
            response.getBodyAsSource().runWith(outputWriter, materializer);
        } catch (IOException e) {
            log.error("An error happened while opening the output stream", e);
        }
    });

关键点:

  • 使用 stream() 方法而非 get()
  • 返回的 WSResponse 提供 getBodyAsSource(),返回 Source<ByteString, ?>
  • 通过 Akka Sink 定义如何处理数据流(本例为写入文件)。

5.4. 超时控制

为防止慢服务拖垮系统,可设置请求超时:

ws.url(url)
  .setRequestTimeout(Duration.of(1, SECONDS));

⚠️ 但这只解决了“等待响应”的超时。如果响应已到,但处理逻辑(Consumer)本身很慢(如大量计算、DB 操作),仍可能阻塞。

解决方案:使用 futures.timeout 包装整个异步链:

CompletionStage<Result> f = futures.timeout(
  ws.url(url)
    .get()
    .thenApply(result -> {
        try {
            Thread.sleep(10000L); // 模拟慢处理
            return Results.ok();
        } catch (InterruptedException e) {
            return Results.status(SERVICE_UNAVAILABLE);
        }
    }), 1L, TimeUnit.SECONDS);

这样,无论网络请求还是后续处理,总耗时超过 1 秒都会触发 TimeoutException

5.5. 异常处理

异步链可能成功也可能失败。使用 handleAsync 可统一处理两种情况:

CompletionStage<Object> res = f.handleAsync((result, e) -> {
    if (e != null) {
        log.error("Exception thrown", e);
        return e.getCause();
    } else {
        return result;
    }
});

该方法接收 (result, exception) 两个参数,返回值会成为新的 CompletionStage 结果。

可通过断言验证超时异常:

Class<?> clazz = res.toCompletableFuture().get().getClass();
assertEquals(TimeoutException.class, clazz);

运行测试会看到日志:

[error] c.HomeControllerTest - Exception thrown
java.util.concurrent.TimeoutException: Timeout after 1 second
...

6. 请求过滤器(Request Filters)

有时需要在请求发出前执行一些逻辑(如审计、日志)。

虽然可以直接修改 WSRequest,但更优雅的方式是使用 WSRequestFilter

过滤器在请求触发前附加到请求逻辑上。

可自定义实现 WSRequestFilter,也可使用现成的:

ws.url(url)
  ...
  .setRequestFilter(new AhcCurlRequestLogger())
  ...
  .get();

AhcCurlRequestLogger 会以 curl 命令格式输出日志:

[info] p.l.w.a.AhcCurlRequestLogger - curl \
  --verbose \
  --request GET \
  --header 'key: value' \
  'http://localhost:19001'

可通过修改 logback.xml 调整日志级别。

7. 响应缓存

WSClient 支持响应缓存,对重复请求或下游服务不稳定时非常有用。

7.1. 添加缓存依赖

build.sbt 中添加:

libraryDependencies += ehcache

这会集成 Ehcache 作为缓存层。你也可以选用任何符合 JSR-107 规范的实现。

7.2. 启用启发式缓存

⚠️ 默认情况下,如果远程服务未返回缓存头(如 Cache-Control),Play WS 不会缓存响应。

解决方案:在 application.conf 中启用启发式缓存:

play.ws.cache.heuristics.enabled=true

启用后,系统会根据启发式规则(如响应码、内容类型)决定是否缓存,不再依赖服务端的缓存配置。

8. 高级调优

根据下游服务特性,可能需要调整客户端行为。可在 application.conf 中配置:

play.ws.followRedirects=false
play.ws.useragent=MyPlayApplication
play.ws.compressionEnabled=true
# 建立连接超时(秒)
play.ws.timeout.connection=30
# 连接建立后等待数据超时(秒)
play.ws.timeout.idle=30
# 整个请求最大超时(秒)
play.ws.timeout.request=300

也可直接配置底层的 AsyncHttpClient

所有可用配置项详见 AhcConfig 源码。

9. 总结

本文深入探讨了 Play WS 库的核心功能:

  • 项目配置与基础请求发起
  • 同步与异步响应处理
  • 大数据流的流式处理
  • 超时与异常处理
  • 请求过滤与响应缓存
  • 客户端高级调优

掌握这些技巧,能让你的 Play 应用在面对外部依赖时更加健壮和高效。

文中所有示例代码均可在 GitHub 上获取:https://github.com/eugenp/tutorials/tree/master/web-modules/play-modules/async-http


原始标题:Asynchronous HTTP Programming with Play Framework

« 上一篇: Java Weekly, 第322期