1. 概述
本文将探讨在 @Transactional
事务块内发布消息的必要性及其性能挑战(如数据库连接时间过长)。我们将利用 Spring Modulith 的特性监听 Spring 应用事件,并自动将其发布到 Kafka 主题。
2. 事务操作与消息代理
本文代码示例假设我们正在实现 Baeldung 网站的 Article
保存功能:
@Service
class Baeldung {
private final ArticleRepository articleRepository;
// 构造函数
@Transactional
public void createArticle(Article article) {
validateArticle(article);
article = addArticleTags(article);
// ... 其他业务逻辑
articleRepository.save(article);
}
}
此外,我们需要通知系统其他部分新文章的创建。其他模块或服务将据此生成报告或向网站读者发送新闻邮件。
最直接的方式是注入一个能发布事件的依赖。本例使用 KafkaOperations
向 baeldung.articles.published
主题发送消息,并以文章的 slug()
作为消息键:
@Service
class Baeldung {
private final ArticleRepository articleRepository;
private final KafkaOperations<String, ArticlePublishedEvent> messageProducer;
// 构造函数
@Transactional
public void createArticle(Article article) {
// ... 业务逻辑
validateArticle(article);
article = addArticleTags(article);
article = articleRepository.save(article);
messageProducer.send(
"baeldung.articles.published",
article.slug(),
new ArticlePublishedEvent(article.slug(), article.title())
).join();
}
}
⚠️ 但这种方案存在明显缺陷:
- 设计层面:领域服务与消息生产者强耦合,违反了清洁架构的基本原则
- 性能层面:所有操作在
@Transactional
方法内执行,数据库连接会一直保持到消息成功发布 - 可靠性问题:
- 消息发布失败会导致事务回滚
- 即使消息已发布,事务仍可能回滚(造成数据不一致)
3. 使用 Spring 事件实现依赖倒置
我们可以利用 Spring 事件 改进设计。核心思路是避免在领域服务中直接操作 Kafka:
@Service
public class Baeldung {
private final ApplicationEventPublisher applicationEvents;
private final ArticleRepository articleRepository;
// 构造函数
@Transactional
public void createArticle(Article article) {
// ... 业务逻辑
validateArticle(article);
article = addArticleTags(article);
article = articleRepository.save(article);
applicationEvents.publishEvent(
new ArticlePublishedEvent(article.slug(), article.title()));
}
}
同时创建专门的 Kafka 生产者组件(基础设施层),监听 ArticlePublishedEvent
并委托给底层 KafkaOperations
:
@Component
class ArticlePublishedKafkaProducer {
private final KafkaOperations<String, ArticlePublishedEvent> messageProducer;
// 构造函数
@EventListener
public void publish(ArticlePublishedEvent article) {
Assert.notNull(article.slug(), "Article Slug must not be null!");
messageProducer.send("baeldung.articles.published", article.slug(), article);
}
}
✅ 改进效果:
- 基础设施组件依赖领域事件,实现依赖倒置
- 其他模块可直接监听应用事件,无需修改领域服务
- 解耦业务逻辑与消息发布机制
但 publish()
方法仍在原事务中执行,操作失败仍会导致事务回滚。
4. 原子操作 vs 非原子操作
现在分析性能影响:是否需要消息代理通信失败时回滚事务? 这取决于业务场景。
若不需要原子性保证,应尽快释放数据库连接并异步发布事件。模拟测试(创建无 slug 的文章导致发布失败):
@Test
void whenPublishingMessageFails_thenArticleIsStillSavedToDB() {
var article = new Article(null, "Spring Boot 入门", "张三", "<p> Spring Boot 是 [...] </p>");
baeldung.createArticle(article);
assertThat(repository.findAll())
.hasSize(1).first()
.extracting(Article::title, Article::author)
.containsExactly("Spring Boot 入门", "张三");
}
当前测试会失败,因为 ArticlePublishedKafkaProducer
抛出异常导致事务回滚。解决方案:将事件监听器改为异步:
@Async
@TransactionalEventListener
public void publish(ArticlePublishedEvent event) {
Assert.notNull(event.slug(), "Article Slug must not be null!");
messageProducer.send("baeldung.articles.published", event);
}
✅ 改进效果:
- 异常仅被记录,不影响主事务
- 数据库连接及时释放
- 文章成功保存,消息发布失败被隔离
5. 使用 Spring Modulith 实现事件外部化
我们通过两步优化解决了原始代码的设计和性能问题:
- 使用 Spring 事件实现依赖倒置
- 通过
@TransactionalEventListener
+@Async
实现异步发布
Spring Modulith 进一步简化了该模式。首先添加 Maven 依赖:
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events-api</artifactId>
<version>1.1.3</version>
</dependency>
该模块可监听应用事件并自动外部化到多种消息系统。以 Kafka 为例,添加集成依赖:
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events-kafka</artifactId>
<version>1.1.3</version>
<scope>runtime</scope>
</dependency>
更新 ArticlePublishedEvent
并添加 @Externalized
注解,指定 Kafka 主题和消息键(使用 SpEL 表达式):
@Externalized("baeldung.article.published::#{slug()}")
public record ArticlePublishedEvent(String slug, String title) {
}
6. 事件发布注册表
前文提到数据持久化与消息发布仍存在可靠性问题。Spring Modulith 通过事务性发件箱模式解决此问题,确保系统最终一致性。
6.1 事件发布日志
首先添加持久化技术对应的 starter(以 JPA + PostgreSQL 为例):
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-starter-jpa</artifactId>
<version>1.1.2</version>
</dependency>
启用 event_publication
表自动创建(存储外部化事件数据):
spring.modulith:
events.jdbc-schema-initialization.enabled: true
使用 Testcontainers 启动 PostgreSQL 容器后,可通过终端验证表结构:
psql -U test_user -d test_db
\d
查询事件数据:
✅ 关键发现:
- 第一行:正常流程的事件
- 第二行:故意构造的无效事件(无 slug),
completion_date
为空表示发布失败
6.2 事件重新提交
启用应用重启时自动重试未完成事件:
spring.modulith:
republish-outstanding-events-on-restart: true
编程方式重试失败事件(使用 IncompleteEventPublications
Bean):
@Component
class EventPublications {
private final IncompleteEventPublications incompleteEvents;
private final CompletedEventPublications completeEvents;
// 构造函数
void resubmitUnpublishedEvents() {
incompleteEvents.resubmitIncompletePublicationsOlderThan(Duration.ofSeconds(60));
}
}
清理已发布事件(使用 CompletedEventPublications
):
void clearPublishedEvents() {
completeEvents.deletePublicationsOlderThan(Duration.ofSeconds(60));
}
7. 事件外部化配置
当 @Externalized
注解的 SpEL 表达式过于复杂,或需要分离主题配置/事件模型时,可使用 EventExternalizationConfiguration
:
@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {
return EventExternalizationConfiguration.externalizing()
.select(EventExternalizationConfiguration.annotatedAsExternalized())
.route(
ArticlePublishedEvent.class,
it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.slug())
)
.mapping(
ArticlePublishedEvent.class,
it -> new PostPublishedKafkaEvent(it.slug(), it.title())
)
.build();
}
✅ 优势:
- 声明式定义路由和映射
- 支持多种事件类型(示例添加
WeeklySummaryPublishedEvent
):@Bean EventExternalizationConfiguration eventExternalizationConfiguration() { return EventExternalizationConfiguration.externalizing() .select(EventExternalizationConfiguration.annotatedAsExternalized()) .route( ArticlePublishedEvent.class, it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.slug()) ) .mapping( ArticlePublishedEvent.class, it -> new PostPublishedKafkaEvent(it.slug(), it.title()) ) .route( WeeklySummaryPublishedEvent.class, it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.handle()) ) .mapping( WeeklySummaryPublishedEvent.class, it -> new PostPublishedKafkaEvent(it.handle(), it.heading()) ) .build(); }
此时事件类只需保留 @Externalized
注解(无需配置值):
@Externalized
public record ArticlePublishedEvent(String slug, String title) {
}
@Externalized
public record WeeklySummaryPublishedEvent(String handle, String heading) {
}
8. 总结
本文探讨了事务块内发布消息的场景及其性能影响(数据库连接长时间占用)。通过 Spring Modulith 实现了:
- 监听 Spring 应用事件
- 自动发布到 Kafka
- 异步外部化事件
- 及时释放数据库连接
- 通过事务性发件箱确保最终一致性
完整源码可在 GitHub 获取。