1. 概述
本文将介绍如何使用 WebSockets 实现从服务端向浏览器定时推送消息。虽然 Server Sent Events (SSE) 也是一种可选方案,但我们不会在本文中讨论。
Spring 提供了多种任务调度方式。首先我们会讲解最常用的 @Scheduled 注解方式;接着演示如何结合 Project Reactor 提供的 Flux::interval 方法实现响应式定时推送。后者常用于 WebFlux 应用,也可作为独立库集成到任意 Java 项目中。
此外,还有更复杂的调度机制,比如 Quartz 调度器,但本文不做深入探讨。
2. 一个简单的聊天应用
在上一篇文章中,我们已经通过 WebSocket 构建了一个基础聊天应用。现在我们要为它添加一项新功能:聊天机器人(chatbot) —— 即服务端组件定时向客户端推送消息。
2.1. Maven 依赖配置
我们先来设置必要的 Maven 依赖项。以下是 pom.xml 中需要包含的内容:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
2.2. 使用 JavaFaker 生成消息内容
我们将借助 JavaFaker 库来生成聊天机器人发送的消息。这个库通常用于生成测试数据。这里我们会在聊天室中加入一个名为 “Chuck Norris” 的机器人用户。
示例代码如下:
Faker faker = new Faker();
ChuckNorris chuckNorris = faker.chuckNorris();
String messageFromChuck = chuckNorris.fact();
Faker 提供了多个数据生成器工厂方法,我们使用的是 ChuckNorris 生成器。调用 chuckNorris.fact()
可以随机返回一条预定义的语句。
2.3. 数据模型
聊天应用使用一个简单的 POJO 来封装消息:
public class OutputMessage {
private String from;
private String text;
private String time;
// 标准构造函数、getter/setter、equals 和 hashCode 省略
}
下面是创建一条聊天消息的示例:
OutputMessage message = new OutputMessage(
"Chatbot 1", "Hello there!", new SimpleDateFormat("HH:mm").format(new Date())));
2.4. 客户端实现
客户端是一个简单的 HTML 页面,使用 SockJS 客户端 和 STOMP 协议进行通信。
以下是客户端订阅主题的代码片段:
<html>
<head>
<script src="./js/sockjs-0.3.4.js"></script>
<script src="./js/stomp.js"></script>
<script type="text/javascript">
// ...
stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
// ...
stompClient.subscribe('/topic/pushmessages', function(messageOutput) {
showMessageOutput(JSON.parse(messageOutput.body));
});
});
// ...
</script>
</head>
<!-- ... -->
</html>
这段代码首先创建了一个基于 SockJS 的 STOMP 客户端,并订阅了 /topic/pushmessages
主题。该主题作为服务器与客户端之间的通信通道。
在我们的示例仓库中,这段代码位于 webapp/bots.html 文件中。本地运行时可通过 http://localhost:8080/bots.html 访问。部署时请根据实际情况调整主机和端口。
2.5. 服务端配置
我们在之前的 Spring WebSocket 配置 基础上稍作修改:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/chatwithbots");
registry.addEndpoint("/chatwithbots").withSockJS();
}
}
✅ **要推送消息,我们使用 Spring 提供的工具类 SimpMessagingTemplate**。默认情况下,Spring Boot 会自动将其注册为 Bean。当 AbstractMessageBrokerConfiguration 在类路径中时,该 Bean 就会被自动注入。
我们可以将其注入任意 Spring 组件中,然后使用如下方式推送消息:
simpMessagingTemplate.convertAndSend("/topic/pushmessages",
new OutputMessage("Chuck Norris", faker.chuckNorris().fact(), time));
如前所述,客户端已订阅了该主题,因此可以实时接收并处理这些消息。
3. 定时推送消息的实现方式
在 Spring 生态中,我们可以选择不同的任务调度方式。如果使用 Spring MVC,推荐使用简单易用的 @Scheduled 注解;如果采用 Spring WebFlux,则可以使用 Project Reactor 的 Flux::interval 方法。下面分别给出两个示例。
3.1. 配置 Faker Bean
我们将 JavaFaker 的 Chuck Norris 生成器配置为一个 Spring Bean,以便后续注入使用:
@Configuration
class AppConfig {
@Bean
public ChuckNorris chuckNorris() {
return (new Faker()).chuckNorris();
}
}
3.2. 使用 @Scheduled 实现定时推送
我们定义一个定时任务类,每隔一段时间发送一条消息:
@Service
public class ScheduledPushMessages {
@Scheduled(fixedRate = 5000)
public void sendMessage(SimpMessagingTemplate simpMessagingTemplate, ChuckNorris chuckNorris) {
String time = new SimpleDateFormat("HH:mm").format(new Date());
simpMessagingTemplate.convertAndSend("/topic/pushmessages",
new OutputMessage("Chuck Norris (@Scheduled)", chuckNorris().fact(), time));
}
}
✅ 在 sendMessage
方法上加上 @Scheduled(fixedRate = 5000)
表示每 5 秒执行一次。方法参数中的 simpMessagingTemplate
和 chuckNorris
是通过 Spring 上下文注入的。
3.3. 使用 Flux::interval() 实现响应式推送
⚠️ 如果你使用的是 WebFlux,那么可以利用 Project Reactor 的 Flux::interval 方法,它能以指定的时间间隔不断发出 Long 类型的事件流。
我们继续沿用上面的例子,目标是每隔 5 秒推送一条 Chuck Norris 语录。为此我们需要实现 InitializingBean 接口,在应用启动后订阅 Flux 流:
@Service
public class ReactiveScheduledPushMessages implements InitializingBean {
private SimpMessagingTemplate simpMessagingTemplate;
private ChuckNorris chuckNorris;
@Autowired
public ReactiveScheduledPushMessages(SimpMessagingTemplate simpMessagingTemplate, ChuckNorris chuckNorris) {
this.simpMessagingTemplate = simpMessagingTemplate;
this.chuckNorris = chuckNorris;
}
@Override
public void afterPropertiesSet() throws Exception {
Flux.interval(Duration.ofSeconds(5L))
// 丢弃传入的 Long 值,替换为 OutputMessage 对象
.map((n) -> new OutputMessage("Chuck Norris (Flux::interval)",
chuckNorris.fact(),
new SimpleDateFormat("HH:mm").format(new Date())))
.subscribe(message -> simpMessagingTemplate.convertAndSend("/topic/pushmessages", message));
}
}
在这个例子中,我们使用构造函数注入了所需的 Bean。调度逻辑写在 afterPropertiesSet()
方法中,该方法会在 Bean 初始化完成后立即执行。
interval 操作符每隔 5 秒发出一个 Long 值,随后通过 map 将其转换为消息对象,最后通过 subscribe 触发推送行为。
4. 总结
在这篇文章中,我们展示了如何使用 SimpMessagingTemplate 工具类轻松实现 WebSocket 消息推送,并介绍了两种常见的定时任务实现方式:
- ✅ 使用
@Scheduled
进行传统定时调度(适用于 Spring MVC) - ✅ 使用
Flux::interval
实现响应式定时推送(适用于 WebFlux)
正如你所见,这两种方式都能很好地满足定时推送的需求,选择哪种取决于你的项目架构和偏好。
源码示例可在 GitHub 仓库 获取。