1. 概述
本文将使用 Spring 5 的新 WebSocket API 结合 Spring WebFlux 的响应式特性,快速构建一个实战示例。
WebSocket 是一种广为人知的协议,支持客户端与服务器之间的全双工通信,通常用于需要高频次、低延迟事件交换的 Web 应用场景。
Spring Framework 5 对框架中的 WebSocket 支持进行了现代化改造,为这个通信通道增加了响应式能力。
关于 Spring WebFlux 的更多细节可参考官方文档。
2. Maven 依赖
我们将使用 spring-boot-starter-integration 和 spring-boot-starter-webflux 的 starter 依赖,这些依赖当前可在 Spring 里程碑仓库 获取。
本示例使用最新可用版本 2.0.0.M7,实际使用时应始终获取 Maven 仓库中的最新版本:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
3. Spring WebSocket 配置
配置相当简单:我们将注入 WebSocketHandler
来处理 Spring WebSocket 应用中的 socket 会话。
@Autowired
private WebSocketHandler webSocketHandler;
接着创建一个 HandlerMapping
bean 方法,负责处理请求与处理器对象之间的映射:
@Bean
public HandlerMapping webSocketHandlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/event-emitter", webSocketHandler);
SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
handlerMapping.setOrder(1);
handlerMapping.setUrlMap(map);
return handlerMapping;
}
可连接的 URL 为:ws://localhost:<port>/event-emitter
4. Spring WebSocket 消息处理
ReactiveWebSocketHandler
类负责管理服务器端的 WebSocket 会话。
它实现了 WebSocketHandler
接口,因此需要重写 handle
方法,该方法用于向 WebSocket 客户端发送消息:
@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {
// private fields ...
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
return webSocketSession.send(intervalFlux
.map(webSocketSession::textMessage))
.and(webSocketSession.receive()
.map(WebSocketMessage::getPayloadAsText)
.log());
}
}
5. 创建简单的响应式 WebSocket 客户端
现在创建一个 Spring 响应式 WebSocket 客户端,用于连接并与 WebSocket 服务器交换信息。
5.1 Maven 依赖
首先添加 Maven 依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
这里使用与之前设置响应式 WebSocket 服务器相同的 spring-boot-starter-webflux
。
5.2 WebSocket 客户端
创建 ReactiveClientWebSocket
类,负责启动与服务器的通信:
public class ReactiveJavaClientWebSocket {
public static void main(String[] args) throws InterruptedException {
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(
URI.create("ws://localhost:8080/event-emitter"),
session -> session.send(
Mono.just(session.textMessage("event-spring-reactive-client-websocket")))
.thenMany(session.receive()
.map(WebSocketMessage::getPayloadAsText)
.log())
.then())
.block(Duration.ofSeconds(10L));
}
}
代码说明:
- 使用
ReactorNettyWebSocketClient
(基于 Reactor Netty 的WebSocketClient
实现) - 通过 URL
ws://localhost:8080/event-emitter
连接服务器 - 连接建立后立即发送消息:
"event-spring-reactive-client-websocket"
send
方法期望Publisher<T>
类型参数(此处为Mono<T>
)thenMany(...)
处理接收的消息流(Flux<String>
)block()
强制客户端在指定时间(10秒)后断开连接
5.3 启动客户端
运行前确保响应式 WebSocket 服务器已启动。然后执行 ReactiveJavaClientWebSocket
,可在控制台日志看到发出的事件:
[reactor-http-nio-4] INFO reactor.Flux.Map.1 -
onNext({"eventId":"6042b94f-fd02-47a1-911d-dacf97f12ba6",
"eventDt":"2018-01-11T23:29:26.900"})
服务器日志中会显示客户端在连接时发送的消息:
[reactor-http-nio-2] reactor.Flux.Map.1:
onNext(event-me-from-reactive-java-client)
客户端完成请求后(10秒后)会出现连接终止消息:
[reactor-http-nio-2] reactor.Flux.Map.1: onComplete()
6. 创建浏览器 WebSocket 客户端
创建一个简单的 HTML/JavaScript WebSocket 客户端来消费响应式 WebSocket 服务器:
<div class="events"></div>
<script>
var clientWebSocket = new WebSocket("ws://localhost:8080/event-emitter");
clientWebSocket.onopen = function() {
console.log("clientWebSocket.onopen", clientWebSocket);
console.log("clientWebSocket.readyState", "websocketstatus");
clientWebSocket.send("event-me-from-browser");
}
clientWebSocket.onclose = function(error) {
console.log("clientWebSocket.onclose", clientWebSocket, error);
events("Closing connection");
}
clientWebSocket.onerror = function(error) {
console.log("clientWebSocket.onerror", clientWebSocket, error);
events("An error occured");
}
clientWebSocket.onmessage = function(error) {
console.log("clientWebSocket.onmessage", clientWebSocket, error);
events(error.data);
}
function events(responseEvent) {
document.querySelector(".events").innerHTML += responseEvent + "<br>";
}
</script>
启动 WebSocket 服务器后,在浏览器(Chrome/IE/Firefox 等)中打开此 HTML 文件,将看到事件按服务器定义的 1 秒间隔打印在屏幕上:
{"eventId":"c25975de-6775-4b0b-b974-b396847878e6","eventDt":"2018-01-11T23:56:09.780"}
{"eventId":"ac74170b-1f71-49d3-8737-b3f9a8a352f9","eventDt":"2018-01-11T23:56:09.781"}
{"eventId":"40d8f305-f252-4c14-86d7-ed134d3e10c6","eventDt":"2018-01-11T23:56:09.782"}
7. 总结
本文展示了使用 Spring 5 框架并借助 Spring WebFlux 响应式特性,在服务器与客户端之间创建 WebSocket 通信的实战案例。
完整示例代码可在我们的 GitHub 仓库 中获取。