1. 概述

Server-Sent Events(SSE)是一种基于 HTTP 的协议规范,用于建立从服务器到客户端的长连接,且是单向通信(服务器 → 客户端)

客户端通过在 Accept 请求头中指定媒体类型 text/event-stream 来发起 SSE 连接。一旦连接建立,服务器就可以持续推送数据,而无需客户端反复轮询。

关于该协议的详细定义,可以参考官方文档:Server-Sent Events Specification

本文将重点介绍 JAX-RS 2.1 对 SSE 的原生支持。你会学到:

  • 如何使用 JAX-RS 服务端 API 发送事件
  • 如何通过 JAX-RS 客户端 API 或普通 HTTP 工具(如 curl)消费这些事件

✅ 适用场景:实时日志推送、股票行情更新、通知系统等需要服务端主动推送给浏览器的场景。
❌ 不适合:需要客户端频繁发消息给服务端的双向通信(这种情况建议用 WebSocket)。


2. 理解 SSE 事件格式

SSE 事件本质上是一段文本,由若干字段组成,每条消息以两个换行符 \n\n 分隔。常见字段包括:

字段 说明
event: 事件类型。客户端可监听特定类型,实现多路复用
data: 实际传输的数据。支持多行拼接
id: 事件唯一标识。断线重连时会通过 Last-Event-ID 请求头带回,避免重复推送
retry: 重连间隔(毫秒)。客户端断开后等待多久尝试重连
:(冒号开头) 注释行,客户端忽略

示例:多行数据与多种事件类型

event: stock
id: 1
: price change
retry: 4000
data: {"dateTime":"2018-07-14T18:06:00.285","id":1,
data: "name":"GOOG","price":75.7119}

event: stock
id: 2
: price change
retry: 4000
data: {"dateTime":"2018-07-14T18:06:00.285","id":2,"name":"IBM","price":83.4611}

⚠️ 注意:data 可跨多行,最终拼接为完整 JSON;idretry 是可选但实用的字段。

在 JAX-RS 中,SSE 事件被抽象为两个核心接口:

  • OutboundSseEvent:服务端发送的事件
  • InboundSseEvent:客户端接收的事件

两者分别对应发送和接收上下文,职责清晰。


3. 发送 SSE 事件

3.1. 项目准备

假设你已有一个基于 Maven 的 JAX-RS 项目(如使用 Jersey、CXF 或 RESTEasy)。关键依赖需包含 JAX-RS 2.1+ 支持 SSE 的实现库。

例如使用 Apache CXF:

<dependency>
    <groupId>org.apache.cxf</groupId>
    <artifactId>cxf-rt-rs-sse</artifactId>
    <version>4.0.0</version>
</dependency>

💡 提示:Jersey 用户需引入 org.glassfish.jersey.media:jersey-media-sse


3.2. 定义 SSE 接口

一个典型的 SSE 资源方法必须满足以下条件:

  • 使用 @Produces("text/event-stream")
  • 参数中注入 SseEventSink 用于发送事件
  • 可选注入 Sse 实例来构建事件
@GET
@Path("prices")
@Produces("text/event-stream")
public void getStockPrices(@Context SseEventSink sseEventSink, @Context Sse sse) {
    // 发送逻辑
}

客户端请求时需带上:

GET /prices HTTP/1.1
Accept: text/event-stream

否则服务器不会按 SSE 格式响应。


3.3. 构建事件:Sse 与事件构造器

Sse 是一个上下文对象,由 JAX-RS 容器注入,提供两个核心能力:

  1. 创建事件构造器:newEventBuilder()
  2. 创建广播器:newBroadcaster()

初始化 Sse 实例

@Context
private void setSse(Sse sse) {
    this.sse = sse;
    this.eventBuilder = sse.newEventBuilder();
    this.sseBroadcaster = sse.newBroadcaster();
}

使用 OutboundSseEvent.Builder 构造事件

OutboundSseEvent sseEvent = this.eventBuilder
    .name("stock")                        // event: stock
    .id(String.valueOf(lastEventId))      // id: 1
    .mediaType(MediaType.APPLICATION_JSON_TYPE)
    .data(Stock.class, stock)             // data: {...}
    .reconnectDelay(4000)                 // retry: 4000
    .comment("price change")              // : price change
    .build();

📌 关键点:

  • mediaType() 决定序列化方式,默认是 text/plain,处理 POJO 时建议设为 application/json
  • JAX-RS 会自动查找合适的 MessageBodyWriter 进行序列化
  • 提供快捷方法简化简单场景:
OutboundSseEvent event1 = sse.newEvent("cool Event"); 
// 等价于只带 data 字段的事件

OutboundSseEvent event2 = sse.newEvent("typed event", "data Event");
// event: typed event + data: data Event

3.4. 发送单个客户端事件

SseEventSink 表示一条与客户端的连接通道,只能通过方法参数注入。

发送事件非常简单,调用 .send() 即可:

@GET
@Path("prices")
@Produces("text/event-stream")
public void getStockPrices(@Context SseEventSink sseEventSink) {
    int lastEventId = 1;
    try (SseEventSink sink = sseEventSink) { // 自动关闭
        while (running && !sink.isClosed()) {
            Stock stock = stockService.getNextTransaction(lastEventId);
            if (stock != null) {
                OutboundSseEvent event = eventBuilder
                    .name("stock")
                    .id(String.valueOf(lastEventId))
                    .data(Stock.class, stock)
                    .reconnectDelay(3000)
                    .build();
                sink.send(event);
                lastEventId++;
            }
            Thread.sleep(1000); // 模拟实时流
        }
    } catch (Exception e) {
        // 日志记录异常
    }
}

✅ 最佳实践:使用 try-with-resources 自动关闭 SseEventSink,避免资源泄漏。
❌ 错误做法:忘记关闭连接,导致线程或连接池耗尽。

前端访问示例地址即可查看效果:

http://localhost:9080/sse-jaxrs-server/sse.html

3.5. 广播事件给多个客户端

当多个客户端都需要接收相同事件时(如行情广播),使用 SseBroadcaster 更高效。

三步走策略:

  1. 创建广播器
SseBroadcaster sseBroadcaster = sse.newBroadcaster();
  1. 客户端订阅
@GET
@Path("subscribe")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void listen(@Context SseEventSink sseEventSink) {
    sseBroadcaster.register(sseEventSink);
}

MediaType.SERVER_SENT_EVENTStext/event-stream 的常量别名,语义更清晰。

  1. 触发广播
@GET
@Path("publish")
public void broadcast() {
    OutboundSseEvent event = sse.newEvent("market-update", "New stock price available");
    sseBroadcaster.broadcast(event);
}

所有已注册的 SseEventSink 都会收到该事件。

测试方式

打开多个浏览器窗口访问:

http://localhost:9080/sse-jaxrs-server/sse-broadcast.html

然后通过 curl 触发广播:

curl -X GET http://localhost:9080/sse-jaxrs-server/sse/stock/publish

所有页面应同时收到更新。

⚠️ 注意:SseBroadcaster 不是分布式,仅限当前 JVM 实例内有效。如需跨节点广播,需结合消息中间件(如 Kafka、RabbitMQ)。


4. 消费 SSE 事件

你可以用任意 HTTP 客户端消费 SSE,但 JAX-RS 提供了更优雅的客户端 API。


4.1. 使用 JAX-RS 客户端消费事件

引入客户端依赖(以 CXF 为例):

<dependency>
    <groupId>org.apache.cxf</groupId>
    <artifactId>cxf-rt-rs-client</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.cxf</groupId>
    <artifactId>cxf-rt-rs-sse</artifactId>
    <version>4.0.0</version>
</dependency>

核心是 SseEventSource,它封装了连接管理和事件回调。

基本用法

Client client = ClientBuilder.newClient();
WebTarget target = client.target("http://localhost:9080/sse/stock/prices");

try (SseEventSource source = SseEventSource.target(target).build()) {
    source.register(
        (InboundSseEvent event) -> {
            System.out.println("Received event: " + event.getName());
            System.out.println("Data: " + event.readData());
        }
    );
    source.open(); // 阻塞直到连接关闭或异常
}

解析结构化数据

source.register((InboundSseEvent event) -> {
    Stock stock = event.readData(Stock.class, MediaType.APPLICATION_JSON_TYPE);
    System.out.println("Stock update: " + stock.getName() + " -> " + stock.getPrice());
});

JAX-RS 会自动调用 MessageBodyReader 反序列化 JSON。

✅ 优势:类型安全、自动序列化、支持重连机制。
💡 小技巧:可通过 event.getId() 获取事件 ID,用于断点续传调试。


5. 总结

本文系统讲解了 JAX-RS 2.1 中 SSE 的使用方式,涵盖:

  • ✅ 服务端如何构建并发送事件
  • ✅ 单播 vs 广播模式的实现
  • ✅ 客户端如何高效消费事件流
  • ✅ 实际开发中的注意事项(资源释放、媒体类型、重连等)

SSE 是轻量级实时通信的优秀选择,相比 WebSocket 更简单、兼容性更好,特别适合服务端主动推送的场景。

完整示例代码已托管至 GitHub:

👉 https://github.com/eugenp/tutorials/tree/master/apache-cxf-modules/sse-jaxrs

建议动手实践,踩一遍坑才能真正掌握。


原始标题:Server-Sent Events (SSE) in JAX-RS