1. 概述
当需要让 Web 客户端与服务器保持持续双向通信时,WebSockets 是一个非常合适的解决方案。✅
WebSockets 能建立持久化的全双工连接,允许服务器和客户端之间自由地收发消息。
本文将介绍如何在 Play Framework 中结合 Akka 来实现 WebSockets 通信。我们不仅会使用 Akka Actor 处理消息,还会展示如何用 Akka Streams 构建响应式流式通信。
2. 环境准备
我们将构建一个简单的聊天应用:用户发送消息给服务器,服务器从 JSONPlaceholder 获取随机内容并返回。
2.1. 初始化 Play Framework 项目
使用 Play Framework 创建项目,具体步骤可参考官方文档的 Java 版 Play 入门指南。
2.2. 引入前端依赖
客户端需要使用 JavaScript 接收服务器推送的消息,这里我们引入 jQuery 简化 DOM 操作。
在 app/views/index.scala.html
文件底部添加:
<script src="https://code.jquery.com/jquery-3.4.1.min.js"></script>
2.3. 添加 Akka 依赖
在 build.sbt
中添加以下依赖:
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % akkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % akkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % akkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
libraryDependencies += "com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion
⚠️ 注意:akka-http-jackson
用于 JSON 反序列化,别忘了加上。
现在环境就绪,可以开始实现 WebSocket 了!
3. 使用 Akka Actor 处理 WebSocket
Play 的 WebSocket 机制基于 Akka Streams 构建。WebSocket 被建模为一个 Flow
—— 所有进来的消息进入流,流处理后输出消息给客户端。
要使用 Actor 处理 WebSocket,我们需要 ActorFlow
工具类,它能将 ActorRef
转换为 Flow
。
3.1. WebSocket 控制器方法
首先在控制器中注入 ActorSystem
和 Materializer
:
private ActorSystem actorSystem;
private Materializer materializer;
@Inject
public HomeController(ActorSystem actorSystem, Materializer materializer) {
this.actorSystem = actorSystem;
this.materializer = materializer;
}
定义 WebSocket 接口方法:
public WebSocket socket() {
return WebSocket.Json.acceptOrResult(this::createActorFlow);
}
acceptOrResult
接收请求头并返回一个 CompletionStage
,用于异步决定是否接受连接。
构建连接逻辑:
private CompletionStage<F.Either<Result, Flow<JsonNode, JsonNode, ?>>>
createActorFlow(Http.RequestHeader request) {
return CompletableFuture.completedFuture(
F.Either.Right(createFlowForActor()));
}
✅ F.Either.Right
表示接受连接并返回 Flow。
❌ F.Either.Left
用于拒绝连接(比如未认证)。
示例:基于 Session 认证判断是否放行:
private CompletionStage<F.Either<Result, Flow<JsonNode, JsonNode, ?>>>
createActorFlow2(Http.RequestHeader request) {
return CompletableFuture.completedFuture(
request.session()
.getOptional("username")
.map(username -> F.Either.<Result, Flow<JsonNode, JsonNode, ?>>Right(createFlowForActor()))
.orElseGet(() -> F.Either.Left(forbidden()))
);
}
创建 Actor 流:
private Flow<JsonNode, JsonNode, ?> createFlowForActor() {
return ActorFlow.actorRef(out -> Messenger.props(out),
actorSystem, materializer);
}
✅ ActorFlow.actorRef
会创建一个由 Messenger
Actor 处理的 Flow。
3.2. 配置路由
在 conf/routes
中添加:
GET / controllers.HomeController.index(request: Request)
GET /chat controllers.HomeController.socket
GET /chat/with/streams controllers.HomeController.akkaStreamsSocket
GET /assets/*file controllers.Assets.versioned(path="/public", file: Asset)
3.3. Actor 实现
Actor 的核心是 createReceive
方法,用于定义消息处理逻辑:
@Override
public Receive createReceive() {
return receiveBuilder()
.match(JsonNode.class, this::onSendMessage)
.matchAny(o -> log.error("Received unknown message: {}", o.getClass()))
.build();
}
处理收到的消息:
private void onSendMessage(JsonNode jsonNode) {
RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
String message = requestDTO.getMessage().toLowerCase();
processMessage(requestDTO);
}
发起 HTTP 请求并回复客户端:
private void processMessage(RequestDTO requestDTO) {
CompletionStage<HttpResponse> responseFuture = getRandomMessage();
responseFuture.thenCompose(this::consumeHttpResponse)
.thenAccept(messageDTO ->
out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf()));
}
3.4. 使用 Akka HTTP 调用 REST 接口
调用 JSONPlaceholder 获取随机文章:
private CompletionStage<HttpResponse> getRandomMessage() {
int postId = ThreadLocalRandom.current().nextInt(1, 101);
return Http.get(getContext().getSystem())
.singleRequest(HttpRequest.create(
"https://jsonplaceholder.typicode.com/posts/" + postId));
}
解析响应并反序列化为 DTO:
private CompletionStage<MessageDTO> consumeHttpResponse(HttpResponse httpResponse) {
Materializer materializer = Materializer.matFromSystem(getContext().getSystem());
return Jackson.unmarshaller(MessageDTO.class)
.unmarshal(httpResponse.entity(), materializer)
.thenApply(messageDTO -> {
log.info("Received message: {}", messageDTO);
discardEntity(httpResponse, materializer);
return messageDTO;
});
}
工具类用于 JSON 与 DTO 转换:
public static MessageDTO jsonNodeToMessage(JsonNode jsonNode) {
ObjectMapper mapper = new ObjectMapper();
return mapper.convertValue(jsonNode, MessageDTO.class);
}
⚠️ 别忘了消费并丢弃 HTTP 响应实体,避免资源泄漏:
private void discardEntity(HttpResponse httpResponse, Materializer materializer) {
HttpMessage.DiscardedEntity discarded = httpResponse.discardEntityBytes(materializer);
discarded.completionStage()
.whenComplete((done, ex) -> log.info("Entity discarded completely!"));
}
4. 前端 WebSocket 客户端实现
4.1. 渲染首页
控制器方法返回带 WebSocket URL 的页面:
public Result index(Http.Request request) {
String url = routes.HomeController.socket().webSocketURL(request);
return ok(views.html.index.render(url));
}
4.2. 页面模板
app/views/index.scala.html
内容:
@(url: String)
<div id="messageContent"></div>
<form>
<textarea id="messageInput"></textarea>
<button id="sendButton">Send</button>
</form>
4.3. JavaScript 事件处理
添加 WebSocket 事件监听:
var webSocket;
var messageInput;
function init() {
initWebSocket();
}
function initWebSocket() {
webSocket = new WebSocket("@url");
webSocket.onopen = onOpen;
webSocket.onclose = onClose;
webSocket.onmessage = onMessage;
webSocket.onerror = onError;
}
function onOpen(evt) {
writeToScreen("CONNECTED");
}
function onClose(evt) {
writeToScreen("DISCONNECTED");
}
function onError(evt) {
writeToScreen("ERROR: " + JSON.stringify(evt));
}
function onMessage(evt) {
var receivedData = JSON.parse(evt.data);
appendMessageToView("Server", receivedData.body);
}
function appendMessageToView(title, message) {
$("#messageContent").append("<p>" + title + ": " + message + "</p>");
}
function writeToScreen(message) {
console.log("New message: ", message);
}
4.4. 运行测试
启动应用:
cd websockets
sbt run
访问 http://localhost:9000,输入消息点击发送,服务器会返回 JSONPlaceholder 的随机内容。
5. 使用 Akka Streams 直接处理 WebSocket
如果只是定时推送消息,可以直接使用 Akka Streams,无需 Actor。
示例:每 2 秒向客户端推送一条消息:
public WebSocket akkaStreamsSocket() {
return WebSocket.Json.accept(request -> {
Sink<JsonNode, ?> in = Sink.foreach(System.out::println);
MessageDTO messageDTO = new MessageDTO("1", "1", "Title", "Test Body");
Source<JsonNode, ?> out = Source.tick(
Duration.ofSeconds(2),
Duration.ofSeconds(2),
MessageConverter.messageToJsonNode(messageDTO)
);
return Flow.fromSinkAndSource(in, out);
});
}
修改首页 URL 指向新接口:
String url = routes.HomeController.akkaStreamsSocket().webSocketURL(request);
刷新页面,每 2 秒收到一条消息:
6. Actor 生命周期管理
6.1. 监听 Actor 停止
Play 会在 Actor 终止时自动关闭 WebSocket。我们可以通过重写 postStop
方法监听:
@Override
public void postStop() throws Exception {
log.info("Messenger actor stopped at {}",
OffsetDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
}
6.2. 手动终止 Actor
收到 "stop" 消息时,发送 PoisonPill
结束 Actor:
private void onSendMessage(JsonNode jsonNode) {
RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
String message = requestDTO.getMessage().toLowerCase();
if ("stop".equals(message)) {
MessageDTO messageDTO = createMessageDTO("1", "1", "Stop", "Stopping actor");
out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf());
self().tell(PoisonPill.getInstance(), getSelf());
} else {
log.info("Actor received. {}", requestDTO);
processMessage(requestDTO);
}
}
简单粗暴,但有效。
7. 配置选项
7.1. WebSocket 帧长度
限制帧大小可防范 DoS 攻击:
play.server.websocket.frame.maxLength = 64k
也可通过启动参数设置:
sbt -Dwebsocket.frame.maxLength=64k run
7.2. 连接空闲超时
默认 60 秒无活动连接会被关闭。可调整为无限:
play.server.http.idleTimeout = "infinite"
或通过命令行:
sbt -Dhttp.idleTimeout=infinite run
⚠️ 开发环境可在 build.sbt
中配置(仅开发生效):
PlayKeys.devSettings += "play.server.http.idleTimeout" -> "infinite"
也可设为具体时间:
PlayKeys.devSettings += "play.server.http.idleTimeout" -> "120 s"
更多配置详见 Play 官方文档。
8. 总结
本文展示了如何在 Play Framework 中结合 Akka 实现 WebSocket:
- ✅ 使用
ActorFlow
将 Actor 集成到 WebSocket Flow - ✅ 通过 Akka HTTP 调用外部 REST 接口
- ✅ 前端使用原生 WebSocket API 实现通信
- ✅ 使用 Akka Streams 实现定时消息推送
- ✅ 配置连接超时与帧大小等关键参数
完整源码见 GitHub 仓库。
踩坑提示:记得消费并丢弃 HTTP 响应实体,否则可能引发内存泄漏!