1. 简介

在本教程中,我们将学习如何使用 Spring Data Cassandra 的响应式数据访问功能。

这是 Spring Data Cassandra 系列文章的第三篇。在这篇文章中,我们将通过 REST API 暴露一个 Cassandra 数据库接口。

如果你对 Spring Data Cassandra 还不熟悉,可以先阅读系列中的第一篇第二篇文章。

2. Maven 依赖

✅ Spring Data Cassandra 支持 Project Reactor 和 RxJava 的响应式类型。
在本教程中,我们主要使用 Project Reactor 提供的 FluxMono 类型来演示。

首先,添加以下依赖:

<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-cassandra</artifactId>
    <version>2.1.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>

你可以在这里查看最新的 spring-data-cassandra 版本

接下来,我们还需要添加 Spring Web 模块的支持,用于暴露 REST 接口:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

3. 实现我们的应用

由于我们需要持久化数据,首先定义实体类:

@Table
public class Employee {
    @PrimaryKey
    private int id;
    private String name;
    private String address;
    private String email;
    private int age;

    // 构造方法、getter/setter 省略
}

然后创建一个继承自 ReactiveCassandraRepository 的接口:

public interface EmployeeRepository extends ReactiveCassandraRepository<Employee, Integer> {
    @AllowFiltering
    Flux<Employee> findByAgeGreaterThan(int age);
}

⚠️ 注意:这个接口是开启响应式支持的关键。

3.1. REST 控制器实现 CRUD 操作

为了演示,我们编写一个简单的 REST 控制器,暴露一些基本的查询操作:

@RestController
@RequestMapping("employee")
public class EmployeeController {

    @Autowired
    EmployeeService employeeService;

    @PostConstruct
    public void saveEmployees() {
        List<Employee> employees = new ArrayList<>();
        employees.add(new Employee(123, "John Doe", "Delaware", "john.doe@example.com", 31));
        employees.add(new Employee(324, "Adam Smith", "North Carolina", "adam.smith@example.com", 43));
        employees.add(new Employee(355, "Kevin Dunner", "Virginia", "kevin.dunner@example.com", 24));
        employees.add(new Employee(643, "Mike Lauren", "New York", "mike.lauren@example.com", 41));
        employeeService.initializeEmployees(employees);
    }

    @GetMapping("/list")
    public Flux<Employee> getAllEmployees() {
        return employeeService.getAllEmployees();
    }

    @GetMapping("/{id}")
    public Mono<Employee> getEmployeeById(@PathVariable int id) {
        return employeeService.getEmployeeById(id);
    }

    @GetMapping("/filterByAge/{age}")
    public Flux<Employee> getEmployeesFilterByAge(@PathVariable int age) {
        return employeeService.getEmployeesFilterByAge(age);
    }
}

对应的 Service 层也很简单粗暴:

@Service
public class EmployeeService {

    @Autowired
    EmployeeRepository employeeRepository;

    public void initializeEmployees(List<Employee> employees) {
        Flux<Employee> savedEmployees = employeeRepository.saveAll(employees);
        savedEmployees.subscribe();
    }

    public Flux<Employee> getAllEmployees() {
        return employeeRepository.findAll();
    }

    public Flux<Employee> getEmployeesFilterByAge(int age) {
        return employeeRepository.findByAgeGreaterThan(age);
    }

    public Mono<Employee> getEmployeeById(int id) {
        return employeeRepository.findById(id);
    }
}

3.2. 数据库配置

接着,在 application.properties 中指定连接 Cassandra 所需的信息:

spring.data.cassandra.keyspace-name=practice
spring.data.cassandra.port=9042
spring.data.cassandra.local-datacenter=datacenter1

📌 注意:datacenter1 是默认的数据中心名称。

4. 测试接口

4.1. 手动测试

先获取所有员工信息:

curl localhost:8080/employee/list

响应结果如下:

[
    {
        "id": 324,
        "name": "Adam Smith",
        "address": "North Carolina",
        "email": "adam.smith@example.com",
        "age": 43
    },
    {
        "id": 123,
        "name": "John Doe",
        "address": "Delaware",
        "email": "john.doe@example.com",
        "age": 31
    },
    {
        "id": 355,
        "name": "Kevin Dunner",
        "address": "Virginia",
        "email": "kevin.dunner@example.com",
        "age": 24
    },
    {
        "id": 643,
        "name": "Mike Lauren",
        "address": "New York",
        "email": "mike.lauren@example.com",
        "age": 41
    }
]

再根据 ID 查询特定员工:

curl localhost:8080/employee/643

返回结果:

{
    "id": 643,
    "name": "Mike Lauren",
    "address": "New York",
    "email": "mike.lauren@example.com",
    "age": 41
}

最后测试按年龄过滤:

curl localhost:8080/employee/filterByAge/35

返回年龄大于 35 的员工:

[
    {
        "id": 324,
        "name": "Adam Smith",
        "address": "North Carolina",
        "email": "adam.smith@example.com",
        "age": 43
    },
    {
        "id": 643,
        "name": "Mike Lauren",
        "address": "New York",
        "email": "mike.lauren@example.com",
        "age": 41
    }
]

4.2. 集成测试

此外,我们也可以写一个集成测试验证功能是否正常:

@RunWith(SpringRunner.class)
@SpringBootTest
public class ReactiveEmployeeRepositoryIntegrationTest {

    @Autowired
    EmployeeRepository repository;

    @Before
    public void setUp() {
        Flux<Employee> deleteAndInsert = repository.deleteAll()
          .thenMany(repository.saveAll(Flux.just(
            new Employee(111, "John Doe", "Delaware", "john.doe@example.com", 31),
            new Employee(222, "Adam Smith", "North Carolina", "adam.smith@example.com", 43),
            new Employee(333, "Kevin Dunner", "Virginia", "kevin.dunner@example.com", 24),
            new Employee(444, "Mike Lauren", "New York", "mike.lauren@example.com", 41))));

        StepVerifier
          .create(deleteAndInsert)
          .expectNextCount(4)
          .verifyComplete();
    }

    @Test
    public void givenRecordsAreInserted_whenDbIsQueried_thenShouldIncludeNewRecords() {
        Mono<Long> saveAndCount = repository.count()
          .doOnNext(System.out::println)
          .thenMany(repository
            .saveAll(Flux.just(
            new Employee(325, "Kim Jones", "Florida", "kim.jones@example.com", 42),
            new Employee(654, "Tom Moody", "New Hampshire", "tom.moody@example.com", 44))))
          .last()
          .flatMap(v -> repository.count())
          .doOnNext(System.out::println);

        StepVerifier
          .create(saveAndCount)
          .expectNext(6L)
          .verifyComplete();
    }

    @Test
    public void givenAgeForFilter_whenDbIsQueried_thenShouldReturnFilteredRecords() {
        StepVerifier
          .create(repository.findByAgeGreaterThan(35))
          .expectNextCount(2)
          .verifyComplete();
    }
}

5. 总结

总结一下,我们学习了如何使用 Spring Data Cassandra 构建响应式(非阻塞)应用,并通过 REST 接口对外提供服务。

一如既往,本文示例代码可以在 GitHub 上找到:源码地址


原始标题:Spring Data with Reactive Cassandra