1. 概述

RSocket 是一个基于响应式流(Reactive Streams)语义的应用层通信协议,可以作为 HTTP 的替代方案。它支持多种交互模型,如请求/响应(Request/Response)、单向通信(Fire-and-Forget)、请求流(Request Stream)、以及双向流(Channel),非常适合构建高性能、低延迟的分布式系统。

本文将介绍如何使用 Spring Boot 快速搭建基于 RSocket 的应用,并演示其支持的几种主要交互模型。


2. 添加依赖

首先,在你的 Spring Boot 项目中引入 RSocket 支持,只需添加如下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

该依赖会自动引入以下核心库:

  • rsocket-core:RSocket 协议的核心实现
  • rsocket-transport-netty:Netty 提供的传输层支持

✅ 使用 Spring Boot Starter 可以简化配置,避免手动集成 RSocket 的复杂性。


3. 示例应用

我们以一个模拟的“交易者”应用为例,演示 RSocket 的各种交互方式。该应用包含一个服务端和一个客户端。

3.1 服务端配置

Spring Boot 默认会为我们自动配置 RSocket 服务端。你可以通过 application.properties 文件修改默认配置,例如:

spring.rsocket.server.port=7000

这会将 RSocket 服务监听在本地 7000 端口上。

✅ 你还可以配置 SSL、传输协议(TCP/WebSocket)等高级选项。

3.2 客户端配置

虽然 Spring Boot 也自动配置了 RSocket 客户端相关组件,但我们需要手动定义 RSocketRequester Bean:

@Configuration
public class ClientConfiguration {

    @Bean
    public RSocketRequester getRSocketRequester() {
        return RSocketRequester.builder()
            .rsocketConnector(connector -> connector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2))))
            .dataMimeType(MimeTypeUtils.APPLICATION_JSON)
            .tcp("localhost", 7000);
    }
}

⚠️ 注意:

  • 使用 .tcp() 表示使用 TCP 协议连接服务端
  • reconnect() 配置了重连策略,增强客户端的健壮性

4. 请求/响应(Request/Response)

这是最常见的一种交互模型,客户端发送请求,服务端返回一次响应。

4.1 服务端处理

使用 @MessageMapping 注解来定义 RSocket 接口:

@Controller
public class MarketDataRSocketController {

    private final MarketDataRepository marketDataRepository;

    public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
        this.marketDataRepository = marketDataRepository;
    }

    @MessageMapping("currentMarketData")
    public Mono<MarketData> currentMarketData(MarketDataRequest request) {
        return marketDataRepository.getOne(request.getStock());
    }
}

@MessageMapping 类似于 Spring MVC 中的 @RequestMapping,用于定义 RSocket 的路由。

4.2 客户端调用

客户端使用 RSocketRequester 发起请求:

@RestController
public class MarketDataRestController {

    private final RSocketRequester rSocketRequester;

    public MarketDataRestController(RSocketRequester rSocketRequester) {
        this.rSocketRequester = rSocketRequester;
    }

    @GetMapping("/current/{stock}")
    public Publisher<MarketData> current(@PathVariable String stock) {
        return rSocketRequester
            .route("currentMarketData")
            .data(new MarketDataRequest(stock))
            .retrieveMono(MarketData.class);
    }
}

✅ 使用 retrieveMono() 表示期望一个单次响应。


5. 单向通信(Fire-and-Forget)

客户端发送请求,不等待服务端响应,适用于日志上报、事件通知等场景。

5.1 服务端处理

@MessageMapping("collectMarketData")
public Mono<Void> collectMarketData(MarketData marketData) {
    marketDataRepository.add(marketData);
    return Mono.empty();
}

✅ 返回 Mono<Void> 表示不返回响应。

5.2 客户端调用

@GetMapping("/collect")
public Publisher<Void> collect() {
    return rSocketRequester
        .route("collectMarketData")
        .data(getMarketData())
        .send();
}

✅ 使用 send() 方法表示不期望响应。


6. 请求流(Request Stream)

客户端发送请求,服务端持续返回多个响应,适用于数据流推送场景。

6.1 服务端处理

@MessageMapping("feedMarketData")
public Flux<MarketData> feedMarketData(MarketDataRequest request) {
    return marketDataRepository.getAll(request.getStock());
}

✅ 返回 Flux<MarketData> 表示多个响应。

6.2 客户端调用

@GetMapping(value = "/feed/{stock}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<MarketData> feed(@PathVariable String stock) {
    return rSocketRequester
        .route("feedMarketData")
        .data(new MarketDataRequest(stock))
        .retrieveFlux(MarketData.class);
}

✅ 使用 retrieveFlux() 表示期望多个响应。

⚠️ 注意设置 produces = MediaType.TEXT_EVENT_STREAM_VALUE,以便浏览器等客户端能正确解析流式响应。


7. 异常处理

Spring Boot 提供了 @MessageExceptionHandler 注解用于统一处理异常。

@MessageExceptionHandler
public Mono<MarketData> handleException(Exception e) {
    return Mono.just(MarketData.fromException(e));
}

✅ 该方法会处理所有未被捕获的异常。

你也可以为特定异常类型编写单独的处理器:

@MessageExceptionHandler
public Mono<MarketData> handleStockNotFoundException(StockNotFoundException e) {
    return Mono.just(MarketData.fromException(e));
}

⚠️ 异常处理返回值类型应与交互模型一致,比如 Mono<T>Flux<T>


8. 总结

本文介绍了使用 Spring Boot 快速构建 RSocket 应用的方法,并演示了以下几种交互模型:

交互模型 说明 客户端方法 服务端返回类型
Request/Response 客户端请求,服务端返回一次响应 retrieveMono() Mono
Fire-and-Forget 客户端发送请求,不等待响应 send() Mono
Request Stream 客户端请求,服务端返回多个响应 retrieveFlux() Flux

Spring Boot 对 RSocket 的封装非常简洁,只需少量代码即可实现高性能的响应式通信。

完整示例代码请参考:GitHub 仓库(模拟地址,可根据实际项目替换)

✅ 推荐在微服务、实时通信、IoT 等场景中尝试 RSocket,体验其优于 HTTP 的通信能力。


原始标题:RSocket Using Spring Boot | Baeldung