1. Overview

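This tutorial is an introductory guide to working with the Apache Cassandra database from Java. It explains the key concepts and walks through a complete example that covers the basic steps for connecting to this NoSQL database and working with it from Java.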

2. Introduction to Cassandra

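Cassandra is a scalable NoSQL database that provides continuous availability with no single point of failure and can handle large amounts of data with high performance. It uses a ring design instead of a master-slave architecture: there is no master node, and all participating nodes are identical and communicate with each other as peers. This makes Cassandra horizontally scalable, because nodes can be added incrementally without reconfiguring the cluster.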
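To make the ring idea more concrete, here is a simplified model (an illustration, not Cassandra's own code). Each node sits at a token on the ring and owns the range from the previous node's token (exclusive) up to its own token (inclusive), so finding the owner of a key is a clockwise lookup. The class name TokenRing and the small integer tokens are hypothetical; real clusters hash partition keys with Murmur3 over the full long range and usually use virtual nodes.

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified model of a Cassandra-style token ring (illustrative only).
// Each node is placed at a token and owns the range (previous token, own token].
class TokenRing {
    private final TreeMap<Long, String> nodesByToken = new TreeMap<>();

    void addNode(String name, long token) {
        nodesByToken.put(token, name);
    }

    // Walk clockwise: the first node whose token is >= the key's token owns it;
    // past the highest token the lookup wraps around to the lowest one.
    String ownerOf(long keyToken) {
        if (nodesByToken.isEmpty()) {
            throw new IllegalStateException("ring has no nodes");
        }
        Map.Entry<Long, String> entry = nodesByToken.ceilingEntry(keyToken);
        return (entry != null ? entry : nodesByToken.firstEntry()).getValue();
    }

    public static void main(String[] args) {
        TokenRing ring = new TokenRing();
        ring.addNode("A", 100);
        ring.addNode("B", 200);
        ring.addNode("C", 300);
        System.out.println(ring.ownerOf(150)); // B
        System.out.println(ring.ownerOf(350)); // A (wraps around)

        // Adding node D only takes over part of B's range; A and C are untouched.
        ring.addNode("D", 150);
        System.out.println(ring.ownerOf(120)); // D
        System.out.println(ring.ownerOf(180)); // B
    }
}
```

Because every node runs the same lookup, no master is needed to route a request, and adding a node only moves the keys in one slice of the ring.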

2.1 Key Concepts

Here is a short overview of Cassandra's key concepts:

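  • Cluster
    A collection of nodes or data centers arranged in a ring architecture. Every cluster must be given a name, which is then used by the participating nodes.
  • Keyspace
    The counterpart of a schema in a relational database, and the outermost container for data in Cassandra. Its main attributes are:
    ✅ Replication Factor
    ✅ Replica Placement Strategy
    ✅ Column Families
  • Column Family
    The counterpart of a table in a relational database. Each column family contains a collection of rows, represented as a Map<RowKey, SortedMap<ColumnKey, ColumnValue>>. The key makes it possible to access related data together.
  • Column
    The basic data structure in Cassandra, holding a column name, a value and a timestamp. Unlike in a relational database, the number of columns and the columns themselves may vary from row to row.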
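The Map<RowKey, SortedMap<ColumnKey, ColumnValue>> shape described above can be mimicked with plain Java collections. The sketch below is a hypothetical in-memory model (ColumnFamilyModel and Column are illustrative names, not driver classes): each row keeps its columns sorted by name, different rows may carry different columns, and, as in Cassandra's last-write-wins rule, a write with an older timestamp does not overwrite a newer value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Illustrative in-memory model of a column family:
// RowKey -> SortedMap<ColumnKey, Column> (columns kept sorted by name).
class ColumnFamilyModel {

    // A column: name, value and write timestamp.
    static final class Column {
        final String name;
        final String value;
        final long timestamp;

        Column(String name, String value, long timestamp) {
            this.name = name;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, SortedMap<String, Column>> rows = new HashMap<>();

    // Writes one column of one row. Only a strictly newer timestamp replaces an
    // existing value (last write wins; tie-breaking is simplified in this sketch).
    void put(String rowKey, String columnName, String value, long timestamp) {
        SortedMap<String, Column> row = rows.computeIfAbsent(rowKey, k -> new TreeMap<>());
        Column current = row.get(columnName);
        if (current == null || timestamp > current.timestamp) {
            row.put(columnName, new Column(columnName, value, timestamp));
        }
    }

    // Returns the row's columns in name order, or an empty map for an unknown row.
    SortedMap<String, Column> row(String rowKey) {
        return rows.getOrDefault(rowKey, new TreeMap<>());
    }
}
```

For example, a row "book1" with columns title and author and a row "book2" with only title can live side by side, which a fixed relational schema would not allow without NULLs.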

3. Using the Java Client

3.1 Maven Dependencies

Add the following dependency to pom.xml (the latest version is available on Maven Central):

<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>3.1.0</version>
</dependency>

For testing, also add the Testcontainers Cassandra module (the latest version is available on Maven Central):

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>cassandra</artifactId>
    <version>1.15.3</version>
    <scope>test</scope>
</dependency>

3.2 Connecting to Cassandra

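To connect to Cassandra, we build a Cluster object. It needs the address of a node to use as a contact point; if no port is specified, the driver uses the default port 9042. From these settings the driver can discover the current topology of the cluster: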

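import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Cluster.Builder;
import com.datastax.driver.core.Session;

public class CassandraConnector {

    private Cluster cluster;
    private Session session;

    public void connect(String node, Integer port) {
        // The node acts as a contact point; the driver discovers the rest of the cluster
        Builder b = Cluster.builder().addContactPoint(node);
        if (port != null) {
            // Without an explicit port, the driver uses 9042
            b.withPort(port);
        }
        cluster = b.build();
        session = cluster.connect();
    }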

    public Session getSession() {
        return this.session;
    }

    public void close() {
        session.close();
        cluster.close();
    }
}

3.3 Creating the Keyspace

Let's create our "library" keyspace:

public void createKeyspace(
  String keyspaceName, String replicationStrategy, int replicationFactor) {
    // Produces e.g.: CREATE KEYSPACE IF NOT EXISTS library
    //   WITH replication = {'class':'SimpleStrategy','replication_factor':1};
    StringBuilder sb =
      new StringBuilder("CREATE KEYSPACE IF NOT EXISTS ")
        .append(keyspaceName).append(" WITH replication = {")
        .append("'class':'").append(replicationStrategy)
        .append("','replication_factor':").append(replicationFactor)
        .append("};");

    String query = sb.toString();
    session.execute(query);
}

Apart from the keyspace name, two more parameters have to be defined:

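  • replicationFactor: the number of copies (replicas) of the data
  • replicationStrategy: how the replicas are distributed across the ring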
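As a rough illustration of a replication strategy, SimpleStrategy places the first replica on the node that owns the partition's token and the remaining replicationFactor - 1 replicas on the next nodes clockwise around the ring. The sketch below models only that rule; SimpleStrategySketch and replicasFor are hypothetical names, and the model ignores racks, data centers and virtual nodes (which NetworkTopologyStrategy and real clusters take into account).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative SimpleStrategy-style replica placement on a token ring.
class SimpleStrategySketch {

    // nodesByToken: ring position -> node name; returns the replicas for keyToken.
    static List<String> replicasFor(TreeMap<Long, String> nodesByToken,
                                    long keyToken, int replicationFactor) {
        List<String> replicas = new ArrayList<>();
        if (nodesByToken.isEmpty()) {
            return replicas;
        }
        // A ring cannot hold more replicas than it has nodes.
        int count = Math.min(replicationFactor, nodesByToken.size());

        // Start at the owning node: first token >= keyToken, wrapping around.
        Map.Entry<Long, String> entry = nodesByToken.ceilingEntry(keyToken);
        if (entry == null) {
            entry = nodesByToken.firstEntry();
        }
        while (replicas.size() < count) {
            replicas.add(entry.getValue());
            // Move clockwise to the next node, wrapping past the highest token.
            Map.Entry<Long, String> next = nodesByToken.higherEntry(entry.getKey());
            entry = (next != null) ? next : nodesByToken.firstEntry();
        }
        return replicas;
    }
}
```

On a four-node ring A@100, B@200, C@300, D@400, replicasFor(ring, 150, 3) yields [B, C, D], so losing any single node still leaves two copies of that partition.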

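Through replication, Cassandra stores copies of the data on several nodes, which provides reliability and fault tolerance. Now we can test that the keyspace has been created: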

private KeyspaceRepository schemaRepository;
private Session session;

@Before
public void connect() {
    CassandraConnector client = new CassandraConnector();
    // 9142 assumes a test Cassandra instance listening on that port; with
    // Testcontainers, use the container's host and the mapped port of 9042 instead
    client.connect("127.0.0.1", 9142);
    this.session = client.getSession();
    schemaRepository = new KeyspaceRepository(session);
}

@Test
public void whenCreatingAKeyspace_thenCreated() {
    String keyspaceName = "library";
    schemaRepository.createKeyspace(keyspaceName, "SimpleStrategy", 1);

    ResultSet result =
      session.execute("SELECT keyspace_name FROM system_schema.keyspaces;");

    // Unquoted identifiers are stored in lower case, hence toLowerCase()
    List<String> matchedKeyspaces = result.all()
      .stream()
      .map(r -> r.getString("keyspace_name"))
      .filter(name -> name.equals(keyspaceName.toLowerCase()))
      .collect(Collectors.toList());

    assertEquals(1, matchedKeyspaces.size());
    assertEquals(keyspaceName.toLowerCase(), matchedKeyspaces.get(0));
}
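The createKeyspace method above only emits the single replication_factor option that SimpleStrategy expects. NetworkTopologyStrategy, generally recommended for production and multi-data-center clusters, instead takes one replication factor per data center. A hypothetical helper that renders either replication map could look like this; ReplicationCql is an illustrative name, and data center names such as dc1 and dc2 are placeholders that must match the names the cluster actually reports.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that renders the replication map of a CREATE KEYSPACE statement.
class ReplicationCql {

    // SimpleStrategy: one replication factor for the whole cluster.
    static String simple(int replicationFactor) {
        return "{'class':'SimpleStrategy','replication_factor':" + replicationFactor + "}";
    }

    // NetworkTopologyStrategy: one replication factor per data center,
    // rendered in the iteration order of the given map.
    static String networkTopology(Map<String, Integer> factorsByDataCenter) {
        StringJoiner joiner = new StringJoiner(",", "{", "}");
        joiner.add("'class':'NetworkTopologyStrategy'");
        factorsByDataCenter.forEach((dc, rf) -> joiner.add("'" + dc + "':" + rf));
        return joiner.toString();
    }

    // Wraps a replication map into the full statement.
    static String createKeyspace(String keyspaceName, String replicationMap) {
        return "CREATE KEYSPACE IF NOT EXISTS " + keyspaceName
            + " WITH replication = " + replicationMap + ";";
    }
}
```

Passing a LinkedHashMap keeps the data center order stable, so the generated statement is predictable; for instance dc1 = 3 and dc2 = 2 render as {'class':'NetworkTopologyStrategy','dc1':3,'dc2':2}.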

3.4 Creating a Column Family

Now we can add the first column family, "books", to the existing keyspace:

private static final String TABLE_NAME = "books";
private Session session;

public void createTable() {
    StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
      .append(TABLE_NAME).append("(")
      .append("id uuid PRIMARY KEY, ")
      .append("title text,")
      .append("author text,")
      .append("subject text);");

    String query = sb.toString();
    session.execute(query);
}

The following test verifies that the column family has been created with the expected columns:

private BookRepository bookRepository;
private Session session;

@Before
public void connect() {
    CassandraConnector client = new CassandraConnector();
    client.connect("127.0.0.1", 9142);
    this.session = client.getSession();
    // BookRepository uses unqualified table names, so the keyspace must exist
    // and be selected for this session
    new KeyspaceRepository(session).createKeyspace(KEYSPACE_NAME, "SimpleStrategy", 1);
    session.execute("USE " + KEYSPACE_NAME + ";");
    bookRepository = new BookRepository(session);
}

@Test
public void whenCreatingATable_thenCreatedCorrectly() {
    bookRepository.deleteTable(BOOKS);
    bookRepository.createTable();
    ResultSet result = session.execute("SELECT * FROM " + KEYSPACE_NAME + "." + BOOKS + ";");

    // Collect all column names of the table
    List<String> columnNames = result.getColumnDefinitions().asList().stream()
      .map(cl -> cl.getName())
      .collect(Collectors.toList());
    assertEquals(4, columnNames.size());
    assertTrue(columnNames.contains("id"));
    assertTrue(columnNames.contains("title"));
    assertTrue(columnNames.contains("author"));
    assertTrue(columnNames.contains("subject"));
}

3.5 Altering the Column Family

If we also want to store the publisher of each book, we can alter the table with the following method:

public void alterTablebooks(String columnName, String columnType) {
    StringBuilder sb = new StringBuilder("ALTER TABLE ")
      .append(TABLE_NAME).append(" ADD ")
      .append(columnName).append(" ")
      .append(columnType).append(";");

    String query = sb.toString();
    session.execute(query);
}

Let's verify that the new column "publisher" has been added:

@Test
public void whenAlteringTable_thenAddedColumnExists() {
    bookRepository.deleteTable(BOOKS);
    bookRepository.createTable();

    bookRepository.alterTablebooks("publisher", "text");

    ResultSet result = session.execute("SELECT * FROM " + KEYSPACE_NAME + "." + BOOKS + ";");

    boolean columnExists = result.getColumnDefinitions().asList().stream().anyMatch(cl -> cl.getName().equals("publisher"));
    assertTrue(columnExists);
}

3.6 Inserting Data into the Column Family

Now we can start adding data. The following method inserts a book into the booksByTitle table (TABLE_NAME_BY_TITLE):

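public void insertbookByTitle(Book book) {
    // The title is concatenated as-is, so a title containing a single quote
    // breaks this statement; bound values (session.execute(query, values...))
    // avoid that problem
    StringBuilder sb = new StringBuilder("INSERT INTO ")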
      .append(TABLE_NAME_BY_TITLE).append("(id, title) ")
      .append("VALUES (").append(book.getId())
      .append(", '").append(book.getTitle()).append("');");

    String query = sb.toString();
    session.execute(query);
}
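Because insertbookByTitle concatenates the title directly into the CQL text, a title such as O'Reilly would end the string literal early. In CQL, a single quote inside a string constant is escaped by doubling it. A minimal hypothetical helper for that is sketched below (CqlLiterals is an illustrative name); in production code, bound values through prepared statements remain the safer choice.

```java
// Hypothetical helper that turns a Java string into a CQL string literal.
final class CqlLiterals {

    private CqlLiterals() {
    }

    // Wraps the value in single quotes and doubles any embedded single quote,
    // which is how CQL escapes quotes inside string constants.
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }
}
```

Inside the insert method, the title would then be appended as CqlLiterals.quote(book.getTitle()) instead of being wrapped in quotes by hand.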

Let's test that the new row has been inserted:

@Test
public void whenAddingANewBook_thenBookExists() {
    bookRepository.deleteTable(BOOKS_BY_TITLE);
    bookRepository.createTableBooksByTitle();

    String title = "Effective Java";
    String author = "Joshua Bloch";
    Book book = new Book(UUIDs.timeBased(), title, author, "Programming");
    bookRepository.insertbookByTitle(book);

    Book savedBook = bookRepository.selectByTitle(title);
    assertEquals(book.getTitle(), savedBook.getTitle());
}

The test relies on a second table, booksByTitle, which is created as follows:

public void createTableBooksByTitle() {
    StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
      .append("booksByTitle").append("(")
      .append("id uuid, ")
      .append("title text,")
      .append("PRIMARY KEY (title, id));");

    String query = sb.toString();
    session.execute(query);
}

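⚠️ Cassandra best practice: follow the one-table-per-query pattern, which means using a different table for each different query.
In this example, to query books by title, we created a table with a compound primary key made of title (the partition key) and id (the clustering key). As a result, the data model contains duplicated data; this is deliberate, because it optimizes read performance.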
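The one-table-per-query idea can be sketched with two in-memory maps standing in for the two tables (an illustration only; BookTables and its map layout are hypothetical). Every book is written twice: once keyed by id, as in books, and once grouped by title and ordered by id, as in booksByTitle, where title plays the role of the partition key and id the clustering key. Each query then becomes a single lookup in the structure built for it, at the cost of storing the data twice.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;

// Illustrative model of "one table per query": the same data is written to
// two structures, each shaped for the query it serves.
class BookTables {

    // Stand-in for the "books" table (primary key: id).
    private final Map<UUID, String> titleById = new HashMap<>();

    // Stand-in for "booksByTitle" (partition key: title, clustering key: id).
    // TreeSet orders UUIDs with UUID.compareTo, a simplification of Cassandra's ordering.
    private final Map<String, SortedSet<UUID>> idsByTitle = new HashMap<>();

    // Each insert writes to both "tables", duplicating the title on purpose.
    void insert(UUID id, String title) {
        titleById.put(id, title);
        idsByTitle.computeIfAbsent(title, t -> new TreeSet<>()).add(id);
    }

    // Query served by "books": look up a book by id.
    String titleOf(UUID id) {
        return titleById.get(id);
    }

    // Query served by "booksByTitle": one partition lookup returns the ids in clustering order.
    List<UUID> idsWithTitle(String title) {
        return new ArrayList<>(idsByTitle.getOrDefault(title, Collections.emptySortedSet()));
    }
}
```

Keeping both structures in step on every write is exactly the consistency concern that the batch query later in this section addresses.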

Let's look at the data currently stored in the books table:

public List<Book> selectAll() {
    StringBuilder sb = 
      new StringBuilder("SELECT * FROM ").append(TABLE_NAME);

    String query = sb.toString();
    ResultSet rs = session.execute(query);

    List<Book> books = new ArrayList<>();

    // Map each row to a Book (id, title, author, subject)
    rs.forEach(r -> {
        books.add(new Book(
          r.getUUID("id"),
          r.getString("title"),
          r.getString("author"),
          r.getString("subject")));
    });
    return books;
}

And a test that checks the query returns all records:

@Test
public void whenSelectingAll_thenReturnAllRecords() {
    bookRepository.deleteTable(BOOKS);
    bookRepository.createTable();

    Book book = new Book(UUIDs.timeBased(), "Effective Java", "Joshua Bloch", "Programming");
    bookRepository.insertbook(book);

    book = new Book(UUIDs.timeBased(), "Clean Code", "Robert C. Martin", "Programming");
    bookRepository.insertbook(book);

    List<Book> books = bookRepository.selectAll();

    assertEquals(2, books.size());
    assertTrue(books.stream().anyMatch(b -> b.getTitle().equals("Effective Java")));
    assertTrue(books.stream().anyMatch(b -> b.getTitle().equals("Clean Code")));
}

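There is a consistency problem, though: we now have two tables, books and booksByTitle, that hold the same data, but the earlier test inserted the book only into booksByTitle, so the two tables are out of sync. A batch query solves this: it groups both INSERT statements, one per table, and executes them as a single operation: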

public void insertBookBatch(Book book) {
    StringBuilder sb = new StringBuilder("BEGIN BATCH ")
      .append("INSERT INTO ").append(TABLE_NAME)
      .append("(id, title, subject) ")
      .append("VALUES (").append(book.getId()).append(", '")
      .append(book.getTitle()).append("', '")
      .append(book.getSubject()).append("');")
      .append("INSERT INTO ")
      .append(TABLE_NAME_BY_TITLE).append("(id, title) ")
      .append("VALUES (").append(book.getId()).append(", '")
      .append(book.getTitle()).append("');")
      .append("APPLY BATCH;");

    String query = sb.toString();
    session.execute(query);
}

Let's test the result of the batch query:

@Test
public void whenAddingANewBookBatch_ThenBookAddedInAllTables() {
    // Create the books table
    bookRepository.deleteTable(BOOKS);
    bookRepository.createTable();

    // Create the booksByTitle table
    bookRepository.deleteTable(BOOKS_BY_TITLE);
    bookRepository.createTableBooksByTitle();

    String title = "Effective Java";
    String author = "Joshua Bloch";
    Book book = new Book(UUIDs.timeBased(), title, author, "Programming");
    bookRepository.insertBookBatch(book);

    List<Book> books = bookRepository.selectAll();

    assertEquals(1, books.size());
    assertTrue(books.stream().anyMatch(b -> b.getTitle().equals("Effective Java")));

    List<Book> booksByTitle = bookRepository.selectAllBookByTitle();

    assertEquals(1, booksByTitle.size());
    assertTrue(booksByTitle.stream().anyMatch(b -> b.getTitle().equals("Effective Java")));
}

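Alternatively, Cassandra 3.0 and later support materialized views, which can be used instead of batch queries in such cases; see the official documentation for a detailed example. Note that later Cassandra releases flag materialized views as experimental.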

3.7 Deleting a Column Family

The following method drops a table:

public void deleteTable(String tableName) {
    StringBuilder sb =
      new StringBuilder("DROP TABLE IF EXISTS ").append(tableName);

    String query = sb.toString();
    session.execute(query);
}

Selecting from a table that no longer exists throws an InvalidQueryException:

@Test(expected = InvalidQueryException.class)
public void whenDeletingATable_thenUnconfiguredTable() {
    bookRepository.createTable();
    bookRepository.deleteTable("books");
       
    session.execute("SELECT * FROM " + KEYSPACE_NAME + ".books;");
}

3.8 Deleting the Keyspace

Finally, let's delete the keyspace:

public void deleteKeyspace(String keyspaceName) {
    StringBuilder sb = 
      new StringBuilder("DROP KEYSPACE ").append(keyspaceName);

    String query = sb.toString();
    session.execute(query);
}

And test that the keyspace has been deleted:

@Test
public void whenDeletingAKeyspace_thenDoesNotExist() {
    String keyspaceName = "testBaeldungKeyspace";

    schemaRepository.createKeyspace(keyspaceName, "SimpleStrategy", 1);
    schemaRepository.deleteKeyspace(keyspaceName);

    ResultSet result = session.execute("SELECT keyspace_name FROM system_schema.keyspaces;");
    boolean isKeyspaceCreated = result.all().stream()
      .anyMatch(r -> r.getString("keyspace_name").equals(keyspaceName.toLowerCase()));
    assertFalse(isKeyspaceCreated);
}

4. Conclusion

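This tutorial covered the basic steps for connecting to and working with the Cassandra database from Java, along with the key concepts needed to get started. The full implementation is available in the GitHub project.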


Original title: A Guide to Cassandra with Java