1. 概述

本文将带你了解如何在 Spring 中实现基于 Server-Sent Events(SSE)的接口。

简单来说,Server-Sent Events 是一种 HTTP 标准,允许 Web 应用处理单向事件流,当服务器有新数据时,客户端可以自动接收到更新。

Spring 从 4.2 版本开始支持 SSE,但从 Spring 5 开始,我们有了更优雅、更符合响应式编程风格的方式来处理 SSE。

2. 使用 Spring 6 WebFlux 实现 SSE

在 Spring WebFlux 中实现 SSE,我们可以使用 Reactor 库提供的 Flux 类,或者使用 ServerSentEvent 实体,后者可以更好地控制事件的元数据。

2.1. 使用 Flux 流式发送事件

Flux 是一个响应式事件流的表示,其行为会根据请求或响应的媒体类型自动调整。

要创建一个 SSE 接口,我们需要遵循 W3C 的规范,并指定响应类型为 text/event-stream

@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamFlux() {
    return Flux.interval(Duration.ofSeconds(1))
      .map(sequence -> "Flux - " + LocalTime.now().toString());
}

interval 方法会创建一个每隔一秒递增的 Flux<Long>,我们再将这些值映射为我们需要的输出内容。

启动项目后访问该接口,就可以看到浏览器每秒自动接收到一次事件推送。

如需深入了解 FluxReactor Core,可以参考 这篇介绍

2.2. 使用 ServerSentEvent 控制事件元数据

接下来,我们将字符串数据包装为 ServerSentEvent 对象,看看它带来的好处:

@GetMapping("/stream-sse")
public Flux<ServerSentEvent<String>> streamEvents() {
    return Flux.interval(Duration.ofSeconds(1))
      .map(sequence -> ServerSentEvent.<String> builder()
        .id(String.valueOf(sequence))
        .event("periodic-event")
        .data("SSE - " + LocalTime.now().toString())
        .build());
}

使用 ServerSentEvent 有几个明显优势:

✅ 可以控制事件的元数据(如 ideventdata 等),这在实际项目中是必需的
✅ 不再需要手动设置 text/event-stream 媒体类型

在这个例子中,我们指定了事件的 idevent 名称和 data 内容。此外,还可以添加 commentretry 字段,控制客户端的重连时间。

2.3. 使用 WebClient 消费 SSE

现在我们用 WebClient 来消费这个事件流:

public void consumeServerSentEvent() {
    WebClient client = WebClient.create("http://localhost:8080/sse-server");
    ParameterizedTypeReference<ServerSentEvent<String>> type
     = new ParameterizedTypeReference<ServerSentEvent<String>>() {};

    Flux<ServerSentEvent<String>> eventStream = client.get()
      .uri("/stream-sse")
      .retrieve()
      .bodyToFlux(type);

    eventStream.subscribe(
      content -> logger.info("Time: {} - event: name[{}], id [{}], content[{}] ",
        LocalTime.now(), content.event(), content.id(), content.data()),
      error -> logger.error("Error receiving SSE: {}", error),
      () -> logger.info("Completed!!!"));
}

subscribe 方法允许我们分别处理三种情况:成功接收事件、发生错误、流完成。

在这个例子中,我们使用了 retrieve() 方法,这是获取响应体最简单的方式。

⚠️ 如果服务器返回 4xx 或 5xx 状态码,retrieve() 会抛出 WebClientResponseException,除非你通过 onStatus() 显式处理。

当然,你也可以使用 exchange() 方法,它提供对 ClientResponse 的完全控制,并且不会自动抛出异常。

💡 如果你不需要元数据,也可以跳过 ServerSentEvent 包装,直接消费原始数据。

3. 在 Spring MVC 中实现 SSE

如前所述,SSE 从 Spring 4.2 就开始支持,当时引入了 SseEmitter 类。

简单来说,我们需要定义一个 ExecutorService,在其中运行 SseEmitter 实例,持续推送数据,并保持连接开放:

@GetMapping("/stream-sse-mvc")
public SseEmitter streamSseMvc() {
    SseEmitter emitter = new SseEmitter();
    ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
    sseMvcExecutor.execute(() -> {
        try {
            for (int i = 0; true; i++) {
                SseEventBuilder event = SseEmitter.event()
                  .data("SSE MVC - " + LocalTime.now().toString())
                  .id(String.valueOf(i))
                  .name("sse event - mvc");
                emitter.send(event);
                Thread.sleep(1000);
            }
        } catch (Exception ex) {
            emitter.completeWithError(ex);
        }
    });
    return emitter;
}

⚠️ 务必根据实际场景选择合适的 ExecutorService,避免线程资源浪费或阻塞。

如需更深入理解 Spring MVC 中的 SSE 实现,可以参考 这篇教程

4. 深入理解 Server-Sent Events

在掌握了如何实现 SSE 接口之后,我们来深入理解一些核心概念。

SSE 是一种浏览器广泛支持的规范,用于实现服务器向客户端单向推送事件。

事件内容本质上就是一串 UTF-8 编码的文本,格式由规范定义:

  • 每个事件由一系列键值对组成,包括:idretrydataevent(事件名称)
  • 支持注释(以 : 开头)
  • 数据格式不限,可以是普通字符串,也可以是 JSON 或 XML

✅ SSE 是单向通信,而 WebSockets 是双向通信
✅ WebSockets 不是基于 HTTP 的协议,也没有标准的错误处理机制,而 SSE 则基于 HTTP,兼容性更好

5. 总结

总结一下,SSE 是一种非常实用的事件推送技术,尤其适合需要服务器主动推送数据的场景。

通过本文的介绍和示例,你应该已经掌握了:

  • 如何在 Spring WebFlux 和 Spring MVC 中实现 SSE 接口
  • 如何使用 FluxServerSentEvent 控制事件流
  • 如何使用 WebClient 消费 SSE 接口
  • SSE 与 WebSocket 的区别

所有示例代码都可以在 GitHub 仓库 找到。


原始标题:Server-Sent Events in Spring | Baeldung