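1. Overview
This tutorial is an introductory guide to working with the Apache Cassandra database from Java. We explain the key concepts and walk through a complete example that covers the basic steps for connecting to this NoSQL database and working with it from Java.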
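2. Introduction to Cassandra
Cassandra is a scalable NoSQL database that provides continuous availability with no single point of failure and handles large volumes of data with high performance. It uses a ring design rather than a master-slave architecture: all nodes are identical peers, there is no master node, and nodes communicate with each other as equals. This design lets Cassandra scale horizontally by adding nodes incrementally, without reconfiguration.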
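To make the ring idea concrete, here is a minimal sketch of how a ring of peers can map row keys to nodes. It is an illustration only: the TokenRing class, its method names, and the CRC32 hash are assumptions made for this example, whereas Cassandra itself relies on its own partitioner and on virtual nodes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

/** Toy token ring: every node is a peer that owns the tokens up to and including its own. */
final class TokenRing {

    // Token -> node name, kept sorted so we can walk the ring clockwise.
    private final TreeMap<Long, String> ring = new TreeMap<>();

    /** Stand-in hash (an assumption for this sketch, not Cassandra's partitioner). */
    static long token(String value) {
        CRC32 crc = new CRC32();
        crc.update(value.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    /** Joining the ring only requires placing the node's token; no master is involved. */
    void addNode(String nodeName) {
        ring.put(token(nodeName), nodeName);
    }

    /** Walks clockwise from the key's token to the first node, wrapping around at the end. */
    String ownerOf(String rowKey) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("ring has no nodes");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(token(rowKey));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }
}
```

In this model, adding a node inserts one more token into the ring, so a key either keeps its owner or moves to the new node. No other key changes hands, which is the property that makes incremental scaling cheap.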
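2.1 Key Concepts
Here is a quick overview of Cassandra's key concepts:
- Cluster
A collection of nodes or data centers arranged in a ring architecture. Every cluster must be assigned a name, which the participating nodes then use.
- Keyspace
Comparable to a schema in a relational database; it is the outermost container for data in Cassandra. Its main attributes are:
✅ Replication Factor
✅ Replica Placement Strategy
✅ Column Families
- Column Family
Comparable to a table in a relational database. Each column family contains a collection of rows, structured as Map<RowKey, SortedMap<ColumnKey, ColumnValue>>. The row key gives fast access to the related data.
- Column
The basic data structure in Cassandra, made up of a column name, a value, and a timestamp. Unlike in a relational database, the number of columns and their contents can vary from row to row.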
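The nested-map structure described above can be modeled directly with Java collections. The sketch below is an in-memory illustration only: the ColumnFamilySketch class is hypothetical, and String keys and values are chosen for simplicity. It does not reflect how the driver or the server represents data internally.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** In-memory model of one column family: row key -> (column name -> value). */
final class ColumnFamilySketch {

    private final Map<String, SortedMap<String, String>> rows = new HashMap<>();

    /** Adds or overwrites a single column; rows are created on first write. */
    void put(String rowKey, String columnName, String value) {
        rows.computeIfAbsent(rowKey, k -> new TreeMap<>()).put(columnName, value);
    }

    /** Returns the row's columns sorted by name, or an empty map for an unknown row. */
    SortedMap<String, String> row(String rowKey) {
        return rows.getOrDefault(rowKey, new TreeMap<>());
    }
}
```

Two rows in the same column family can hold entirely different columns here, for example one row with title and subject and another with only publisher, which mirrors the flexibility described for Cassandra columns.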
3. Using the Java Client
3.1 Maven Dependency
Add the following dependency to pom.xml (the latest version is listed on Maven Central):
<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>3.1.0</version>
</dependency>
For testing, also add the Testcontainers dependency (the latest version is listed on Maven Central):
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>cassandra</artifactId>
    <version>1.15.3</version>
</dependency>
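3.2 Connecting to Cassandra
To connect to the database, build a Cluster object. Provide the address of a node as a contact point; if no port is given, the default port 9042 is used. Starting from the contact point, the driver discovers the topology of the cluster on its own: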
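import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class CassandraConnector {

    private Cluster cluster;
    private Session session;

    public void connect(String node, Integer port) {
        Cluster.Builder b = Cluster.builder().addContactPoint(node);
        // Without an explicit port the driver falls back to its default, 9042.
        if (port != null) {
            b.withPort(port);
        }
        cluster = b.build();
        session = cluster.connect();
    }

    public Session getSession() {
        return this.session;
    }

    // Close the session before the cluster that created it.
    public void close() {
        session.close();
        cluster.close();
    }
}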
3.3 Creating the Keyspace
Let's create a keyspace named "library":
public void createKeyspace(
  String keyspaceName, String replicationStrategy, int replicationFactor) {
    StringBuilder sb =
      new StringBuilder("CREATE KEYSPACE IF NOT EXISTS ")
        .append(keyspaceName).append(" WITH replication = {")
        .append("'class':'").append(replicationStrategy)
        .append("','replication_factor':").append(replicationFactor)
        .append("};");

    String query = sb.toString();
    session.execute(query);
}
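Besides the keyspace name, two more parameters must be defined:
- Replication factor: the number of copies of the data
- Replication strategy: how the replicas are placed around the ring
Through replication, Cassandra stores copies of the data on multiple nodes, which provides reliability and fault tolerance. The following test checks that the keyspace was created: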
private KeyspaceRepository schemaRepository;
private Session session;

@Before
public void connect() {
    CassandraConnector client = new CassandraConnector();
    client.connect("127.0.0.1", 9142);
    this.session = client.getSession();
    schemaRepository = new KeyspaceRepository(session);
}

@Test
public void whenCreatingAKeyspace_thenCreated() {
    String keyspaceName = "library";
    schemaRepository.createKeyspace(keyspaceName, "SimpleStrategy", 1);

    ResultSet result =
      session.execute("SELECT * FROM system_schema.keyspaces;");

    // Keep only the rows whose first column matches the keyspace name.
    List<String> matchedKeyspaces = result.all()
      .stream()
      .filter(r -> r.getString(0).equals(keyspaceName.toLowerCase()))
      .map(r -> r.getString(0))
      .collect(Collectors.toList());

    // JUnit expects the expected value first, then the actual one.
    assertEquals(1, matchedKeyspaces.size());
    assertEquals(keyspaceName.toLowerCase(), matchedKeyspaces.get(0));
}
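For reference, the statement that createKeyspace sends can be sketched as a pure function that is easy to inspect without a running cluster. The KeyspaceCql class and its identifier check are assumptions added for illustration: because the keyspace name is concatenated into the query, this sketch accepts only unquoted CQL identifiers. Unquoted identifiers are case-insensitive and stored in lowercase, which is why the test above compares against keyspaceName.toLowerCase().

```java
import java.util.regex.Pattern;

/** Builds the CREATE KEYSPACE statement as a plain string (hypothetical helper). */
final class KeyspaceCql {

    // Unquoted CQL identifiers: a letter followed by letters, digits, or underscores.
    private static final Pattern IDENTIFIER = Pattern.compile("[A-Za-z][A-Za-z0-9_]*");

    static String createKeyspace(String name, String strategy, int replicationFactor) {
        if (!IDENTIFIER.matcher(name).matches()) {
            throw new IllegalArgumentException("not a plain identifier: " + name);
        }
        if (!IDENTIFIER.matcher(strategy).matches()) {
            throw new IllegalArgumentException("not a plain strategy name: " + strategy);
        }
        if (replicationFactor < 1) {
            throw new IllegalArgumentException("replication factor must be at least 1");
        }
        return "CREATE KEYSPACE IF NOT EXISTS " + name
          + " WITH replication = {'class':'" + strategy
          + "','replication_factor':" + replicationFactor + "};";
    }
}
```

Calling KeyspaceCql.createKeyspace("library", "SimpleStrategy", 1) yields the same text that the StringBuilder version produces, while a name such as "library; DROP KEYSPACE x" is rejected before it reaches the session.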
3.4 Creating a Column Family
Now let's add the first column family, "books", to the existing keyspace:
private static final String TABLE_NAME = "books";
private Session session;

public void createTable() {
    // The author column is included because the test below expects four columns.
    StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
      .append(TABLE_NAME).append("(")
      .append("id uuid PRIMARY KEY, ")
      .append("title text,")
      .append("author text,")
      .append("subject text);");

    String query = sb.toString();
    session.execute(query);
}
The following test verifies that the column family was created:
private BookRepository bookRepository;
private Session session;

@Before
public void connect() {
    CassandraConnector client = new CassandraConnector();
    client.connect("127.0.0.1", 9142);
    this.session = client.getSession();
    bookRepository = new BookRepository(session);
}

@Test
public void whenCreatingATable_thenCreatedCorrectly() {
    bookRepository.deleteTable(BOOKS);
    bookRepository.createTable();

    ResultSet result = session.execute(
      "SELECT * FROM " + KEYSPACE_NAME + "." + BOOKS + ";");

    // Collect the names of all columns in the result metadata.
    List<String> columnNames = result.getColumnDefinitions().asList().stream()
      .map(cl -> cl.getName())
      .collect(Collectors.toList());

    assertEquals(4, columnNames.size());
    assertTrue(columnNames.contains("id"));
    assertTrue(columnNames.contains("title"));
    assertTrue(columnNames.contains("author"));
    assertTrue(columnNames.contains("subject"));
}
3.5 Altering the Column Family
To record a publisher for each book, alter the table with the following code, which adds a column of the given name and type:
public void alterTablebooks(String columnName, String columnType) {
    StringBuilder sb = new StringBuilder("ALTER TABLE ")
      .append(TABLE_NAME).append(" ADD ")
      .append(columnName).append(" ")
      .append(columnType).append(";");

    String query = sb.toString();
    session.execute(query);
}
Verify that the new column "publisher" was added:
@Test
public void whenAlteringTable_thenAddedColumnExists() {
    bookRepository.deleteTable(BOOKS);
    bookRepository.createTable();

    bookRepository.alterTablebooks("publisher", "text");

    ResultSet result = session.execute(
      "SELECT * FROM " + KEYSPACE_NAME + "." + BOOKS + ";");

    boolean columnExists = result.getColumnDefinitions().asList().stream()
      .anyMatch(cl -> cl.getName().equals("publisher"));

    assertTrue(columnExists);
}
3.6 Inserting Data
With the tables in place, we can insert data. The method below writes a row to the booksByTitle table (TABLE_NAME_BY_TITLE), whose definition is shown later in this section:
public void insertbookByTitle(Book book) {
    StringBuilder sb = new StringBuilder("INSERT INTO ")
      .append(TABLE_NAME_BY_TITLE).append("(id, title) ")
      .append("VALUES (").append(book.getId())
      .append(", '").append(book.getTitle()).append("');");

    String query = sb.toString();
    session.execute(query);
}
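Because insertbookByTitle concatenates the title into the query, a title that contains a single quote (for example O'Reilly) would produce invalid CQL. One way around this, sketched below with a hypothetical CqlLiterals helper, is to double any embedded single quote, which is how CQL escapes quotes inside string literals. In real code, the driver's prepared statements with bind markers are usually the better choice; this sketch only shows the escaping rule.

```java
import java.util.UUID;

/** Hypothetical helper that renders Java strings as CQL string literals. */
final class CqlLiterals {

    /** Wraps the value in single quotes and doubles any embedded single quote. */
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /** Same statement shape as insertbookByTitle, but with the title escaped. */
    static String insertBookByTitle(String table, UUID id, String title) {
        return "INSERT INTO " + table + "(id, title) VALUES ("
          + id + ", " + quote(title) + ");";
    }
}
```

With this helper, the title O'Reilly is rendered as 'O''Reilly', so the quote no longer terminates the literal early.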
Test that the new row was inserted:
@Test
public void whenAddingANewBook_thenBookExists() {
    bookRepository.deleteTable(BOOKS_BY_TITLE);
    bookRepository.createTableBooksByTitle();

    String title = "Effective Java";
    String author = "Joshua Bloch";
    Book book = new Book(UUIDs.timeBased(), title, author, "Programming");
    bookRepository.insertbookByTitle(book);

    Book savedBook = bookRepository.selectByTitle(title);
    assertEquals(book.getTitle(), savedBook.getTitle());
}
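The test creates a second table, booksByTitle, with the following method:
public void createTableBooksByTitle() {
    StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
      .append("booksByTitle").append("(")
      .append("id uuid, ")
      .append("title text,")
      .append("PRIMARY KEY (title, id));");

    String query = sb.toString();
    session.execute(query);
}
⚠️ A Cassandra best practice is the one-table-per-query pattern: different queries need different tables.
In this example, to support lookups by title, we created a table with a compound primary key in which title is the partition key and id is the clustering key. This duplicates data across tables, but it optimizes reads.
The following method selects all rows currently in the books table: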
public List<Book> selectAll() {
    StringBuilder sb =
      new StringBuilder("SELECT * FROM ").append(TABLE_NAME);

    String query = sb.toString();
    ResultSet rs = session.execute(query);

    List<Book> books = new ArrayList<>();

    rs.forEach(r -> {
        books.add(new Book(
          r.getUUID("id"),
          r.getString("title"),
          r.getString("subject")));
    });
    return books;
}
Verify the query result:
@Test
public void whenSelectingAll_thenReturnAllRecords() {
    bookRepository.deleteTable(BOOKS);
    bookRepository.createTable();

    Book book = new Book(UUIDs.timeBased(), "Effective Java", "Joshua Bloch", "Programming");
    bookRepository.insertbook(book);

    book = new Book(UUIDs.timeBased(), "Clean Code", "Robert C. Martin", "Programming");
    bookRepository.insertbook(book);

    List<Book> books = bookRepository.selectAll();

    assertEquals(2, books.size());
    assertTrue(books.stream().anyMatch(b -> b.getTitle().equals("Effective Java")));
    assertTrue(books.stream().anyMatch(b -> b.getTitle().equals("Clean Code")));
}
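So far so good, but there is a data consistency problem: we created two tables, books and booksByTitle, yet insertbookByTitle writes only to booksByTitle, so the two tables drift apart. One solution is to write to both tables in a single batch statement: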
public void insertBookBatch(Book book) {
    StringBuilder sb = new StringBuilder("BEGIN BATCH ")
      .append("INSERT INTO ").append(TABLE_NAME)
      .append("(id, title, subject) ")
      .append("VALUES (").append(book.getId()).append(", '")
      .append(book.getTitle()).append("', '")
      .append(book.getSubject()).append("');")
      .append("INSERT INTO ")
      .append(TABLE_NAME_BY_TITLE).append("(id, title) ")
      .append("VALUES (").append(book.getId()).append(", '")
      .append(book.getTitle()).append("');")
      .append("APPLY BATCH;");

    String query = sb.toString();
    session.execute(query);
}
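The batch statement built above follows a fixed shape: BEGIN BATCH, the individual statements, then APPLY BATCH. As a sketch, a small hypothetical helper (BatchCql, which is not part of the driver) could assemble that shape from any list of statements, so that a third denormalized table does not require another hand-written StringBuilder chain:

```java
import java.util.List;

/** Hypothetical helper that wraps CQL statements in a BEGIN BATCH ... APPLY BATCH block. */
final class BatchCql {

    static String batch(List<String> statements) {
        if (statements.isEmpty()) {
            throw new IllegalArgumentException("a batch needs at least one statement");
        }
        StringBuilder sb = new StringBuilder("BEGIN BATCH ");
        for (String statement : statements) {
            String trimmed = statement.trim();
            sb.append(trimmed);
            // Terminate each statement exactly once, whether or not the caller did.
            if (!trimmed.endsWith(";")) {
                sb.append(';');
            }
            sb.append(' ');
        }
        return sb.append("APPLY BATCH;").toString();
    }
}
```

The helper only concatenates text; the values inside each statement still need to be escaped or bound separately.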
Test the result of the batch operation:
@Test
public void whenAddingANewBookBatch_ThenBookAddedInAllTables() {
    // Create the books table
    bookRepository.deleteTable(BOOKS);
    bookRepository.createTable();

    // Create the booksByTitle table
    bookRepository.deleteTable(BOOKS_BY_TITLE);
    bookRepository.createTableBooksByTitle();

    String title = "Effective Java";
    String author = "Joshua Bloch";
    Book book = new Book(UUIDs.timeBased(), title, author, "Programming");
    bookRepository.insertBookBatch(book);

    List<Book> books = bookRepository.selectAll();

    assertEquals(1, books.size());
    assertTrue(books.stream().anyMatch(b -> b.getTitle().equals("Effective Java")));

    List<Book> booksByTitle = bookRepository.selectAllBookByTitle();

    assertEquals(1, booksByTitle.size());
    assertTrue(booksByTitle.stream().anyMatch(b -> b.getTitle().equals("Effective Java")));
}
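Alternative: Cassandra 3.0 and later support materialized views, which can be used instead of batch statements to keep a query-specific copy of the data in sync. See the official documentation for a detailed example.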
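As a rough sketch of that alternative, the method below builds a statement that defines a materialized view over books keyed by title. The class name, the view name books_by_title_mv, and the keyspace parameter are assumptions for illustration. Materialized views also come with operational caveats, so check the documentation for your Cassandra version before relying on them.

```java
/** Hypothetical helper that builds a CREATE MATERIALIZED VIEW statement for books. */
final class MaterializedViewCql {

    static String booksByTitleView(String keyspace) {
        // Every column of the view's primary key must be restricted with IS NOT NULL,
        // and the view's key must contain the base table's key column (id).
        return "CREATE MATERIALIZED VIEW IF NOT EXISTS " + keyspace + ".books_by_title_mv AS "
          + "SELECT id, title FROM " + keyspace + ".books "
          + "WHERE title IS NOT NULL AND id IS NOT NULL "
          + "PRIMARY KEY (title, id);";
    }
}
```

With such a view, only the books table is written to; Cassandra maintains the view's rows itself, which removes the need to keep a second table in sync by hand.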
3.7 Deleting the Column Family
The following code deletes a table:
// Takes the table name as a parameter, matching how the tests call it.
public void deleteTable(String tableName) {
    StringBuilder sb =
      new StringBuilder("DROP TABLE IF EXISTS ").append(tableName);

    String query = sb.toString();
    session.execute(query);
}
Querying a table that does not exist throws an InvalidQueryException:
@Test(expected = InvalidQueryException.class)
public void whenDeletingATable_thenUnconfiguredTable() {
    bookRepository.createTable();
    bookRepository.deleteTable("books");

    session.execute("SELECT * FROM " + KEYSPACE_NAME + ".books;");
}
3.8 Deleting the Keyspace
The following code deletes a keyspace:
public void deleteKeyspace(String keyspaceName) {
    StringBuilder sb =
      new StringBuilder("DROP KEYSPACE ").append(keyspaceName);

    String query = sb.toString();
    session.execute(query);
}
Verify that the keyspace was deleted:
@Test
public void whenDeletingAKeyspace_thenDoesNotExist() {
    String keyspaceName = "testBaeldungKeyspace";

    schemaRepository.createKeyspace(keyspaceName, "SimpleStrategy", 1);
    schemaRepository.deleteKeyspace(keyspaceName);

    ResultSet result =
      session.execute("SELECT * FROM system_schema.keyspaces;");
    boolean isKeyspaceCreated = result.all().stream()
      .anyMatch(r -> r.getString(0).equals(keyspaceName.toLowerCase()));

    assertFalse(isKeyspaceCreated);
}
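4. Conclusion
This tutorial covered the basic steps for connecting to a Cassandra database and working with it from Java, and discussed the key concepts to help you get started. The complete implementation is available in the GitHub project.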