1. 概述

本文将探讨在 @Transactional 事务块内发布消息的必要性及其性能挑战(如数据库连接时间过长)。我们将利用 Spring Modulith 的特性监听 Spring 应用事件,并自动将其发布到 Kafka 主题。

2. 事务操作与消息代理

本文代码示例假设我们正在实现 Baeldung 网站的 Article 保存功能:

@Service
class Baeldung {
    private final ArticleRepository articleRepository;

    // 构造函数

    @Transactional
    public void createArticle(Article article) {
        validateArticle(article);
        article = addArticleTags(article);
        // ... 其他业务逻辑
        
        articleRepository.save(article);
    }
}

此外,我们需要通知系统其他部分新文章的创建。其他模块或服务将据此生成报告或向网站读者发送新闻邮件。

最直接的方式是注入一个能发布事件的依赖。本例使用 KafkaOperationsbaeldung.articles.published 主题发送消息,并以文章的 slug() 作为消息键:

@Service
class Baeldung {
    private final ArticleRepository articleRepository;
    private final KafkaOperations<String, ArticlePublishedEvent> messageProducer;

    // 构造函数

    @Transactional
    public void createArticle(Article article) {
        // ... 业务逻辑
        validateArticle(article);
        article = addArticleTags(article);
        article = articleRepository.save(article);

        messageProducer.send(
          "baeldung.articles.published",
          article.slug(),
          new ArticlePublishedEvent(article.slug(), article.title())
        ).join();
    }
}

⚠️ 但这种方案存在明显缺陷

  1. 设计层面:领域服务与消息生产者强耦合,违反了清洁架构的基本原则
  2. 性能层面:所有操作在 @Transactional 方法内执行,数据库连接会一直保持到消息成功发布
  3. 可靠性问题
    • 消息发布失败会导致事务回滚
    • 即使消息已发布,事务仍可能回滚(造成数据不一致)

3. 使用 Spring 事件实现依赖倒置

我们可以利用 Spring 事件 改进设计。核心思路是避免在领域服务中直接操作 Kafka:

@Service
public class Baeldung {
    private final ApplicationEventPublisher applicationEvents;
    private final ArticleRepository articleRepository;

    // 构造函数

    @Transactional
    public void createArticle(Article article) {
        // ... 业务逻辑
        validateArticle(article);
        article = addArticleTags(article);
        article = articleRepository.save(article);

        applicationEvents.publishEvent(
          new ArticlePublishedEvent(article.slug(), article.title()));
    }
}

同时创建专门的 Kafka 生产者组件(基础设施层),监听 ArticlePublishedEvent 并委托给底层 KafkaOperations

@Component
class ArticlePublishedKafkaProducer {
    private final KafkaOperations<String, ArticlePublishedEvent> messageProducer;

    // 构造函数 

    @EventListener
    public void publish(ArticlePublishedEvent article) {
        Assert.notNull(article.slug(), "Article Slug must not be null!");
        messageProducer.send("baeldung.articles.published", article.slug(), article);
    }
}

改进效果

  • 基础设施组件依赖领域事件,实现依赖倒置
  • 其他模块可直接监听应用事件,无需修改领域服务
  • 解耦业务逻辑与消息发布机制

publish() 方法仍在原事务中执行,操作失败仍会导致事务回滚。

4. 原子操作 vs 非原子操作

现在分析性能影响:是否需要消息代理通信失败时回滚事务? 这取决于业务场景。

若不需要原子性保证,应尽快释放数据库连接并异步发布事件。模拟测试(创建无 slug 的文章导致发布失败):

@Test
void whenPublishingMessageFails_thenArticleIsStillSavedToDB() {
    var article = new Article(null, "Spring Boot 入门", "张三", "<p> Spring Boot 是 [...] </p>");

    baeldung.createArticle(article);

    assertThat(repository.findAll())
      .hasSize(1).first()
      .extracting(Article::title, Article::author)
      .containsExactly("Spring Boot 入门", "张三");
}

当前测试会失败,因为 ArticlePublishedKafkaProducer 抛出异常导致事务回滚。解决方案:将事件监听器改为异步:

@Async
@TransactionalEventListener
public void publish(ArticlePublishedEvent event) {
    Assert.notNull(event.slug(), "Article Slug must not be null!");
    messageProducer.send("baeldung.articles.published", event);
}

改进效果

  • 异常仅被记录,不影响主事务
  • 数据库连接及时释放
  • 文章成功保存,消息发布失败被隔离

5. 使用 Spring Modulith 实现事件外部化

我们通过两步优化解决了原始代码的设计和性能问题:

  1. 使用 Spring 事件实现依赖倒置
  2. 通过 @TransactionalEventListener + @Async 实现异步发布

Spring Modulith 进一步简化了该模式。首先添加 Maven 依赖:

<dependency>
    <groupId>org.springframework.modulith</groupId>
    <artifactId>spring-modulith-events-api</artifactId>
    <version>1.1.3</version>
</dependency>

该模块可监听应用事件并自动外部化到多种消息系统。以 Kafka 为例,添加集成依赖:

<dependency> 
    <groupId>org.springframework.modulith</groupId> 
    <artifactId>spring-modulith-events-kafka</artifactId> 
    <version>1.1.3</version>
    <scope>runtime</scope> 
</dependency>

更新 ArticlePublishedEvent 并添加 @Externalized 注解,指定 Kafka 主题和消息键(使用 SpEL 表达式):

@Externalized("baeldung.article.published::#{slug()}")
public record ArticlePublishedEvent(String slug, String title) {
}

6. 事件发布注册表

前文提到数据持久化与消息发布仍存在可靠性问题。Spring Modulith 通过事务性发件箱模式解决此问题,确保系统最终一致性。

6.1 事件发布日志

首先添加持久化技术对应的 starter(以 JPA + PostgreSQL 为例):

<dependency>
    <groupId>org.springframework.modulith</groupId>
    <artifactId>spring-modulith-starter-jpa</artifactId>
    <version>1.1.2</version>
</dependency>

启用 event_publication 表自动创建(存储外部化事件数据):

spring.modulith:
  events.jdbc-schema-initialization.enabled: true

使用 Testcontainers 启动 PostgreSQL 容器后,可通过终端验证表结构:

psql -U test_user -d test_db
\d

TestcontainersDesktop

查询事件数据:

testcontainer_events-1

关键发现

  • 第一行:正常流程的事件
  • 第二行:故意构造的无效事件(无 slug),completion_date 为空表示发布失败

6.2 事件重新提交

启用应用重启时自动重试未完成事件

spring.modulith:
  republish-outstanding-events-on-restart: true

编程方式重试失败事件(使用 IncompleteEventPublications Bean):

@Component
class EventPublications {
    private final IncompleteEventPublications incompleteEvents;
    private final CompletedEventPublications completeEvents;

    // 构造函数

    void resubmitUnpublishedEvents() {
        incompleteEvents.resubmitIncompletePublicationsOlderThan(Duration.ofSeconds(60));
    }
}

清理已发布事件(使用 CompletedEventPublications):

void clearPublishedEvents() {
    completeEvents.deletePublicationsOlderThan(Duration.ofSeconds(60));
}

7. 事件外部化配置

@Externalized 注解的 SpEL 表达式过于复杂,或需要分离主题配置/事件模型时,可使用 EventExternalizationConfiguration

@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {
    return EventExternalizationConfiguration.externalizing()
      .select(EventExternalizationConfiguration.annotatedAsExternalized())
      .route(
        ArticlePublishedEvent.class,
        it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.slug())
      )
      .mapping(
        ArticlePublishedEvent.class,
        it -> new PostPublishedKafkaEvent(it.slug(), it.title())
      )
      .build();
}

优势

  • 声明式定义路由和映射
  • 支持多种事件类型(示例添加 WeeklySummaryPublishedEvent):
    @Bean
    EventExternalizationConfiguration eventExternalizationConfiguration() {
      return EventExternalizationConfiguration.externalizing()
        .select(EventExternalizationConfiguration.annotatedAsExternalized())
        .route(
          ArticlePublishedEvent.class,
          it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.slug())
        )
        .mapping(
          ArticlePublishedEvent.class,
          it -> new PostPublishedKafkaEvent(it.slug(), it.title())
        )
        .route(
          WeeklySummaryPublishedEvent.class,
          it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.handle())
        )
        .mapping(
          WeeklySummaryPublishedEvent.class,
          it -> new PostPublishedKafkaEvent(it.handle(), it.heading())
        )
        .build();
    }
    

此时事件类只需保留 @Externalized 注解(无需配置值):

@Externalized
public record ArticlePublishedEvent(String slug, String title) {
}

@Externalized
public record WeeklySummaryPublishedEvent(String handle, String heading) {
}

8. 总结

本文探讨了事务块内发布消息的场景及其性能影响(数据库连接长时间占用)。通过 Spring Modulith 实现了:

  1. 监听 Spring 应用事件
  2. 自动发布到 Kafka
  3. 异步外部化事件
  4. 及时释放数据库连接
  5. 通过事务性发件箱确保最终一致性

完整源码可在 GitHub 获取。


原始标题:Event Externalization with Spring Modulith