1. 简介

在本教程中,我们将探讨如何使用 MongoDB 的 Tailable Cursor(可拖尾游标)来实现一个无限数据流。结合 Spring Data MongoDB,我们可以轻松构建实时监听日志、消息、股票行情等持续更新的数据流应用。

2. 什么是 Tailable Cursor

当我们执行一个查询时,MongoDB 会创建一个 Cursor(游标)来返回匹配的文档。默认情况下,当客户端读取完所有结果后,这个游标就会被自动关闭,因此它是一个 有限的数据流

但如果我们使用 MongoDB 的 Capped Collection(固定大小集合)配合 Tailable Cursor,就可以实现一个 无限数据流

✅ 游标在读取完当前所有数据后仍保持打开
✅ 当有新数据插入集合时,自动推送至客户端
✅ 非常适合用于监听日志、事件流、实时消息等场景

Spring Data MongoDB 提供了对 Tailable Cursor 的支持,包括响应式和阻塞式两种方式。

3. 环境准备

为了演示 Tailable Cursor 的使用,我们构建一个日志计数器应用。假设有多个服务将日志集中写入 MongoDB 的 Capped Collection。

3.1 实体类

@Document
public class Log {
    private @Id String id;
    private String service;
    private LogLevel level;
    private String message;
}

3.2 创建 Capped Collection

Capped Collection 是 MongoDB 中一种固定大小的集合,按插入顺序存储文档。我们通过 MongoOperations.createCollection 创建:

db.createCollection(COLLECTION_NAME, new CreateCollectionOptions()
  .capped(true)
  .sizeInBytes(1024)
  .maxDocuments(5));
  • sizeInBytes:集合最大字节数,必填
  • maxDocuments:最大文档数,超出后自动删除旧数据

3.3 Maven 依赖

使用 Spring Boot 的响应式 MongoDB starter:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>

4. 使用响应式 API 实现 Tailable Cursor

推荐使用 Spring Data MongoDB 的响应式 API 来操作 Tailable Cursor。

示例:监听 WARN 日志

private Disposable subscription;

public WarnLogsCounter(ReactiveMongoOperations template) {
    Flux<Log> stream = template.tail(
      query(where("level").is(LogLevel.WARN)), 
      Log.class);
    subscription = stream.subscribe(logEntity -> 
      counter.incrementAndGet()
    );
}

当有新的 WARN 日志写入集合时,订阅者(lambda 表达式)会自动执行 counter.incrementAndGet()

关闭流

public void close() {
    this.subscription.dispose();
}

⚠️ 注意:如果初始查询没有返回任何结果,Tailable Cursor 会立即关闭,后续新数据也不会触发通知。这是 MongoDB 的一个已知限制。因此,务必在创建游标前确保集合中有匹配数据。

5. 使用响应式 Repository 实现 Tailable Cursor

Spring Data 提供了基于注解的响应式 Repository 支持,可以更简洁地定义 Tailable Cursor 查询方法。

定义 Repository 接口

public interface LogsRepository extends ReactiveCrudRepository<Log, String> {
    @Tailable
    Flux<Log> findByLevel(LogLevel level);
}

示例:监听 INFO 日志

private Disposable subscription;

public InfoLogsCounter(LogsRepository repository) {
    Flux<Log> stream = repository.findByLevel(LogLevel.INFO);
    this.subscription = stream.subscribe(logEntity -> 
      counter.incrementAndGet()
    );
}

同样,需要手动关闭流:

public void close() {
    this.subscription.dispose();
}

6. 使用 MessageListener 实现 Tailable Cursor(阻塞方式)

如果你无法使用响应式 API,也可以使用 Spring 的 MessageListenerContainer 来实现 Tailable Cursor。

示例:监听 ERROR 日志

private String collectionName;
private MessageListenerContainer container;
private AtomicInteger counter = new AtomicInteger();

public ErrorLogsCounter(MongoTemplate mongoTemplate, String collectionName) {
    this.collectionName = collectionName;
    this.container = new DefaultMessageListenerContainer(mongoTemplate);

    container.start();
    TailableCursorRequest<Log> request = getTailableCursorRequest();
    container.register(request, Log.class);
}

private TailableCursorRequest<Log> getTailableCursorRequest() {
    MessageListener<Document, Log> listener = message -> 
      counter.incrementAndGet();

    return TailableCursorRequest.builder()
      .collection(collectionName)
      .filter(query(where("level").is(LogLevel.ERROR)))
      .publishTo(listener)
      .build();
}
  • MessageListenerContainer:管理 Tailable Cursor 的生命周期
  • TailableCursorRequest:构建监听请求,定义查询条件和回调逻辑

⚠️ 同样需要注意:初始查询必须有匹配数据,否则游标会立即关闭。

关闭容器

public void close() {
    container.stop();
}

7. 总结

通过 MongoDB 的 Capped Collection 和 Tailable Cursor,我们可以实现一个持续监听数据库更新的无限数据流。Spring Data MongoDB 提供了两种方式支持 Tailable Cursor:

  • 响应式 API(推荐):使用 ReactiveMongoOperations.tail()@Tailable 注解
  • 阻塞式 API:使用 MessageListenerContainerTailableCursorRequest

无论哪种方式,都需要注意:

  • ✅ 集合必须是 Capped Collection
  • ⚠️ 初始查询必须有匹配数据,否则游标会关闭
  • ✅ 使用完后记得关闭流或容器,避免资源泄漏

完整示例代码已托管在 GitHub


原始标题:Spring Data MongoDB Tailable Cursors