1. 概述
本文将探讨数据库操作与消息传递之间保持数据一致性的挑战。首先分析问题本质,然后通过实现事务性发件箱模式解决核心问题。
接着引入Eventuate Tram框架,它负责将待发布消息写入发件箱表。最后通过独立Docker容器运行Eventuate CDC服务,监控发件箱变更并通过Kafka发布消息。
2. 何时需要事务消息?
就像我们依赖数据库事务保证数据操作的原子性一样,有时也需要原子性地向消息代理发布消息。典型场景:将数据保存到数据库和向消息代理发布消息需要作为单一原子操作执行。
虽然看似简单,但存在隐藏挑战。通过一个简单用例说明:保存Comment实体到数据库,同时向baeldung.comment.added Kafka主题发布事件。
❌ 天真做法是在事务块内发布消息。若使用Spring Data JPA和KafkaTemplate,领域服务可能这样写:
@Service
class CommentService {
private final CommentRepository comments;
private final KafkaTemplate<Long, CommentAddedEvent> kafkaTemplate;
// constructor
@Transactional
public Long save(Comment comment) {
Comment saved = this.comments.save(comment);
log.info("Comment created: {}", saved);
CommentAddedEvent commentAdded = new CommentAddedEvent(saved.getId(), saved.getArticleSlug());
kafkaTemplate.send("baeldung.comment.added", saved.getId(), commentAdded);
}
}
⚠️ 这种方式在数据库提交前就发布了Kafka消息。即使事务提交失败回滚,消息仍可能被发送。
反之,若移除*@Transactional*注解,Kafka发布失败时Spring不会回滚数据库插入。两种方案都会导致系统间数据不一致。
3. 事务性发件箱模式
通过事务性发件箱模式确保系统最终一致性。核心思想:在数据变更的同一事务内,将消息保存到专用数据库表(即"发件箱")。
随后由独立进程读取发件箱,将消息发布到消息代理,并更新/删除/标记记录为已发布:
类似问题也会出现在发布事件和更新发件箱表时:
- 除非事件成功发布,否则不应更新发件箱(避免事件丢失)
- 若事件已发送但数据库更新失败,系统会重试导致重复事件
✅ 整体实现"至少一次"投递,优先保证可靠性而非避免重复。
4. 演示应用概览
本文使用Spring Boot应用管理博客文章评论(类似Baeldung)。用户通过POST请求向*/api/articles/{slug}/comments*接口添加评论:
curl --location "http://localhost:8080/api/articles/oop-best-practices/comments" \
--header "Content-Type: application/json" \
--data "{
\"articleAuthor\": \"Andrey the Author\",
\"text\": \"Great article!\",
\"commentAuthor\": \"Richard the Reader\"
}"
快速测试可使用src/rest/resources下的post-comment.bat脚本。
当Comment实体保存到数据库时,系统会发布包含评论ID和文章slug的Kafka消息到baeldung.comment.added主题。
本地环境通过Docker启动PostgreSQL、Kafka和Eventuate CDC服务容器,使用src/test/resources下的eventuate-docker-compose.yml配置。Spring Boot应用通过eventuate配置文件启动:
完整实践可参考集成测试EventuateTramLiveTest。
5. Eventuate Tram框架
Eventuate是支持CQRS、事件溯源和事务Saga等微服务核心模式的Java平台。其组件Eventuate Tram通过事务性发件箱模式实现可靠的服务间通信。
集成Eventuate Tram确保Kafka消息至少一次投递:
添加依赖到pom.xml:
<dependency> <groupId>io.eventuate.tram.core</groupId> <artifactId>eventuate-tram-spring-jdbc-kafka</artifactId> <version>0.36.0-RELEASE</version> </dependency> <dependency> <groupId>io.eventuate.tram.core</groupId> <artifactId>eventuate-tram-spring-events</artifactId> <version>0.36.0-RELEASE</version> </dependency>
导入配置类:
@Configuration @Import({ TramEventsPublisherConfiguration.class, TramMessageProducerJdbcConfiguration.class }) class EventuateConfig { }
修改CommentAddedEvent实现DomainEvent接口:
record CommentAddedEvent(Long id, String articleSlug) implements DomainEvent { }
重构领域服务,使用DomainEventPublisher替代直接Kafka发布: ```java @Service class CommentService {
private final CommentRepository comments; private final DomainEventPublisher domainEvents;
// constructor
@Transactional public Long save(Comment comment) {
Comment saved = this.comments.save(comment); log.info("Comment created: {}", saved); CommentAddedEvent commentAdded = new CommentAddedEvent(saved.getId(), saved.getArticleSlug()); domainEvents.publish( "baeldung.comment.added", saved.getId(), singletonList(commentAdded) ); return saved.getId();
}
}
**效果:每次持久化*Comment*实体时,会在同一事务内向*eventuate.message*表插入事件记录。**
验证数据库:
```plaintext
mydb=# select * from comment;
id | article_slug | comment_author | text
----+-------------------+--------------------+------------------
1 | oop-best-practices | Richard the Reader | Great article!
(1 row)
查询eventuate.message表(假设CDC服务未启动):
mydb=# select id, destination, published from eventuate.message;
id | destination | published
--------------------------------------+----------------------------+-----------
0000019713d8ffe4-e86a640584cf0000 | baeldung.comment.added | 0
(1 row)
6. Eventuate CDC服务
变更数据捕获(CDC)技术用于检测数据库变更(插入/更新/删除)并传递给其他系统。Eventuate CDC服务正是捕获发件箱表变更并发布到消息代理的组件。
当前支持的消息代理包括:
- Apache Kafka
- ActiveMQ
- RabbitMQ
- Redis
数据库支持:
- ✅ MySQL:通过binlog协议高效跟踪事务日志
- ✅ PostgreSQL:使用WAL(预写日志)
- ⚠️ 其他JDBC数据库:回退到低效轮询模式
启动CDC服务后重新测试,eventuate.messages表记录将被标记为已发布:
mydb=# select id, destination, published from eventuate.message;
id | destination | published
--------------------------------------+----------------------------+-----------
0000019713d8ffe4-e86a640584cf0000 | baeldung.comment.added | 1
(1 row)
使用kafka-console-consumer.sh验证消息发布:
{
"payload": "{ \"id\": 1, \"articleSlug\": \"oop-best-practices\" }",
"headers": {
"PARTITION_ID": "1",
"event-aggregate-type": "baeldung.comment.added",
"DATE": "Tue, 27 May 2025 22:24:37 GMT",
"event-aggregate-id": "1",
"event-type": "com.baeldung.eventuate.tram.domain.CommentAddedEvent",
"DESTINATION": "baeldung.comment.added",
"ID": "0000019713d8ffe4-e86a640584cf0000"
}
}
7. 总结
本文深入探讨了事务消息的复杂性:
- 从原子性执行数据库操作和发布领域事件的挑战出发
- 揭示隐藏难点并展示事务性发件箱模式的解决方案
- 使用Eventuate Tram框架实现该模式
- 结合Eventuate CDC服务(通过变更数据捕获监控发件箱表发送Kafka消息)
- 最终实现系统最终一致性和至少一次投递保证
本文代码可在GitHub获取。