1. 概述

本文将深入探讨 Spring Integration 框架中的事务支持机制。作为构建企业级集成应用的核心组件之一,Spring Integration 提供了强大的事务管理能力,尤其在涉及多资源协调时显得尤为重要。

我们不仅会覆盖基础的事务同步场景,还会剖析如何将非事务性资源(如文件系统)与事务流程进行协调,避免“踩坑”。

2. 消息流中的事务

Spring 从早期版本就开始支持资源与事务的同步,尤其擅长协调多个事务管理器之间的行为。例如,你可以轻松实现 JMS 提交JDBC 提交 的原子性——要么都成功,要么都回滚。

但在实际的消息流(Message Flow)中,事务需求往往更复杂:不仅要处理多种事务性资源(如数据库、消息队列),还可能需要联动非事务性资源(如文件操作、HTTP 调用)。

消息流的触发方式通常分为两类:

2.1. 由用户进程触发的消息流

这类流程通常由外部主动发起,比如:

  • 调用一个消息网关(Message Gateway)的方法
  • 手动向某个 Channel 发送消息

✅ 这种场景下,无需 Spring Integration 特别配置事务,直接使用 Spring 原生的事务支持即可。消息流会自然继承调用上下文的事务语义。

举个例子,你可以直接在 ServiceActivator 上使用 @Transactional 注解:

@Transactional
public class TxServiceActivator {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void storeTestResult(String testResult) {
        this.jdbcTemplate.update("insert into STUDENT values(?)", testResult);
        log.info("Test result is stored: {}", testResult);
    }
}

只要这个方法被 Spring 管理的 Bean 调用,事务就会生效。配置灵活,控制粒度细,属于“简单粗暴但有效”的做法。

2.2. 由守护进程触发的消息流

这类流程是自动触发的,常见于后台任务,比如:

  • Poller 定时从消息队列拉取消息
  • Scheduler 定时生成消息并启动流程

⚠️ 这类流程没有外部事务上下文,必须显式配置事务边界,否则事务不会自动开启。

Spring Integration 允许你在 Poller 层面声明事务支持,从而为每次消息处理创建独立的事务上下文。这也是本文的重点。

3. Poller 的事务支持

Poller 是 Spring Integration 中最常用的入站适配器组件之一,负责周期性地从数据源(如文件目录、队列)获取数据并推入消息流。

好消息是:Spring Integration 原生支持 Poller 的事务机制。你只需在配置中加入事务拦截器即可。

示例配置如下:

@Bean
@InboundChannelAdapter(value = "someChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<File> someMessageSource() {
    ...
}

@Bean
public PollerMetadata pollerMetadata() {
    return Pollers.fixedDelay(5000)
      .advice(transactionInterceptor())
      .transactionSynchronizationFactory(transactionSynchronizationFactory)
      .get();
}

private TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder()
      .transactionManager(txManager)
      .build();
}

关键点:

  • transactionManager:指定使用的事务管理器(如 DataSourceTransactionManager
  • advice(transactionInterceptor()):将事务拦截器织入 Poller 执行链
  • 可选:通过 transactionSynchronizationFactory 注册事务同步处理器

一旦配置完成,由该 Poller 触发的每一条消息处理流程都会运行在一个事务上下文中

4. 事务边界与线程模型

Spring 的事务是基于线程绑定的(ThreadLocal),这意味着:

✅ 只要整个消息流在同一个线程中执行,事务上下文就会自动传递。

❌ 一旦你跨线程(比如使用异步 Channel、Executor Channel 或手动启动新线程),事务上下文就会断开!

更危险的是:即使事务已经提交,后续流程仍可能抛出异常。但由于事务已结束,无法回滚,导致数据不一致。

如何避免这个问题?

使用 事务性 Channel 来跨越线程边界,例如:

  • JMS Channel(支持分布式事务)
  • JDBC Channel(通过数据库表暂存消息)
  • 其他支持事务的消息中间件

这样可以确保“消息处理成功”和“事务提交”是原子操作。

5. 事务同步:联动非事务资源

这是 Spring Integration 最实用但也最容易被误解的功能之一:事务同步(Transaction Synchronization)

场景举例

你想实现这样一个流程:

  1. Poller 读取一个考试结果文件
  2. 将内容写入数据库
  3. 成功 → 文件重命名为 .PASSED
  4. 失败 → 文件重命名为 .FAILED

文件系统本身是非事务性的,但我们可以通过事务同步机制,让文件操作“看起来”具备事务行为。

⚠️ 注意:这不是让文件系统变事务性,而是把文件操作绑定到事务的提交或回滚阶段

实现步骤

步骤 1:配置带事务的 InboundChannelAdapter

@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<File> fileReadingMessageSource() {
    FileReadingMessageSource sourceReader = new FileReadingMessageSource();
    sourceReader.setDirectory(new File("/tmp/input")); // mock 路径
    sourceReader.setFilter(new SimplePatternFileListFilter("*.txt"));
    return sourceReader;
}

@Bean
public PollerMetadata pollerMetadata() {
    return Pollers.fixedDelay(5000)
      .advice(transactionInterceptor())
      .transactionSynchronizationFactory(transactionSynchronizationFactory())
      .get();
}

步骤 2:定义事务同步工厂

@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    ExpressionEvaluatingTransactionSynchronizationProcessor processor =
      new ExpressionEvaluatingTransactionSynchronizationProcessor();

    SpelExpressionParser spelParser = new SpelExpressionParser();
 
    processor.setAfterCommitExpression(
      spelParser.parseExpression(
        "payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))"));
 
    processor.setAfterRollbackExpression(
      spelParser.parseExpression(
        "payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))"));

    return new DefaultTransactionSynchronizationFactory(processor);
}

这里的关键是:

  • afterCommitExpression:事务提交后执行的 SpEL 表达式
  • afterRollbackExpression:事务回滚后执行的操作
  • payload 指的是当前消息体(即 File 对象)

步骤 3:消息流转与转换

@Bean
public MessageChannel inputChannel() {
    return new DirectChannel();
}
    
@Bean
@Transformer(inputChannel = "inputChannel", outputChannel = "toServiceChannel")
public FileToStringTransformer fileToStringTransformer() {
    return new FileToStringTransformer();
}

将文件内容转为字符串,传递给下游服务。

步骤 4:ServiceActivator 处理业务逻辑

@ServiceActivator(inputChannel = "toServiceChannel")
public void serviceActivator(String payload) {

    jdbcTemplate.update("insert into STUDENT (result) values(?)", payload);

    if (payload.toLowerCase().startsWith("fail")) {
        log.error("Service failure. Test result: {} ", payload);
        throw new RuntimeException("Service failure.");
    }

    log.info("Service success. Test result: {}", payload);
}

如果内容以 fail 开头,则抛出异常,触发事务回滚,最终文件会被重命名为 .FAILED

执行流程图解

[Poller] 
   ↓ (触发)
[读取文件] → [开启事务]
   ↓
[File → String] 
   ↓
[写入数据库] → 成功 → 提交事务 → 触发 afterCommit → 重命名为 .PASSED
             ↘ 失败 → 回滚事务 → 触发 afterRollback → 重命名为 .FAILED

整个流程简洁、可靠,避免了“数据库写入成功但文件没处理”的中间状态。

6. 总结

本文系统讲解了 Spring Integration 中的事务支持机制,重点包括:

  • ✅ 用户触发流程:直接使用 @Transactional 即可
  • ✅ 守护进程触发流程:必须通过 Poller 配置事务拦截器
  • ✅ 事务边界:注意线程切换导致的上下文丢失
  • ✅ 事务同步:通过 TransactionSynchronizationFactory 联动非事务资源(如文件系统)

这些机制在实际项目中非常实用,尤其是在金融、订单、对账等强一致性场景下,能有效防止数据不一致问题。

完整示例代码已托管至 GitHub:
👉 https://github.com/spring-tutorials/spring-integration-transactions


原始标题:Transaction Support in Spring Integration