1. 概述
本文将带你了解如何在 Spring 中实现基于 Server-Sent Events(SSE)的接口。
简单来说,Server-Sent Events 是一种 HTTP 标准,允许 Web 应用处理单向事件流,当服务器有新数据时,客户端可以自动接收到更新。
Spring 从 4.2 版本开始支持 SSE,但从 Spring 5 开始,我们有了更优雅、更符合响应式编程风格的方式来处理 SSE。
2. 使用 Spring 6 WebFlux 实现 SSE
在 Spring WebFlux 中实现 SSE,我们可以使用 Reactor 库提供的 Flux
类,或者使用 ServerSentEvent
实体,后者可以更好地控制事件的元数据。
2.1. 使用 Flux
流式发送事件
Flux
是一个响应式事件流的表示,其行为会根据请求或响应的媒体类型自动调整。
要创建一个 SSE 接口,我们需要遵循 W3C 的规范,并指定响应类型为 text/event-stream
:
@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamFlux() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Flux - " + LocalTime.now().toString());
}
interval
方法会创建一个每隔一秒递增的 Flux<Long>
,我们再将这些值映射为我们需要的输出内容。
启动项目后访问该接口,就可以看到浏览器每秒自动接收到一次事件推送。
如需深入了解 Flux
和 Reactor Core
,可以参考 这篇介绍。
2.2. 使用 ServerSentEvent
控制事件元数据
接下来,我们将字符串数据包装为 ServerSentEvent
对象,看看它带来的好处:
@GetMapping("/stream-sse")
public Flux<ServerSentEvent<String>> streamEvents() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.<String> builder()
.id(String.valueOf(sequence))
.event("periodic-event")
.data("SSE - " + LocalTime.now().toString())
.build());
}
使用 ServerSentEvent
有几个明显优势:
✅ 可以控制事件的元数据(如 id
、event
、data
等),这在实际项目中是必需的
✅ 不再需要手动设置 text/event-stream
媒体类型
在这个例子中,我们指定了事件的 id
、event
名称和 data
内容。此外,还可以添加 comment
和 retry
字段,控制客户端的重连时间。
2.3. 使用 WebClient 消费 SSE
现在我们用 WebClient
来消费这个事件流:
public void consumeServerSentEvent() {
WebClient client = WebClient.create("http://localhost:8080/sse-server");
ParameterizedTypeReference<ServerSentEvent<String>> type
= new ParameterizedTypeReference<ServerSentEvent<String>>() {};
Flux<ServerSentEvent<String>> eventStream = client.get()
.uri("/stream-sse")
.retrieve()
.bodyToFlux(type);
eventStream.subscribe(
content -> logger.info("Time: {} - event: name[{}], id [{}], content[{}] ",
LocalTime.now(), content.event(), content.id(), content.data()),
error -> logger.error("Error receiving SSE: {}", error),
() -> logger.info("Completed!!!"));
}
subscribe
方法允许我们分别处理三种情况:成功接收事件、发生错误、流完成。
在这个例子中,我们使用了 retrieve()
方法,这是获取响应体最简单的方式。
⚠️ 如果服务器返回 4xx 或 5xx 状态码,retrieve()
会抛出 WebClientResponseException
,除非你通过 onStatus()
显式处理。
当然,你也可以使用 exchange()
方法,它提供对 ClientResponse
的完全控制,并且不会自动抛出异常。
💡 如果你不需要元数据,也可以跳过 ServerSentEvent
包装,直接消费原始数据。
3. 在 Spring MVC 中实现 SSE
如前所述,SSE 从 Spring 4.2 就开始支持,当时引入了 SseEmitter
类。
简单来说,我们需要定义一个 ExecutorService
,在其中运行 SseEmitter
实例,持续推送数据,并保持连接开放:
@GetMapping("/stream-sse-mvc")
public SseEmitter streamSseMvc() {
SseEmitter emitter = new SseEmitter();
ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
sseMvcExecutor.execute(() -> {
try {
for (int i = 0; true; i++) {
SseEventBuilder event = SseEmitter.event()
.data("SSE MVC - " + LocalTime.now().toString())
.id(String.valueOf(i))
.name("sse event - mvc");
emitter.send(event);
Thread.sleep(1000);
}
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return emitter;
}
⚠️ 务必根据实际场景选择合适的 ExecutorService
,避免线程资源浪费或阻塞。
如需更深入理解 Spring MVC 中的 SSE 实现,可以参考 这篇教程。
4. 深入理解 Server-Sent Events
在掌握了如何实现 SSE 接口之后,我们来深入理解一些核心概念。
SSE 是一种浏览器广泛支持的规范,用于实现服务器向客户端单向推送事件。
事件内容本质上就是一串 UTF-8 编码的文本,格式由规范定义:
- 每个事件由一系列键值对组成,包括:
id
、retry
、data
、event
(事件名称) - 支持注释(以
:
开头) - 数据格式不限,可以是普通字符串,也可以是 JSON 或 XML
✅ SSE 是单向通信,而 WebSockets 是双向通信
✅ WebSockets 不是基于 HTTP 的协议,也没有标准的错误处理机制,而 SSE 则基于 HTTP,兼容性更好
5. 总结
总结一下,SSE 是一种非常实用的事件推送技术,尤其适合需要服务器主动推送数据的场景。
通过本文的介绍和示例,你应该已经掌握了:
- 如何在 Spring WebFlux 和 Spring MVC 中实现 SSE 接口
- 如何使用
Flux
和ServerSentEvent
控制事件流 - 如何使用
WebClient
消费 SSE 接口 - SSE 与 WebSocket 的区别
所有示例代码都可以在 GitHub 仓库 找到。