1. 概述

本文将使用 Spring 5 的新 WebSocket API 结合 Spring WebFlux 的响应式特性,快速构建一个实战示例。

WebSocket 是一种广为人知的协议,支持客户端与服务器之间的全双工通信,通常用于需要高频次、低延迟事件交换的 Web 应用场景。

Spring Framework 5 对框架中的 WebSocket 支持进行了现代化改造,为这个通信通道增加了响应式能力。

关于 Spring WebFlux 的更多细节可参考官方文档

2. Maven 依赖

我们将使用 spring-boot-starter-integrationspring-boot-starter-webflux 的 starter 依赖,这些依赖当前可在 Spring 里程碑仓库 获取。

本示例使用最新可用版本 2.0.0.M7,实际使用时应始终获取 Maven 仓库中的最新版本:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

3. Spring WebSocket 配置

配置相当简单:我们将注入 WebSocketHandler 来处理 Spring WebSocket 应用中的 socket 会话。

@Autowired
private WebSocketHandler webSocketHandler;

接着创建一个 HandlerMapping bean 方法,负责处理请求与处理器对象之间的映射:

@Bean
public HandlerMapping webSocketHandlerMapping() {
    Map<String, WebSocketHandler> map = new HashMap<>();
    map.put("/event-emitter", webSocketHandler);

    SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
    handlerMapping.setOrder(1);
    handlerMapping.setUrlMap(map);
    return handlerMapping;
}

可连接的 URL 为:ws://localhost:<port>/event-emitter

4. Spring WebSocket 消息处理

ReactiveWebSocketHandler 类负责管理服务器端的 WebSocket 会话。

它实现了 WebSocketHandler 接口,因此需要重写 handle 方法,该方法用于向 WebSocket 客户端发送消息:

@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {
    
    // private fields ...

    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        return webSocketSession.send(intervalFlux
          .map(webSocketSession::textMessage))
          .and(webSocketSession.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .log());
    }
}

5. 创建简单的响应式 WebSocket 客户端

现在创建一个 Spring 响应式 WebSocket 客户端,用于连接并与 WebSocket 服务器交换信息。

5.1 Maven 依赖

首先添加 Maven 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

这里使用与之前设置响应式 WebSocket 服务器相同的 spring-boot-starter-webflux

5.2 WebSocket 客户端

创建 ReactiveClientWebSocket 类,负责启动与服务器的通信:

public class ReactiveJavaClientWebSocket {
 
    public static void main(String[] args) throws InterruptedException {
 
        WebSocketClient client = new ReactorNettyWebSocketClient();
        client.execute(
          URI.create("ws://localhost:8080/event-emitter"), 
          session -> session.send(
            Mono.just(session.textMessage("event-spring-reactive-client-websocket")))
            .thenMany(session.receive()
              .map(WebSocketMessage::getPayloadAsText)
              .log())
            .then())
            .block(Duration.ofSeconds(10L));
    }
}

代码说明:

  • 使用 ReactorNettyWebSocketClient(基于 Reactor Netty 的 WebSocketClient 实现)
  • 通过 URL ws://localhost:8080/event-emitter 连接服务器
  • 连接建立后立即发送消息:"event-spring-reactive-client-websocket"
  • send 方法期望 Publisher<T> 类型参数(此处为 Mono<T>
  • thenMany(...) 处理接收的消息流(Flux<String>
  • block() 强制客户端在指定时间(10秒)后断开连接

5.3 启动客户端

运行前确保响应式 WebSocket 服务器已启动。然后执行 ReactiveJavaClientWebSocket,可在控制台日志看到发出的事件:

[reactor-http-nio-4] INFO reactor.Flux.Map.1 - 
onNext({"eventId":"6042b94f-fd02-47a1-911d-dacf97f12ba6",
"eventDt":"2018-01-11T23:29:26.900"})

服务器日志中会显示客户端在连接时发送的消息:

[reactor-http-nio-2] reactor.Flux.Map.1: 
onNext(event-me-from-reactive-java-client)

客户端完成请求后(10秒后)会出现连接终止消息:

[reactor-http-nio-2] reactor.Flux.Map.1: onComplete()

6. 创建浏览器 WebSocket 客户端

创建一个简单的 HTML/JavaScript WebSocket 客户端来消费响应式 WebSocket 服务器:

<div class="events"></div>
<script>
    var clientWebSocket = new WebSocket("ws://localhost:8080/event-emitter");
    clientWebSocket.onopen = function() {
        console.log("clientWebSocket.onopen", clientWebSocket);
        console.log("clientWebSocket.readyState", "websocketstatus");
        clientWebSocket.send("event-me-from-browser");
    }
    clientWebSocket.onclose = function(error) {
        console.log("clientWebSocket.onclose", clientWebSocket, error);
        events("Closing connection");
    }
    clientWebSocket.onerror = function(error) {
        console.log("clientWebSocket.onerror", clientWebSocket, error);
        events("An error occured");
    }
    clientWebSocket.onmessage = function(error) {
        console.log("clientWebSocket.onmessage", clientWebSocket, error);
        events(error.data);
    }
    function events(responseEvent) {
        document.querySelector(".events").innerHTML += responseEvent + "<br>";
    }
</script>

启动 WebSocket 服务器后,在浏览器(Chrome/IE/Firefox 等)中打开此 HTML 文件,将看到事件按服务器定义的 1 秒间隔打印在屏幕上:

{"eventId":"c25975de-6775-4b0b-b974-b396847878e6","eventDt":"2018-01-11T23:56:09.780"}
{"eventId":"ac74170b-1f71-49d3-8737-b3f9a8a352f9","eventDt":"2018-01-11T23:56:09.781"}
{"eventId":"40d8f305-f252-4c14-86d7-ed134d3e10c6","eventDt":"2018-01-11T23:56:09.782"}

7. 总结

本文展示了使用 Spring 5 框架并借助 Spring WebFlux 响应式特性,在服务器与客户端之间创建 WebSocket 通信的实战案例。

完整示例代码可在我们的 GitHub 仓库 中获取。


原始标题:Reactive WebSockets with Spring | Baeldung