1. 概述

本文将介绍如何使用 WebSockets 实现从服务端向浏览器定时推送消息。虽然 Server Sent Events (SSE) 也是一种可选方案,但我们不会在本文中讨论。

Spring 提供了多种任务调度方式。首先我们会讲解最常用的 @Scheduled 注解方式;接着演示如何结合 Project Reactor 提供的 Flux::interval 方法实现响应式定时推送。后者常用于 WebFlux 应用,也可作为独立库集成到任意 Java 项目中。

此外,还有更复杂的调度机制,比如 Quartz 调度器,但本文不做深入探讨。

2. 一个简单的聊天应用

上一篇文章中,我们已经通过 WebSocket 构建了一个基础聊天应用。现在我们要为它添加一项新功能:聊天机器人(chatbot) —— 即服务端组件定时向客户端推送消息。

2.1. Maven 依赖配置

我们先来设置必要的 Maven 依赖项。以下是 pom.xml 中需要包含的内容:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>
<dependency>
    <groupId>com.github.javafaker</groupId>
    <artifactId>javafaker</artifactId>
    <version>1.0.2</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>

2.2. 使用 JavaFaker 生成消息内容

我们将借助 JavaFaker 库来生成聊天机器人发送的消息。这个库通常用于生成测试数据。这里我们会在聊天室中加入一个名为 “Chuck Norris” 的机器人用户。

示例代码如下:

Faker faker = new Faker();
ChuckNorris chuckNorris = faker.chuckNorris();
String messageFromChuck = chuckNorris.fact();

Faker 提供了多个数据生成器工厂方法,我们使用的是 ChuckNorris 生成器。调用 chuckNorris.fact() 可以随机返回一条预定义的语句。

2.3. 数据模型

聊天应用使用一个简单的 POJO 来封装消息:

public class OutputMessage {

    private String from;
    private String text;
    private String time;

   // 标准构造函数、getter/setter、equals 和 hashCode 省略
}

下面是创建一条聊天消息的示例:

OutputMessage message = new OutputMessage(
  "Chatbot 1", "Hello there!", new SimpleDateFormat("HH:mm").format(new Date())));

2.4. 客户端实现

客户端是一个简单的 HTML 页面,使用 SockJS 客户端STOMP 协议进行通信。

以下是客户端订阅主题的代码片段:

<html>
<head>
    <script src="./js/sockjs-0.3.4.js"></script>
    <script src="./js/stomp.js"></script>
    <script type="text/javascript">
        // ...
        stompClient = Stomp.over(socket);
    
        stompClient.connect({}, function(frame) {
            // ...
            stompClient.subscribe('/topic/pushmessages', function(messageOutput) {
                showMessageOutput(JSON.parse(messageOutput.body));
            });
        });
        // ...
    </script>
</head>
<!-- ... -->
</html>

这段代码首先创建了一个基于 SockJS 的 STOMP 客户端,并订阅了 /topic/pushmessages 主题。该主题作为服务器与客户端之间的通信通道。

在我们的示例仓库中,这段代码位于 webapp/bots.html 文件中。本地运行时可通过 http://localhost:8080/bots.html 访问。部署时请根据实际情况调整主机和端口。

2.5. 服务端配置

我们在之前的 Spring WebSocket 配置 基础上稍作修改:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/chatwithbots");
        registry.addEndpoint("/chatwithbots").withSockJS();
    }
}

✅ **要推送消息,我们使用 Spring 提供的工具类 SimpMessagingTemplate**。默认情况下,Spring Boot 会自动将其注册为 Bean。当 AbstractMessageBrokerConfiguration 在类路径中时,该 Bean 就会被自动注入。

我们可以将其注入任意 Spring 组件中,然后使用如下方式推送消息:

simpMessagingTemplate.convertAndSend("/topic/pushmessages", 
  new OutputMessage("Chuck Norris", faker.chuckNorris().fact(), time));

如前所述,客户端已订阅了该主题,因此可以实时接收并处理这些消息。

3. 定时推送消息的实现方式

在 Spring 生态中,我们可以选择不同的任务调度方式。如果使用 Spring MVC,推荐使用简单易用的 @Scheduled 注解;如果采用 Spring WebFlux,则可以使用 Project Reactor 的 Flux::interval 方法。下面分别给出两个示例。

3.1. 配置 Faker Bean

我们将 JavaFaker 的 Chuck Norris 生成器配置为一个 Spring Bean,以便后续注入使用:

@Configuration
class AppConfig {

    @Bean
    public ChuckNorris chuckNorris() {
        return (new Faker()).chuckNorris();
    }
}

3.2. 使用 @Scheduled 实现定时推送

我们定义一个定时任务类,每隔一段时间发送一条消息:

@Service
public class ScheduledPushMessages {

    @Scheduled(fixedRate = 5000)
    public void sendMessage(SimpMessagingTemplate simpMessagingTemplate, ChuckNorris chuckNorris) {
        String time = new SimpleDateFormat("HH:mm").format(new Date());
        simpMessagingTemplate.convertAndSend("/topic/pushmessages", 
          new OutputMessage("Chuck Norris (@Scheduled)", chuckNorris().fact(), time));
    }
    
}

✅ 在 sendMessage 方法上加上 @Scheduled(fixedRate = 5000) 表示每 5 秒执行一次。方法参数中的 simpMessagingTemplatechuckNorris 是通过 Spring 上下文注入的。

3.3. 使用 Flux::interval() 实现响应式推送

⚠️ 如果你使用的是 WebFlux,那么可以利用 Project Reactor 的 Flux::interval 方法,它能以指定的时间间隔不断发出 Long 类型的事件流。

我们继续沿用上面的例子,目标是每隔 5 秒推送一条 Chuck Norris 语录。为此我们需要实现 InitializingBean 接口,在应用启动后订阅 Flux 流:

@Service
public class ReactiveScheduledPushMessages implements InitializingBean {

    private SimpMessagingTemplate simpMessagingTemplate;

    private ChuckNorris chuckNorris;

    @Autowired
    public ReactiveScheduledPushMessages(SimpMessagingTemplate simpMessagingTemplate, ChuckNorris chuckNorris) {
        this.simpMessagingTemplate = simpMessagingTemplate;
        this.chuckNorris = chuckNorris;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Flux.interval(Duration.ofSeconds(5L))
            // 丢弃传入的 Long 值,替换为 OutputMessage 对象
            .map((n) -> new OutputMessage("Chuck Norris (Flux::interval)", 
                              chuckNorris.fact(), 
                              new SimpleDateFormat("HH:mm").format(new Date()))) 
            .subscribe(message -> simpMessagingTemplate.convertAndSend("/topic/pushmessages", message));
    }
}

在这个例子中,我们使用构造函数注入了所需的 Bean。调度逻辑写在 afterPropertiesSet() 方法中,该方法会在 Bean 初始化完成后立即执行。

interval 操作符每隔 5 秒发出一个 Long 值,随后通过 map 将其转换为消息对象,最后通过 subscribe 触发推送行为。

4. 总结

在这篇文章中,我们展示了如何使用 SimpMessagingTemplate 工具类轻松实现 WebSocket 消息推送,并介绍了两种常见的定时任务实现方式:

  • ✅ 使用 @Scheduled 进行传统定时调度(适用于 Spring MVC)
  • ✅ 使用 Flux::interval 实现响应式定时推送(适用于 WebFlux)

正如你所见,这两种方式都能很好地满足定时推送的需求,选择哪种取决于你的项目架构和偏好。

源码示例可在 GitHub 仓库 获取。


原始标题:Scheduled WebSocket Push with Spring Boot | Baeldung