1. 简介

Atomikos 是一个专为 Java 应用设计的分布式事务管理库。本文将带你深入理解为何以及如何使用 Atomikos,掌握其核心能力。

在过程中,我们会回顾事务的基础概念,并通过实际代码演示如何集成 Atomikos 实现跨数据库的分布式事务。目标是让你在微服务或复杂系统中,也能轻松驾驭数据一致性问题。

2. 事务基础概念

在深入 Atomikos 之前,先巩固一下事务的核心思想。简单来说,事务是一个逻辑上的工作单元,它的执行结果对外部要么全部生效,要么完全不生效(All-or-Nothing)

举个例子:一个典型的零售系统在下单时,需要先扣减库存,再创建订单。这两个操作必须“同生共死”——不能出现库存扣了但订单没建,或者订单建了但库存没扣的情况。

Screenshot-2020-04-18-at-18.23.31

解决这类问题的标准做法,就是把这两个操作包裹在一个事务中。

2.1 本地事务 vs 分布式事务

事务可能涉及多个独立操作,这些操作可以作用于同一个资源,也可以作用于不同资源。这里的“资源”通常指数据库、消息队列等。

  • 本地事务(Local Transaction):所有操作都在同一个资源(如单个数据库)内完成。
  • 分布式事务(Distributed Transaction):操作跨越多个资源,比如两个不同的数据库,甚至跨机器部署。

Screenshot-2020-04-18-at-18.24.53

在我们的例子中,库存和订单表可能在同一数据库,也可能分属两个独立数据库。一旦跨库,就必须用分布式事务来保证一致性。

2.2 XA 规范与 JTA

  • XA 规范:由 Open Group 提出的分布式事务处理标准,全称 eXtended Architecture。它的核心目标是保证跨异构系统的全局事务原子性。
  • 两阶段提交(2PC):XA 的核心协议,通过“准备”和“提交”两个阶段协调所有参与者,确保事务最终一致性。
  • Java 事务 API(JTA):Java EE 提供的标准接口,让 Java 应用能与 XA 兼容的资源(如数据库、消息队列)进行分布式事务交互。JTA 本身是接口规范,需要具体的事务管理器实现。

3. Atomikos 概述

3.1 什么是 Atomikos

JTA 只是接口,要真正运行还需要实现。传统上,应用服务器(如 WebLogic、WildFly)会内置事务管理器。但在 Spring Boot 等轻量级架构中,我们更希望应用自包含,不依赖重型中间件。

Atomikos 就是一个轻量级、可嵌入的 JTA 事务管理器。它让 Java 应用无需依赖应用服务器,也能实现分布式事务,非常适合云原生和微服务场景。

3.2 Atomikos 架构

Atomikos 基于 XA 架构实现,核心是两阶段提交协议。它作为事务协调者(Transaction Manager),管理多个资源(如数据库、消息队列)的事务生命周期。

Screenshot-2020-04-18-at-18.25.56

如上图所示,Atomikos 协调了数据库和消息队列之间的分布式事务。

3.3 产品版本

Atomikos 提供两个版本:

  • TransactionsEssentials:开源版,支持数据库和消息队列的 JTA/XA 事务。适合测试和评估。
  • ExtremeTransactions:商业版,功能更强大,支持 REST 服务等非传统资源,适用于极端事务处理(XTP)场景。

本文将以 TransactionsEssentials 为例进行演示。

4. 环境搭建

Atomikos 的最大优势之一是可嵌入,直接运行在应用的 JVM 中,无需独立部署。

4.1 依赖配置

pom.xml 中引入核心依赖:

<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>transactions-jdbc</artifactId>
    <version>6.0.0</version>
</dependency>
<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>transactions-jms</artifactId>
    <version>6.0.0</version>
</dependency>

💡 提示:Atomikos 还支持其他 XA 资源,均可在 Maven Central 找到对应依赖。

4.2 基本配置

Atomikos 支持通过 transactions.properties 文件进行配置(放在 classpath 下):

# 自定义事务日志路径
com.atomikos.icatch.file=/var/log/atomikos/transactions.log
# 事务超时时间(秒)
com.atomikos.icatch.default_jta_timeout=300
# 应用唯一标识
com.atomikos.icatch.tm_unique_name=myAppTM

其他可配置项包括:事务超时、TM 唯一名称、关闭行为等。

4.3 数据库准备

我们构建一个简单的零售系统,包含两个数据库:

  • 库存库(inventory_db)

    CREATE TABLE INVENTORY (
        productId VARCHAR PRIMARY KEY,
        balance INT
    );
    
  • 订单库(order_db)

    CREATE TABLE ORDERS (
        orderId VARCHAR PRIMARY KEY,
        productId VARCHAR,
        amount INT NOT NULL CHECK (amount <= 5)
    );
    

⚠️ 注意:订单表的 CHECK (amount <= 5) 约束将用于后续测试事务回滚。

5. 核心使用

5.1 初始化 UserTransaction

使用 JTA 的 UserTransaction 来控制事务边界:

UserTransactionImp utx = new UserTransactionImp();

5.2 配置数据源(DataSource)

Atomikos 提供两种数据源:

  • AtomikosDataSourceBean:支持 XA 的数据源,适用于支持 XA 的数据库(如 MySQL、PostgreSQL)。
  • AtomikosNonXADataSourceBean:非 XA 数据源,用于不支持 XA 的数据库。Atomikos 会尝试通过“最后资源提交”(Last Resource Commit)策略保证一致性。
// XA 数据源示例
AtomikosDataSourceBean inventoryDataSource = new AtomikosDataSourceBean();
inventoryDataSource.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
inventoryDataSource.setUniqueResourceName("inventoryDB");
inventoryDataSource.setXaProperties(connectionProps); // 设置数据库连接参数

5.3 执行分布式操作

在事务中同时操作两个数据库:

public void placeOrder(String productId, int amount) throws Exception {
    String orderId = UUID.randomUUID().toString();
    boolean rollback = false;
    try {
        utx.begin();
        
        Connection invConn = inventoryDataSource.getConnection();
        Connection ordConn = orderDataSource.getConnection();
        
        Statement s1 = invConn.createStatement();
        String q1 = "UPDATE INVENTORY SET balance = balance - " + amount + 
                   " WHERE productId = '" + productId + "'";
        s1.executeUpdate(q1);
        s1.close();
        
        Statement s2 = ordConn.createStatement();
        String q2 = "INSERT INTO ORDERS VALUES ('" + orderId + "', '" + 
                   productId + "', " + amount + ")";
        s2.executeUpdate(q2); // 若 amount > 5,此处会因 CHECK 约束失败
        s2.close();
        
        invConn.close();
        ordConn.close();
    } catch (Exception e) {
        rollback = true;
        throw e;
    } finally {
        if (!rollback) {
            utx.commit();
        } else {
            utx.rollback();
        }
    }
}

✅ 关键点:两个 Connection 都在同一个 utx 事务中,任何一步失败都会触发全局回滚。

5.4 验证事务行为

通过单元测试验证:

@Test
public void testPlaceOrderSuccess() throws Exception {
    int amount = 1;
    long initialBalance = getBalance(inventoryDataSource, productId);
    
    application.placeOrder(productId, amount);
    
    long finalBalance = getBalance(inventoryDataSource, productId);
    assertEquals(initialBalance - amount, finalBalance); // 库存应减少
}

@Test(expected = SQLException.class)
public void testPlaceOrderFailure() throws Exception {
    int amount = 10; // 超过约束
    long initialBalance = getBalance(inventoryDataSource, productId);
    
    application.placeOrder(productId, amount); // 应抛出异常
    
    long finalBalance = getBalance(inventoryDataSource, productId);
    assertEquals(initialBalance, finalBalance); // 库存应不变
}

5.5 高级用法

  • ✅ 可直接操作 XAResource,手动注册资源到事务管理器(灵活性更高,但代码更复杂)。
  • ✅ 使用 UserTransactionService 接口进行资源恢复控制,适用于需要精细控制的场景。

6. 与主流框架集成

6.1 集成 Spring + DataSource

Spring 的声明式事务能极大简化代码。只需添加 @Transactional 注解:

@Service
public class OrderService {
    
    @Autowired
    private DataSource inventoryDataSource;
    
    @Autowired
    private DataSource orderDataSource;
    
    @Transactional(rollbackFor = Exception.class)
    public void placeOrder(String productId, int amount) throws SQLException {
        // 无需手动管理事务边界
        try (Connection invConn = inventoryDataSource.getConnection();
             Connection ordConn = orderDataSource.getConnection()) {
            
            // 执行 SQL 操作...
        }
    }
}

Spring 配置:

@Configuration
@EnableTransactionManagement
public class AtomikosConfig {

    @Bean(initMethod = "init", destroyMethod = "close")
    public AtomikosDataSourceBean inventoryDataSource() {
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        ds.setUniqueResourceName("inventoryDB");
        ds.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
        // ... 其他配置
        return ds;
    }

    @Bean
    public JtaTransactionManager transactionManager() throws SystemException {
        UserTransactionManager utm = new UserTransactionManager();
        utm.setForceShutdown(true);
        return new JtaTransactionManager(utm, new UserTransactionImp());
    }
}

6.2 集成 Spring + JPA + Hibernate

进一步使用 JPA/Hibernate,代码更简洁:

@Service
@Transactional(rollbackFor = Exception.class)
public class OrderService {

    @Autowired
    private InventoryRepository inventoryRepo;
    
    @Autowired
    private OrderRepository orderRepo;

    public void placeOrder(String productId, int amount) {
        Inventory inv = inventoryRepo.findById(productId).orElseThrow();
        inv.setBalance(inv.getBalance() - amount);
        inventoryRepo.save(inv);

        Order order = new Order();
        order.setOrderId(UUID.randomUUID().toString());
        order.setProductId(productId);
        order.setAmount((long) amount);
        orderRepo.save(order); // amount > 5 时会因 @Max(5) 失败
    }
}

✅ 踩坑提醒:多数据源配置较繁琐,建议结合 Spring Boot 自动配置简化。

7. 超越 JTA:TCC 模式

JTA/XA 要求资源支持两阶段提交,但在微服务中,很多服务(如 HTTP 接口)并不支持 XA。

Atomikos 商业版提供 Try-Confirm/Cancel (TCC) 模式,一种基于补偿的分布式事务方案:

  • Try:预留资源(如冻结库存)
  • Confirm:确认执行(如扣除冻结库存)
  • Cancel:取消操作(如释放冻结库存)

TCC 更适合业务场景,但需要服务提供方支持 TCC 接口,且仅限商业版使用。

8. 替代方案对比

8.1 Narayana

  • 优点:历史悠久,Red Hat 支持,功能全面,支持 C/C++ 绑定。
  • 缺点:配置复杂,学习曲线陡峭。
  • 🔄 对比:功能比 Atomikos 更强大,但 Atomikos 更轻量、易上手。

8.2 Bitronix

  • 优点:纯开源,嵌入式,日志和错误报告优秀,与 Spring 集成好。
  • 缺点:无商业支持,不支持微服务级弹性伸缩。
  • 🔄 对比:适合开源项目,但 Atomikos 商业版在企业级功能上更胜一筹。

9. 总结

Atomikos 是一个轻量、嵌入式的 JTA 事务管理器,非常适合 Spring Boot 等现代 Java 应用。它能让你在不依赖应用服务器的情况下,轻松实现跨数据库的分布式事务。

  • 核心优势:简单易用、可嵌入、与 Spring 生态无缝集成。
  • 适用场景:多数据源事务、消息队列事务、云原生应用。
  • 进阶选择:商业版支持 TCC,适用于复杂微服务架构。

🔗 源码示例:GitHub - persistence-modules/atomikos


原始标题:A Guide to Atomikos | Baeldung