1. 概述
Server-Sent Events(SSE)是一种基于 HTTP 的协议规范,用于建立从服务器到客户端的长连接,且是单向通信(服务器 → 客户端)。
客户端通过在 Accept
请求头中指定媒体类型 text/event-stream
来发起 SSE 连接。一旦连接建立,服务器就可以持续推送数据,而无需客户端反复轮询。
关于该协议的详细定义,可以参考官方文档:Server-Sent Events Specification。
本文将重点介绍 JAX-RS 2.1 对 SSE 的原生支持。你会学到:
- 如何使用 JAX-RS 服务端 API 发送事件
- 如何通过 JAX-RS 客户端 API 或普通 HTTP 工具(如
curl
)消费这些事件
✅ 适用场景:实时日志推送、股票行情更新、通知系统等需要服务端主动推送给浏览器的场景。
❌ 不适合:需要客户端频繁发消息给服务端的双向通信(这种情况建议用 WebSocket)。
2. 理解 SSE 事件格式
SSE 事件本质上是一段文本,由若干字段组成,每条消息以两个换行符 \n\n
分隔。常见字段包括:
字段 | 说明 |
---|---|
event: |
事件类型。客户端可监听特定类型,实现多路复用 |
data: |
实际传输的数据。支持多行拼接 |
id: |
事件唯一标识。断线重连时会通过 Last-Event-ID 请求头带回,避免重复推送 |
retry: |
重连间隔(毫秒)。客户端断开后等待多久尝试重连 |
: (冒号开头) |
注释行,客户端忽略 |
示例:多行数据与多种事件类型
event: stock
id: 1
: price change
retry: 4000
data: {"dateTime":"2018-07-14T18:06:00.285","id":1,
data: "name":"GOOG","price":75.7119}
event: stock
id: 2
: price change
retry: 4000
data: {"dateTime":"2018-07-14T18:06:00.285","id":2,"name":"IBM","price":83.4611}
⚠️ 注意:
data
可跨多行,最终拼接为完整 JSON;id
和retry
是可选但实用的字段。
在 JAX-RS 中,SSE 事件被抽象为两个核心接口:
- ✅
OutboundSseEvent
:服务端发送的事件 - ✅
InboundSseEvent
:客户端接收的事件
两者分别对应发送和接收上下文,职责清晰。
3. 发送 SSE 事件
3.1. 项目准备
假设你已有一个基于 Maven 的 JAX-RS 项目(如使用 Jersey、CXF 或 RESTEasy)。关键依赖需包含 JAX-RS 2.1+ 支持 SSE 的实现库。
例如使用 Apache CXF:
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-rs-sse</artifactId>
<version>4.0.0</version>
</dependency>
💡 提示:Jersey 用户需引入
org.glassfish.jersey.media:jersey-media-sse
。
3.2. 定义 SSE 接口
一个典型的 SSE 资源方法必须满足以下条件:
- 使用
@Produces("text/event-stream")
- 参数中注入
SseEventSink
用于发送事件 - 可选注入
Sse
实例来构建事件
@GET
@Path("prices")
@Produces("text/event-stream")
public void getStockPrices(@Context SseEventSink sseEventSink, @Context Sse sse) {
// 发送逻辑
}
客户端请求时需带上:
GET /prices HTTP/1.1
Accept: text/event-stream
否则服务器不会按 SSE 格式响应。
3.3. 构建事件:Sse 与事件构造器
Sse
是一个上下文对象,由 JAX-RS 容器注入,提供两个核心能力:
- 创建事件构造器:
newEventBuilder()
- 创建广播器:
newBroadcaster()
初始化 Sse 实例
@Context
private void setSse(Sse sse) {
this.sse = sse;
this.eventBuilder = sse.newEventBuilder();
this.sseBroadcaster = sse.newBroadcaster();
}
使用 OutboundSseEvent.Builder 构造事件
OutboundSseEvent sseEvent = this.eventBuilder
.name("stock") // event: stock
.id(String.valueOf(lastEventId)) // id: 1
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.data(Stock.class, stock) // data: {...}
.reconnectDelay(4000) // retry: 4000
.comment("price change") // : price change
.build();
📌 关键点:
mediaType()
决定序列化方式,默认是text/plain
,处理 POJO 时建议设为application/json
- JAX-RS 会自动查找合适的
MessageBodyWriter
进行序列化 - 提供快捷方法简化简单场景:
OutboundSseEvent event1 = sse.newEvent("cool Event");
// 等价于只带 data 字段的事件
OutboundSseEvent event2 = sse.newEvent("typed event", "data Event");
// event: typed event + data: data Event
3.4. 发送单个客户端事件
SseEventSink
表示一条与客户端的连接通道,只能通过方法参数注入。
发送事件非常简单,调用 .send()
即可:
@GET
@Path("prices")
@Produces("text/event-stream")
public void getStockPrices(@Context SseEventSink sseEventSink) {
int lastEventId = 1;
try (SseEventSink sink = sseEventSink) { // 自动关闭
while (running && !sink.isClosed()) {
Stock stock = stockService.getNextTransaction(lastEventId);
if (stock != null) {
OutboundSseEvent event = eventBuilder
.name("stock")
.id(String.valueOf(lastEventId))
.data(Stock.class, stock)
.reconnectDelay(3000)
.build();
sink.send(event);
lastEventId++;
}
Thread.sleep(1000); // 模拟实时流
}
} catch (Exception e) {
// 日志记录异常
}
}
✅ 最佳实践:使用
try-with-resources
自动关闭SseEventSink
,避免资源泄漏。
❌ 错误做法:忘记关闭连接,导致线程或连接池耗尽。
前端访问示例地址即可查看效果:
http://localhost:9080/sse-jaxrs-server/sse.html
3.5. 广播事件给多个客户端
当多个客户端都需要接收相同事件时(如行情广播),使用 SseBroadcaster
更高效。
三步走策略:
- 创建广播器
SseBroadcaster sseBroadcaster = sse.newBroadcaster();
- 客户端订阅
@GET
@Path("subscribe")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void listen(@Context SseEventSink sseEventSink) {
sseBroadcaster.register(sseEventSink);
}
✅
MediaType.SERVER_SENT_EVENTS
是text/event-stream
的常量别名,语义更清晰。
- 触发广播
@GET
@Path("publish")
public void broadcast() {
OutboundSseEvent event = sse.newEvent("market-update", "New stock price available");
sseBroadcaster.broadcast(event);
}
所有已注册的 SseEventSink
都会收到该事件。
测试方式
打开多个浏览器窗口访问:
http://localhost:9080/sse-jaxrs-server/sse-broadcast.html
然后通过 curl
触发广播:
curl -X GET http://localhost:9080/sse-jaxrs-server/sse/stock/publish
所有页面应同时收到更新。
⚠️ 注意:
SseBroadcaster
不是分布式,仅限当前 JVM 实例内有效。如需跨节点广播,需结合消息中间件(如 Kafka、RabbitMQ)。
4. 消费 SSE 事件
你可以用任意 HTTP 客户端消费 SSE,但 JAX-RS 提供了更优雅的客户端 API。
4.1. 使用 JAX-RS 客户端消费事件
引入客户端依赖(以 CXF 为例):
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-rs-client</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-rs-sse</artifactId>
<version>4.0.0</version>
</dependency>
核心是 SseEventSource
,它封装了连接管理和事件回调。
基本用法
Client client = ClientBuilder.newClient();
WebTarget target = client.target("http://localhost:9080/sse/stock/prices");
try (SseEventSource source = SseEventSource.target(target).build()) {
source.register(
(InboundSseEvent event) -> {
System.out.println("Received event: " + event.getName());
System.out.println("Data: " + event.readData());
}
);
source.open(); // 阻塞直到连接关闭或异常
}
解析结构化数据
source.register((InboundSseEvent event) -> {
Stock stock = event.readData(Stock.class, MediaType.APPLICATION_JSON_TYPE);
System.out.println("Stock update: " + stock.getName() + " -> " + stock.getPrice());
});
JAX-RS 会自动调用 MessageBodyReader
反序列化 JSON。
✅ 优势:类型安全、自动序列化、支持重连机制。
💡 小技巧:可通过event.getId()
获取事件 ID,用于断点续传调试。
5. 总结
本文系统讲解了 JAX-RS 2.1 中 SSE 的使用方式,涵盖:
- ✅ 服务端如何构建并发送事件
- ✅ 单播 vs 广播模式的实现
- ✅ 客户端如何高效消费事件流
- ✅ 实际开发中的注意事项(资源释放、媒体类型、重连等)
SSE 是轻量级实时通信的优秀选择,相比 WebSocket 更简单、兼容性更好,特别适合服务端主动推送的场景。
完整示例代码已托管至 GitHub:
👉 https://github.com/eugenp/tutorials/tree/master/apache-cxf-modules/sse-jaxrs
建议动手实践,踩一遍坑才能真正掌握。