1. 概述
RSocket 是一个基于响应式流(Reactive Streams)语义的应用层通信协议,可以作为 HTTP 的替代方案。它支持多种交互模型,如请求/响应(Request/Response)、单向通信(Fire-and-Forget)、请求流(Request Stream)、以及双向流(Channel),非常适合构建高性能、低延迟的分布式系统。
本文将介绍如何使用 Spring Boot 快速搭建基于 RSocket 的应用,并演示其支持的几种主要交互模型。
2. 添加依赖
首先,在你的 Spring Boot 项目中引入 RSocket 支持,只需添加如下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
该依赖会自动引入以下核心库:
rsocket-core
:RSocket 协议的核心实现rsocket-transport-netty
:Netty 提供的传输层支持
✅ 使用 Spring Boot Starter 可以简化配置,避免手动集成 RSocket 的复杂性。
3. 示例应用
我们以一个模拟的“交易者”应用为例,演示 RSocket 的各种交互方式。该应用包含一个服务端和一个客户端。
3.1 服务端配置
Spring Boot 默认会为我们自动配置 RSocket 服务端。你可以通过 application.properties
文件修改默认配置,例如:
spring.rsocket.server.port=7000
这会将 RSocket 服务监听在本地 7000 端口上。
✅ 你还可以配置 SSL、传输协议(TCP/WebSocket)等高级选项。
3.2 客户端配置
虽然 Spring Boot 也自动配置了 RSocket 客户端相关组件,但我们需要手动定义 RSocketRequester
Bean:
@Configuration
public class ClientConfiguration {
@Bean
public RSocketRequester getRSocketRequester() {
return RSocketRequester.builder()
.rsocketConnector(connector -> connector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2))))
.dataMimeType(MimeTypeUtils.APPLICATION_JSON)
.tcp("localhost", 7000);
}
}
⚠️ 注意:
- 使用
.tcp()
表示使用 TCP 协议连接服务端 reconnect()
配置了重连策略,增强客户端的健壮性
4. 请求/响应(Request/Response)
这是最常见的一种交互模型,客户端发送请求,服务端返回一次响应。
4.1 服务端处理
使用 @MessageMapping
注解来定义 RSocket 接口:
@Controller
public class MarketDataRSocketController {
private final MarketDataRepository marketDataRepository;
public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
this.marketDataRepository = marketDataRepository;
}
@MessageMapping("currentMarketData")
public Mono<MarketData> currentMarketData(MarketDataRequest request) {
return marketDataRepository.getOne(request.getStock());
}
}
✅ @MessageMapping
类似于 Spring MVC 中的 @RequestMapping
,用于定义 RSocket 的路由。
4.2 客户端调用
客户端使用 RSocketRequester
发起请求:
@RestController
public class MarketDataRestController {
private final RSocketRequester rSocketRequester;
public MarketDataRestController(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
@GetMapping("/current/{stock}")
public Publisher<MarketData> current(@PathVariable String stock) {
return rSocketRequester
.route("currentMarketData")
.data(new MarketDataRequest(stock))
.retrieveMono(MarketData.class);
}
}
✅ 使用 retrieveMono()
表示期望一个单次响应。
5. 单向通信(Fire-and-Forget)
客户端发送请求,不等待服务端响应,适用于日志上报、事件通知等场景。
5.1 服务端处理
@MessageMapping("collectMarketData")
public Mono<Void> collectMarketData(MarketData marketData) {
marketDataRepository.add(marketData);
return Mono.empty();
}
✅ 返回 Mono<Void>
表示不返回响应。
5.2 客户端调用
@GetMapping("/collect")
public Publisher<Void> collect() {
return rSocketRequester
.route("collectMarketData")
.data(getMarketData())
.send();
}
✅ 使用 send()
方法表示不期望响应。
6. 请求流(Request Stream)
客户端发送请求,服务端持续返回多个响应,适用于数据流推送场景。
6.1 服务端处理
@MessageMapping("feedMarketData")
public Flux<MarketData> feedMarketData(MarketDataRequest request) {
return marketDataRepository.getAll(request.getStock());
}
✅ 返回 Flux<MarketData>
表示多个响应。
6.2 客户端调用
@GetMapping(value = "/feed/{stock}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<MarketData> feed(@PathVariable String stock) {
return rSocketRequester
.route("feedMarketData")
.data(new MarketDataRequest(stock))
.retrieveFlux(MarketData.class);
}
✅ 使用 retrieveFlux()
表示期望多个响应。
⚠️ 注意设置 produces = MediaType.TEXT_EVENT_STREAM_VALUE
,以便浏览器等客户端能正确解析流式响应。
7. 异常处理
Spring Boot 提供了 @MessageExceptionHandler
注解用于统一处理异常。
@MessageExceptionHandler
public Mono<MarketData> handleException(Exception e) {
return Mono.just(MarketData.fromException(e));
}
✅ 该方法会处理所有未被捕获的异常。
你也可以为特定异常类型编写单独的处理器:
@MessageExceptionHandler
public Mono<MarketData> handleStockNotFoundException(StockNotFoundException e) {
return Mono.just(MarketData.fromException(e));
}
⚠️ 异常处理返回值类型应与交互模型一致,比如 Mono<T>
或 Flux<T>
。
8. 总结
本文介绍了使用 Spring Boot 快速构建 RSocket 应用的方法,并演示了以下几种交互模型:
交互模型 | 说明 | 客户端方法 | 服务端返回类型 |
---|---|---|---|
Request/Response | 客户端请求,服务端返回一次响应 | retrieveMono() | Mono |
Fire-and-Forget | 客户端发送请求,不等待响应 | send() | Mono |
Request Stream | 客户端请求,服务端返回多个响应 | retrieveFlux() | Flux |
Spring Boot 对 RSocket 的封装非常简洁,只需少量代码即可实现高性能的响应式通信。
完整示例代码请参考:GitHub 仓库(模拟地址,可根据实际项目替换)
✅ 推荐在微服务、实时通信、IoT 等场景中尝试 RSocket,体验其优于 HTTP 的通信能力。