1. 简介
本文将介绍 RethinkDB——一个专为实时应用设计的开源 NoSQL 数据库。我们将探讨它为应用带来的特性、能做什么以及如何与之交互。
2. 什么是 RethinkDB?
RethinkDB 是一个强调可扩展性和高可用性的开源 NoSQL 数据库。它允许我们存储 JSON 文档,后续可以对这些文档进行查询。我们还能在数据库内执行多表连接操作,并对数据执行 Map-Reduce 函数。
但真正让 RethinkDB 脱颖而出的是其实时流式推送能力。我们可以执行查询,让结果集的变更持续推送到客户端,从而实现数据的实时更新。这意味着当数据发生变化时,应用能立即向用户展示最新状态。
3. 运行和使用 RethinkDB
RethinkDB 是用 C++ 编写的原生应用。预编译包 已支持大多数平台,同时提供官方 Docker 镜像。
安装完成后,直接运行可执行文件即可启动数据库。可通过参数指定数据文件存储路径(不指定则使用默认值),还能配置监听端口,甚至以集群模式运行多台服务器实现扩展和高可用。更多配置详见官方文档。
要在应用中使用数据库,需要通过合适的客户端连接——支持多种语言。本文将使用 Java 客户端。
添加客户端依赖只需一行配置:
<dependency>
<groupId>com.rethinkdb</groupId>
<artifactId>rethinkdb-driver</artifactId>
<version>2.4.4</version>
</dependency>
接下来创建连接:
Connection conn = RethinkDB.r.connection()
.hostname("localhost")
.port(28015)
.connect();
4. 与 RethinkDB 交互
连接建立后,需要掌握基础操作:创建、操作和检索数据。
RethinkDB 所有交互都通过编程接口完成。无需编写自定义查询语言,直接使用标准 Java 和丰富的类型模型构建查询。✅ 编译器能帮我们校验查询有效性,避免运行时才发现问题。
4.1. 操作表
RethinkDB 实例包含多个数据库,每个数据库通过表存储数据。概念上类似 SQL 数据库的表,但 RethinkDB 不强制表结构约束,由应用自行管理。
通过连接创建新表:
r.db("test").tableCreate(tableName).run(conn);
删除表使用 Db.tableDrop()
,列出所有表用 Db.tableList()
:
r.db(dbName).tableCreate(tableName).run(conn);
List<String> tables = r.db(dbName).tableList().run(conn, List.class).first();
assertTrue(tables.contains(tableName));
4.2. 插入数据
有了表之后,需要填充数据。使用 Table.insert()
并提供数据即可。
用 RethinkDB API 构造对象插入:
r.db(DB_NAME).table(tableName)
.insert(r.hashMap().with("name", "Baeldung"))
.run(conn);
或直接使用 Java 集合:
r.db(DB_NAME).table(tableName)
.insert(Map.of("name", "Baeldung"))
.run(conn);
插入的数据可以是简单的键值对,也可以是任意复杂的结构,包括嵌套对象、数组等:
r.db(DB_NAME).table(tableName)
.insert(
r.hashMap()
.with("name", "Baeldung")
.with("articles", r.array(
r.hashMap()
.with("id", "article1")
.with("name", "String Interpolation in Java")
.with("url", "https://www.baeldung.com/java-string-interpolation"),
r.hashMap()
.with("id", "article2")
.with("name", "Access HTTPS REST Service Using Spring RestTemplate")
.with("url", "https://www.baeldung.com/spring-resttemplate-secure-https-service"))
)
).run(conn);
每条插入的记录都有唯一 ID——要么由记录中的 "id" 字段指定,要么由数据库自动生成。
4.3. 检索数据
数据库存有数据后,需要通过查询获取结果。最简单的查询是直接获取表数据:
Result<Map> results = r.db(DB_NAME).table(tableName).run(conn, Map.class);
结果对象提供多种访问方式,包括直接作为迭代器使用:
for (Map result : results) {
// 处理结果
}
还能转换为 List
或 Stream
(包括并行流),像普通 Java 集合一样操作。
若只需部分结果,可在查询时添加过滤器。通过 Java Lambda 指定过滤条件:
Result<Map> results = r.db(DB_NAME)
.table(tableName)
.filter(r -> r.g("name").eq("String Interpolation in Java"))
.run(conn, Map.class);
过滤器会逐行匹配,仅返回符合条件的记录。
已知 ID 时,可直接获取单条记录:
Result<Map> results = r.db(DB_NAME).table(tableName).get(id).run(conn, Map.class);
4.4. 更新和删除数据
无法修改数据的数据库用途有限。RethinkDB 提供 update()
方法,可链式调用在查询语句末尾,对匹配的记录应用更新。
更新是补丁操作而非整体替换,只需指定要修改的字段:
r.db(DB_NAME).table(tableName).update(r.hashMap().with("site", "Baeldung")).run(conn);
与查询类似,可通过过滤器精确选择要更新的记录。注意:过滤器必须先于 update()
应用,因为它们实际是筛选待更新记录的查询条件:
r.db(DB_NAME).table(tableName)
.filter(r -> r.g("name").eq("String Interpolation in Java"))
.update(r.hashMap().with("category", "java"))
.run(conn);
删除记录同样简单,将 update()
替换为 delete()
即可:
r.db(DB_NAME).table(tableName)
.filter(r -> r.g("name").eq("String Interpolation in Java"))
.delete()
.run(conn);
5. 实时更新
目前展示的都是基础操作,其他数据库也能实现。RethinkDB 的独门绝技是无需轮询即可获取数据实时更新。执行查询时保持游标开放,数据库会在数据变更时主动推送:
Result<Map> cursor = r.db(DB_NAME).table(tableName).changes().run(conn, Map.class);
cursor.stream().forEach(record -> System.out.println("Record: " + record));
这对实时应用(如股票价格、游戏比分)极其有用。
执行此类查询返回的游标与之前相同,但添加 changes()
后,游标不再返回现有记录,而是持续推送匹配记录的变更流(包括插入、更新和删除)。
由于游标无界,迭代操作(for
循环或 Stream
)会持续进行。⚠️ 切勿尝试收集到 List,因为数据流永无止境。
游标返回的记录包含变更前后的值。通过以下逻辑判断操作类型:
- 插入:无旧值
- 删除:无新值
- 更新:新旧值均存在
与其他查询一样,获取变更时也能应用过滤器。游标将只推送符合过滤条件的记录,即使插入操作在查询执行后才发生:
Result<Map> cursor = r.db(DB_NAME).table(tableName)
.filter(r -> r.g("index").eq(5))
.changes()
.run(conn, Map.class);
6. 总结
本文简要介绍了 RethinkDB 数据库引擎,展示了如何用它完成传统数据库任务,以及如何利用其自动推送变更的独特特性。这只是快速概览,系统功能远不止于此,不妨亲自上手试试?
本文所有代码示例可在 GitHub 获取。