1. 简介
在本教程中,我们将探讨如何使用 MongoDB 的 Tailable Cursor(可拖尾游标)来实现一个无限数据流。结合 Spring Data MongoDB,我们可以轻松构建实时监听日志、消息、股票行情等持续更新的数据流应用。
2. 什么是 Tailable Cursor
当我们执行一个查询时,MongoDB 会创建一个 Cursor(游标)来返回匹配的文档。默认情况下,当客户端读取完所有结果后,这个游标就会被自动关闭,因此它是一个 有限的数据流。
但如果我们使用 MongoDB 的 Capped Collection(固定大小集合)配合 Tailable Cursor,就可以实现一个 无限数据流:
✅ 游标在读取完当前所有数据后仍保持打开
✅ 当有新数据插入集合时,自动推送至客户端
✅ 非常适合用于监听日志、事件流、实时消息等场景
Spring Data MongoDB 提供了对 Tailable Cursor 的支持,包括响应式和阻塞式两种方式。
3. 环境准备
为了演示 Tailable Cursor 的使用,我们构建一个日志计数器应用。假设有多个服务将日志集中写入 MongoDB 的 Capped Collection。
3.1 实体类
@Document
public class Log {
private @Id String id;
private String service;
private LogLevel level;
private String message;
}
3.2 创建 Capped Collection
Capped Collection 是 MongoDB 中一种固定大小的集合,按插入顺序存储文档。我们通过 MongoOperations.createCollection
创建:
db.createCollection(COLLECTION_NAME, new CreateCollectionOptions()
.capped(true)
.sizeInBytes(1024)
.maxDocuments(5));
sizeInBytes
:集合最大字节数,必填maxDocuments
:最大文档数,超出后自动删除旧数据
3.3 Maven 依赖
使用 Spring Boot 的响应式 MongoDB starter:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
4. 使用响应式 API 实现 Tailable Cursor
推荐使用 Spring Data MongoDB 的响应式 API 来操作 Tailable Cursor。
示例:监听 WARN 日志
private Disposable subscription;
public WarnLogsCounter(ReactiveMongoOperations template) {
Flux<Log> stream = template.tail(
query(where("level").is(LogLevel.WARN)),
Log.class);
subscription = stream.subscribe(logEntity ->
counter.incrementAndGet()
);
}
当有新的 WARN
日志写入集合时,订阅者(lambda 表达式)会自动执行 counter.incrementAndGet()
。
关闭流
public void close() {
this.subscription.dispose();
}
⚠️ 注意:如果初始查询没有返回任何结果,Tailable Cursor 会立即关闭,后续新数据也不会触发通知。这是 MongoDB 的一个已知限制。因此,务必在创建游标前确保集合中有匹配数据。
5. 使用响应式 Repository 实现 Tailable Cursor
Spring Data 提供了基于注解的响应式 Repository 支持,可以更简洁地定义 Tailable Cursor 查询方法。
定义 Repository 接口
public interface LogsRepository extends ReactiveCrudRepository<Log, String> {
@Tailable
Flux<Log> findByLevel(LogLevel level);
}
示例:监听 INFO 日志
private Disposable subscription;
public InfoLogsCounter(LogsRepository repository) {
Flux<Log> stream = repository.findByLevel(LogLevel.INFO);
this.subscription = stream.subscribe(logEntity ->
counter.incrementAndGet()
);
}
同样,需要手动关闭流:
public void close() {
this.subscription.dispose();
}
6. 使用 MessageListener 实现 Tailable Cursor(阻塞方式)
如果你无法使用响应式 API,也可以使用 Spring 的 MessageListenerContainer
来实现 Tailable Cursor。
示例:监听 ERROR 日志
private String collectionName;
private MessageListenerContainer container;
private AtomicInteger counter = new AtomicInteger();
public ErrorLogsCounter(MongoTemplate mongoTemplate, String collectionName) {
this.collectionName = collectionName;
this.container = new DefaultMessageListenerContainer(mongoTemplate);
container.start();
TailableCursorRequest<Log> request = getTailableCursorRequest();
container.register(request, Log.class);
}
private TailableCursorRequest<Log> getTailableCursorRequest() {
MessageListener<Document, Log> listener = message ->
counter.incrementAndGet();
return TailableCursorRequest.builder()
.collection(collectionName)
.filter(query(where("level").is(LogLevel.ERROR)))
.publishTo(listener)
.build();
}
MessageListenerContainer
:管理 Tailable Cursor 的生命周期TailableCursorRequest
:构建监听请求,定义查询条件和回调逻辑
⚠️ 同样需要注意:初始查询必须有匹配数据,否则游标会立即关闭。
关闭容器
public void close() {
container.stop();
}
7. 总结
通过 MongoDB 的 Capped Collection 和 Tailable Cursor,我们可以实现一个持续监听数据库更新的无限数据流。Spring Data MongoDB 提供了两种方式支持 Tailable Cursor:
- ✅ 响应式 API(推荐):使用
ReactiveMongoOperations.tail()
或@Tailable
注解 - ✅ 阻塞式 API:使用
MessageListenerContainer
和TailableCursorRequest
无论哪种方式,都需要注意:
- ✅ 集合必须是 Capped Collection
- ⚠️ 初始查询必须有匹配数据,否则游标会关闭
- ✅ 使用完后记得关闭流或容器,避免资源泄漏
完整示例代码已托管在 GitHub。