1. 简介
本文深入浅出地介绍 Spring MVC 5.x 中用于异步和流式数据处理的三大核心组件:
ResponseBodyEmitter
SseEmitter
StreamingResponseBody
这些工具在构建高并发、实时响应的 Web 应用时非常实用,尤其适合推送日志、实时通知、长轮询等场景。我们还会演示如何通过 JavaScript 客户端与这些接口交互。
✅ 适合有 Spring 基础的开发者快速上手
⚠️ 不建议初学者直接用于生产,需理解底层线程模型和连接管理
2. ResponseBodyEmitter
ResponseBodyEmitter
是 Spring MVC 提供的异步响应机制,允许服务端分批发送数据,直到连接关闭。
它本身是一个抽象父类,支持多种子类扩展(比如 SseEmitter
),适用于需要多次写入响应体的场景。
2.1 服务端实现
使用 ResponseBodyEmitter
时,推荐配合独立线程池处理耗时操作,避免阻塞主线程。同时可以将其包装进 ResponseEntity
,便于控制状态码等元信息。
@Controller
public class ResponseBodyEmitterController {
private ExecutorService executor = Executors.newCachedThreadPool();
@GetMapping("/rbe")
public ResponseEntity<ResponseBodyEmitter> handleRbe() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
executor.execute(() -> {
try {
emitter.send("/rbe @ " + new Date(), MediaType.TEXT_PLAIN);
emitter.complete();
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return new ResponseEntity<>(emitter, HttpStatus.OK);
}
}
📌 关键点:
- ✅ 不依赖
CompletableFuture
或@Async
,更轻量 - ✅ 手动控制线程生命周期,灵活性高
- ❌ 需自行管理线程池,避免资源泄漏
- ⚠️ 若未调用
complete()
,连接会一直保持,直到超时
2.2 客户端调用
前端可通过标准 XHR 发起请求,像普通 AJAX 一样处理响应。
var xhr = function(url) {
return new Promise(function(resolve, reject) {
var xmhr = new XMLHttpRequest();
xmhr.onreadystatechange = function() {
if (xmhr.readyState === 4) {
if (xmhr.status === 200) {
resolve(xmhr.responseText);
} else {
reject(new Error(xmhr.statusText));
}
}
};
xmhr.onerror = function() {
reject(new Error('Network error'));
};
xmhr.open("GET", url, true);
xmhr.send();
});
};
xhr('http://localhost:8080/javamvcasync/rbe')
.then(function(success) {
console.log('Received:', success);
})
.catch(function(err) {
console.error('Error:', err);
});
💡 虽然能拿到响应,但这是“一次性”响应,不是持续流。若要持续接收,得用 SSE。
3. SseEmitter
SseEmitter
是 ResponseBodyEmitter
的子类,专为 Server-Sent Events (SSE) 设计,天生支持浏览器端的事件流通信。
SSE 是一种基于 HTTP 的单向实时通信协议,服务端可连续推送消息,客户端通过 EventSource
接收。
3.1 服务端实现
@Controller
public class SseEmitterController {
private ExecutorService nonBlockingService = Executors
.newCachedThreadPool();
@GetMapping("/sse")
public SseEmitter handleSse() {
SseEmitter emitter = new SseEmitter();
nonBlockingService.execute(() -> {
try {
emitter.send("/sse @ " + new Date());
// 可以多次 send,模拟持续推送
// emitter.send("event update", "data: update payload");
emitter.complete();
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return emitter;
}
}
📌 特性说明:
- ✅ 自动设置
Content-Type: text/event-stream
- ✅ 支持
send(Object)
、send(Object, MediaType)
、send(String event, String data)
等多种格式 - ✅ 天然适合“推模式”场景,如通知中心、实时监控
- ❌ 只支持服务端 → 客户端 单向通信
- ⚠️ 客户端断开后不会自动重连(除非前端手动处理)
💡 小贴士:生产环境中建议设置超时时间,例如
new SseEmitter(30_000L)
,防止连接长期挂起。
3.2 客户端调用
使用浏览器原生 EventSource
API 接收 SSE 流:
var sse = new EventSource('http://localhost:8080/javamvcasync/sse');
sse.onmessage = function (evt) {
var el = document.getElementById('sse');
el.appendChild(document.createTextNode(evt.data));
el.appendChild(document.createElement('br'));
};
sse.onerror = function (err) {
console.error('SSE error:', err);
// 可在此实现重连逻辑
};
📌 注意事项:
onmessage
监听默认事件(无 event 字段)- 若服务端发送
event: customEvent
,可用sse.addEventListener('customEvent', ...)
监听 - 连接断开后需手动重建
EventSource
4. StreamingResponseBody
当你需要直接写入输出流(如下载大文件、生成 CSV/视频流),StreamingResponseBody
是最佳选择。
它允许你将逻辑写入 OutputStream
,由 Spring 负责底层流传输。
4.1 服务端实现
@Controller
public class StreamingResponseBodyController {
@GetMapping("/srb")
public ResponseEntity<StreamingResponseBody> handleRbe() {
StreamingResponseBody stream = out -> {
String msg = "/srb @ " + new Date();
out.write(msg.getBytes());
out.flush(); // 建议显式 flush
};
return new ResponseEntity<>(stream, HttpStatus.OK);
}
}
📌 使用场景:
- ✅ 大文件下载(避免内存溢出)
- ✅ 动态生成报表、日志流
- ✅ 音视频流式传输
- ❌ 不适合需要分段推送的“事件流”
⚠️ 注意:StreamingResponseBody
是一次性写入,不像 SseEmitter
支持多 send()
。若需多次写入,请用 ResponseBodyEmitter
或 SseEmitter
。
4.2 客户端调用
依然使用 XHR 即可:
var xhr = function(url) {
return new Promise(function(resolve, reject) {
var xmhr = new XMLHttpRequest();
xmhr.onreadystatechange = function() {
if (xmhr.readyState === 4) {
if (xmhr.status === 200) {
resolve(xmhr.responseText);
} else {
reject(new Error(xmhr.statusText));
}
}
};
xmhr.onerror = function() {
reject(new Error('Network error'));
};
xmhr.open("GET", url, true);
xmhr.send();
});
};
xhr('http://localhost:8080/javamvcasync/srb')
.then(function(success) {
console.log('Stream result:', success);
})
.catch(function(err) {
console.error('Error:', err);
});
📌 提示:若返回内容较大,建议设置 responseType = 'text'
或 'blob'
以正确接收流数据。
5. 综合效果演示
启动应用后,访问提供的 index.jsp
页面,前端会自动调用各个接口并展示结果。
在浏览器中你会看到类似如下界面:
同时服务端控制台输出对应时间戳日志:
你也可以直接通过 curl 或 Postman 调用接口,观察流式响应的实时输出效果。
例如:
curl http://localhost:8080/javamvcasync/sse
你会看到类似如下输出,并持续保持连接:
data:/sse @ Mon Aug 28 10:30:45 CST 2023
6. 总结
虽然 Future
和 CompletableFuture
在 Spring 异步编程中表现稳定,但在处理高并发流式数据时,以下三类组件更具优势:
类型 | 适用场景 | 是否支持多 send() |
是否 SSE 兼容 |
---|---|---|---|
ResponseBodyEmitter |
通用异步流 | ✅ | ❌ |
SseEmitter |
服务端事件推送 | ✅ | ✅ |
StreamingResponseBody |
文件/大数据流传输 | ❌(一次性) | ❌ |
📌 选型建议:
- 实时通知、监控 → 用
SseEmitter
- 大文件下载、流式生成 → 用
StreamingResponseBody
- 自定义异步流协议 → 用
ResponseBodyEmitter
✅ 踩坑提醒:
务必配置合理的超时机制和异常处理(completeWithError()
),否则容易引发连接堆积,拖垮 Tomcat 线程池。
合理使用这三类工具,能让你的 Spring MVC 应用轻松支持“推”模式和流式响应,告别轮询,提升用户体验。