1. 概述

当需要让 Web 客户端与服务器保持持续双向通信时,WebSockets 是一个非常合适的解决方案。✅
WebSockets 能建立持久化的全双工连接,允许服务器和客户端之间自由地收发消息。

本文将介绍如何在 Play Framework 中结合 Akka 来实现 WebSockets 通信。我们不仅会使用 Akka Actor 处理消息,还会展示如何用 Akka Streams 构建响应式流式通信。

2. 环境准备

我们将构建一个简单的聊天应用:用户发送消息给服务器,服务器从 JSONPlaceholder 获取随机内容并返回。

2.1. 初始化 Play Framework 项目

使用 Play Framework 创建项目,具体步骤可参考官方文档的 Java 版 Play 入门指南

2.2. 引入前端依赖

客户端需要使用 JavaScript 接收服务器推送的消息,这里我们引入 jQuery 简化 DOM 操作。

app/views/index.scala.html 文件底部添加:

<script src="https://code.jquery.com/jquery-3.4.1.min.js"></script>

2.3. 添加 Akka 依赖

build.sbt 中添加以下依赖:

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % akkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % akkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % akkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
libraryDependencies += "com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion

⚠️ 注意:akka-http-jackson 用于 JSON 反序列化,别忘了加上。

现在环境就绪,可以开始实现 WebSocket 了!

3. 使用 Akka Actor 处理 WebSocket

Play 的 WebSocket 机制基于 Akka Streams 构建。WebSocket 被建模为一个 Flow —— 所有进来的消息进入流,流处理后输出消息给客户端。

要使用 Actor 处理 WebSocket,我们需要 ActorFlow 工具类,它能将 ActorRef 转换为 Flow

3.1. WebSocket 控制器方法

首先在控制器中注入 ActorSystemMaterializer

private ActorSystem actorSystem;
private Materializer materializer;

@Inject
public HomeController(ActorSystem actorSystem, Materializer materializer) {
    this.actorSystem = actorSystem;
    this.materializer = materializer;
}

定义 WebSocket 接口方法:

public WebSocket socket() {
    return WebSocket.Json.acceptOrResult(this::createActorFlow);
}

acceptOrResult 接收请求头并返回一个 CompletionStage,用于异步决定是否接受连接。

构建连接逻辑:

private CompletionStage<F.Either<Result, Flow<JsonNode, JsonNode, ?>>> 
  createActorFlow(Http.RequestHeader request) {
    return CompletableFuture.completedFuture(
      F.Either.Right(createFlowForActor()));
}

F.Either.Right 表示接受连接并返回 Flow。
F.Either.Left 用于拒绝连接(比如未认证)。

示例:基于 Session 认证判断是否放行:

private CompletionStage<F.Either<Result, Flow<JsonNode, JsonNode, ?>>> 
  createActorFlow2(Http.RequestHeader request) {
    return CompletableFuture.completedFuture(
      request.session()
        .getOptional("username")
        .map(username -> F.Either.<Result, Flow<JsonNode, JsonNode, ?>>Right(createFlowForActor()))
        .orElseGet(() -> F.Either.Left(forbidden()))
    );
}

创建 Actor 流:

private Flow<JsonNode, JsonNode, ?> createFlowForActor() {
    return ActorFlow.actorRef(out -> Messenger.props(out), 
      actorSystem, materializer);
}

ActorFlow.actorRef 会创建一个由 Messenger Actor 处理的 Flow。

3.2. 配置路由

conf/routes 中添加:

GET  /                    controllers.HomeController.index(request: Request)
GET  /chat                controllers.HomeController.socket
GET  /chat/with/streams   controllers.HomeController.akkaStreamsSocket
GET  /assets/*file        controllers.Assets.versioned(path="/public", file: Asset)

3.3. Actor 实现

Actor 的核心是 createReceive 方法,用于定义消息处理逻辑:

@Override
public Receive createReceive() {
    return receiveBuilder()
      .match(JsonNode.class, this::onSendMessage)
      .matchAny(o -> log.error("Received unknown message: {}", o.getClass()))
      .build();
}

处理收到的消息:

private void onSendMessage(JsonNode jsonNode) {
    RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
    String message = requestDTO.getMessage().toLowerCase();
    processMessage(requestDTO);
}

发起 HTTP 请求并回复客户端:

private void processMessage(RequestDTO requestDTO) {
    CompletionStage<HttpResponse> responseFuture = getRandomMessage();
    responseFuture.thenCompose(this::consumeHttpResponse)
      .thenAccept(messageDTO ->
        out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf()));
}

3.4. 使用 Akka HTTP 调用 REST 接口

调用 JSONPlaceholder 获取随机文章:

private CompletionStage<HttpResponse> getRandomMessage() {
    int postId = ThreadLocalRandom.current().nextInt(1, 101);
    return Http.get(getContext().getSystem())
      .singleRequest(HttpRequest.create(
        "https://jsonplaceholder.typicode.com/posts/" + postId));
}

解析响应并反序列化为 DTO:

private CompletionStage<MessageDTO> consumeHttpResponse(HttpResponse httpResponse) {
    Materializer materializer = Materializer.matFromSystem(getContext().getSystem());
    return Jackson.unmarshaller(MessageDTO.class)
      .unmarshal(httpResponse.entity(), materializer)
      .thenApply(messageDTO -> {
          log.info("Received message: {}", messageDTO);
          discardEntity(httpResponse, materializer);
          return messageDTO;
      });
}

工具类用于 JSON 与 DTO 转换:

public static MessageDTO jsonNodeToMessage(JsonNode jsonNode) {
    ObjectMapper mapper = new ObjectMapper();
    return mapper.convertValue(jsonNode, MessageDTO.class);
}

⚠️ 别忘了消费并丢弃 HTTP 响应实体,避免资源泄漏:

private void discardEntity(HttpResponse httpResponse, Materializer materializer) {
    HttpMessage.DiscardedEntity discarded = httpResponse.discardEntityBytes(materializer);
    discarded.completionStage()
      .whenComplete((done, ex) -> log.info("Entity discarded completely!"));
}

4. 前端 WebSocket 客户端实现

4.1. 渲染首页

控制器方法返回带 WebSocket URL 的页面:

public Result index(Http.Request request) {
    String url = routes.HomeController.socket().webSocketURL(request);
    return ok(views.html.index.render(url));
}

4.2. 页面模板

app/views/index.scala.html 内容:

@(url: String)

<div id="messageContent"></div>
<form>
    <textarea id="messageInput"></textarea>
    <button id="sendButton">Send</button>
</form>

4.3. JavaScript 事件处理

添加 WebSocket 事件监听:

var webSocket;
var messageInput;

function init() {
    initWebSocket();
}

function initWebSocket() {
    webSocket = new WebSocket("@url");
    webSocket.onopen = onOpen;
    webSocket.onclose = onClose;
    webSocket.onmessage = onMessage;
    webSocket.onerror = onError;
}

function onOpen(evt) {
    writeToScreen("CONNECTED");
}

function onClose(evt) {
    writeToScreen("DISCONNECTED");
}

function onError(evt) {
    writeToScreen("ERROR: " + JSON.stringify(evt));
}

function onMessage(evt) {
    var receivedData = JSON.parse(evt.data);
    appendMessageToView("Server", receivedData.body);
}

function appendMessageToView(title, message) {
    $("#messageContent").append("<p>" + title + ": " + message + "</p>");
}

function writeToScreen(message) {
    console.log("New message: ", message);
}

4.4. 运行测试

启动应用:

cd websockets
sbt run

访问 http://localhost:9000,输入消息点击发送,服务器会返回 JSONPlaceholder 的随机内容。

websocket interactive chat

5. 使用 Akka Streams 直接处理 WebSocket

如果只是定时推送消息,可以直接使用 Akka Streams,无需 Actor。

示例:每 2 秒向客户端推送一条消息:

public WebSocket akkaStreamsSocket() {
    return WebSocket.Json.accept(request -> {
        Sink<JsonNode, ?> in = Sink.foreach(System.out::println);
        MessageDTO messageDTO = new MessageDTO("1", "1", "Title", "Test Body");
        Source<JsonNode, ?> out = Source.tick(
          Duration.ofSeconds(2),
          Duration.ofSeconds(2),
          MessageConverter.messageToJsonNode(messageDTO)
        );
        return Flow.fromSinkAndSource(in, out);
    });
}

修改首页 URL 指向新接口:

String url = routes.HomeController.akkaStreamsSocket().webSocketURL(request);

刷新页面,每 2 秒收到一条消息:

websocket timed chat

6. Actor 生命周期管理

6.1. 监听 Actor 停止

Play 会在 Actor 终止时自动关闭 WebSocket。我们可以通过重写 postStop 方法监听:

@Override
public void postStop() throws Exception {
    log.info("Messenger actor stopped at {}",
      OffsetDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
}

6.2. 手动终止 Actor

收到 "stop" 消息时,发送 PoisonPill 结束 Actor:

private void onSendMessage(JsonNode jsonNode) {
    RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
    String message = requestDTO.getMessage().toLowerCase();
    if ("stop".equals(message)) {
        MessageDTO messageDTO = createMessageDTO("1", "1", "Stop", "Stopping actor");
        out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf());
        self().tell(PoisonPill.getInstance(), getSelf());
    } else {
        log.info("Actor received. {}", requestDTO);
        processMessage(requestDTO);
    }
}

简单粗暴,但有效。

7. 配置选项

7.1. WebSocket 帧长度

限制帧大小可防范 DoS 攻击:

play.server.websocket.frame.maxLength = 64k

也可通过启动参数设置:

sbt -Dwebsocket.frame.maxLength=64k run

7.2. 连接空闲超时

默认 60 秒无活动连接会被关闭。可调整为无限:

play.server.http.idleTimeout = "infinite"

或通过命令行:

sbt -Dhttp.idleTimeout=infinite run

⚠️ 开发环境可在 build.sbt 中配置(仅开发生效):

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "infinite"

也可设为具体时间:

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "120 s"

更多配置详见 Play 官方文档

8. 总结

本文展示了如何在 Play Framework 中结合 Akka 实现 WebSocket:

  • ✅ 使用 ActorFlow 将 Actor 集成到 WebSocket Flow
  • ✅ 通过 Akka HTTP 调用外部 REST 接口
  • ✅ 前端使用原生 WebSocket API 实现通信
  • ✅ 使用 Akka Streams 实现定时消息推送
  • ✅ 配置连接超时与帧大小等关键参数

完整源码见 GitHub 仓库
踩坑提示:记得消费并丢弃 HTTP 响应实体,否则可能引发内存泄漏!


原始标题:WebSockets with the Play Framework and Akka