1. 简介

本文深入浅出地介绍 Spring MVC 5.x 中用于异步和流式数据处理的三大核心组件:

  • ResponseBodyEmitter
  • SseEmitter
  • StreamingResponseBody

这些工具在构建高并发、实时响应的 Web 应用时非常实用,尤其适合推送日志、实时通知、长轮询等场景。我们还会演示如何通过 JavaScript 客户端与这些接口交互。

✅ 适合有 Spring 基础的开发者快速上手
⚠️ 不建议初学者直接用于生产,需理解底层线程模型和连接管理


2. ResponseBodyEmitter

ResponseBodyEmitter 是 Spring MVC 提供的异步响应机制,允许服务端分批发送数据,直到连接关闭。

它本身是一个抽象父类,支持多种子类扩展(比如 SseEmitter),适用于需要多次写入响应体的场景。

2.1 服务端实现

使用 ResponseBodyEmitter 时,推荐配合独立线程池处理耗时操作,避免阻塞主线程。同时可以将其包装进 ResponseEntity,便于控制状态码等元信息。

@Controller
public class ResponseBodyEmitterController {
 
    private ExecutorService executor = Executors.newCachedThreadPool();

    @GetMapping("/rbe")
    public ResponseEntity<ResponseBodyEmitter> handleRbe() {
        ResponseBodyEmitter emitter = new ResponseBodyEmitter();
        executor.execute(() -> {
            try {
                emitter.send("/rbe @ " + new Date(), MediaType.TEXT_PLAIN);
                emitter.complete();
            } catch (Exception ex) {
                emitter.completeWithError(ex);
            }
        });
        return new ResponseEntity<>(emitter, HttpStatus.OK);
    }
}

📌 关键点:

  • ✅ 不依赖 CompletableFuture@Async,更轻量
  • ✅ 手动控制线程生命周期,灵活性高
  • ❌ 需自行管理线程池,避免资源泄漏
  • ⚠️ 若未调用 complete(),连接会一直保持,直到超时

2.2 客户端调用

前端可通过标准 XHR 发起请求,像普通 AJAX 一样处理响应。

var xhr = function(url) {
    return new Promise(function(resolve, reject) {
        var xmhr = new XMLHttpRequest();
        xmhr.onreadystatechange = function() {
            if (xmhr.readyState === 4) {
                if (xmhr.status === 200) {
                    resolve(xmhr.responseText);
                } else {
                    reject(new Error(xmhr.statusText));
                }
            }
        };
        xmhr.onerror = function() {
            reject(new Error('Network error'));
        };
        xmhr.open("GET", url, true);
        xmhr.send();
    });
};

xhr('http://localhost:8080/javamvcasync/rbe')
  .then(function(success) {
      console.log('Received:', success);
  })
  .catch(function(err) {
      console.error('Error:', err);
  });

💡 虽然能拿到响应,但这是“一次性”响应,不是持续流。若要持续接收,得用 SSE。


3. SseEmitter

SseEmitterResponseBodyEmitter 的子类,专为 Server-Sent Events (SSE) 设计,天生支持浏览器端的事件流通信。

SSE 是一种基于 HTTP 的单向实时通信协议,服务端可连续推送消息,客户端通过 EventSource 接收。

3.1 服务端实现

@Controller
public class SseEmitterController {
    private ExecutorService nonBlockingService = Executors
      .newCachedThreadPool();
    
    @GetMapping("/sse")
    public SseEmitter handleSse() {
         SseEmitter emitter = new SseEmitter();
         nonBlockingService.execute(() -> {
             try {
                 emitter.send("/sse @ " + new Date());
                 // 可以多次 send,模拟持续推送
                 // emitter.send("event update", "data: update payload");
                 emitter.complete();
             } catch (Exception ex) {
                 emitter.completeWithError(ex);
             }
         });
         return emitter;
    }   
}

📌 特性说明:

  • ✅ 自动设置 Content-Type: text/event-stream
  • ✅ 支持 send(Object)send(Object, MediaType)send(String event, String data) 等多种格式
  • ✅ 天然适合“推模式”场景,如通知中心、实时监控
  • ❌ 只支持服务端 → 客户端 单向通信
  • ⚠️ 客户端断开后不会自动重连(除非前端手动处理)

💡 小贴士:生产环境中建议设置超时时间,例如 new SseEmitter(30_000L),防止连接长期挂起。

3.2 客户端调用

使用浏览器原生 EventSource API 接收 SSE 流:

var sse = new EventSource('http://localhost:8080/javamvcasync/sse');
sse.onmessage = function (evt) {
    var el = document.getElementById('sse');
    el.appendChild(document.createTextNode(evt.data));
    el.appendChild(document.createElement('br'));
};

sse.onerror = function (err) {
    console.error('SSE error:', err);
    // 可在此实现重连逻辑
};

📌 注意事项:

  • onmessage 监听默认事件(无 event 字段)
  • 若服务端发送 event: customEvent,可用 sse.addEventListener('customEvent', ...) 监听
  • 连接断开后需手动重建 EventSource

4. StreamingResponseBody

当你需要直接写入输出流(如下载大文件、生成 CSV/视频流),StreamingResponseBody 是最佳选择。

它允许你将逻辑写入 OutputStream,由 Spring 负责底层流传输。

4.1 服务端实现

@Controller
public class StreamingResponseBodyController {
 
    @GetMapping("/srb")
    public ResponseEntity<StreamingResponseBody> handleRbe() {
        StreamingResponseBody stream = out -> {
            String msg = "/srb @ " + new Date();
            out.write(msg.getBytes());
            out.flush(); // 建议显式 flush
        };
        return new ResponseEntity<>(stream, HttpStatus.OK);
    }
}

📌 使用场景:

  • ✅ 大文件下载(避免内存溢出)
  • ✅ 动态生成报表、日志流
  • ✅ 音视频流式传输
  • ❌ 不适合需要分段推送的“事件流”

⚠️ 注意:StreamingResponseBody 是一次性写入,不像 SseEmitter 支持多 send()。若需多次写入,请用 ResponseBodyEmitterSseEmitter

4.2 客户端调用

依然使用 XHR 即可:

var xhr = function(url) {
    return new Promise(function(resolve, reject) {
        var xmhr = new XMLHttpRequest();
        xmhr.onreadystatechange = function() {
            if (xmhr.readyState === 4) {
                if (xmhr.status === 200) {
                    resolve(xmhr.responseText);
                } else {
                    reject(new Error(xmhr.statusText));
                }
            }
        };
        xmhr.onerror = function() {
            reject(new Error('Network error'));
        };
        xmhr.open("GET", url, true);
        xmhr.send();
    });
};

xhr('http://localhost:8080/javamvcasync/srb')
  .then(function(success) {
      console.log('Stream result:', success);
  })
  .catch(function(err) {
      console.error('Error:', err);
  });

📌 提示:若返回内容较大,建议设置 responseType = 'text''blob' 以正确接收流数据。


5. 综合效果演示

启动应用后,访问提供的 index.jsp 页面,前端会自动调用各个接口并展示结果。

在浏览器中你会看到类似如下界面:

SpringMVCAsyncResponseReques.v3

同时服务端控制台输出对应时间戳日志:

Terminal

你也可以直接通过 curl 或 Postman 调用接口,观察流式响应的实时输出效果。

例如:

curl http://localhost:8080/javamvcasync/sse

你会看到类似如下输出,并持续保持连接:

data:/sse @ Mon Aug 28 10:30:45 CST 2023

6. 总结

虽然 FutureCompletableFuture 在 Spring 异步编程中表现稳定,但在处理高并发流式数据时,以下三类组件更具优势:

类型 适用场景 是否支持多 send() 是否 SSE 兼容
ResponseBodyEmitter 通用异步流
SseEmitter 服务端事件推送
StreamingResponseBody 文件/大数据流传输 ❌(一次性)

📌 选型建议:

  • 实时通知、监控 → 用 SseEmitter
  • 大文件下载、流式生成 → 用 StreamingResponseBody
  • 自定义异步流协议 → 用 ResponseBodyEmitter

✅ 踩坑提醒:
务必配置合理的超时机制和异常处理(completeWithError()),否则容易引发连接堆积,拖垮 Tomcat 线程池。

合理使用这三类工具,能让你的 Spring MVC 应用轻松支持“推”模式和流式响应,告别轮询,提升用户体验。


原始标题:Spring MVC Streaming and SSE Request Processing | Baeldung