专业编程教程与实战项目分享平台

网站首页 > 技术文章 正文

Spring Boot 分布式事务实现简单得超乎想象

ins518 2025-07-28 16:59:42 技术文章 17 ℃ 0 评论

环境:SpringBoot2.7.18 + Atomikos4.x + MySQL5.7



1. 简介

关于什么是分布式事务,本文不做介绍。有需要了解的自行查找相关的资料。

本篇文章将基于SpringBoot整合Atomikos实现分布式事务。

1.1 什么是Atomikos

Atomikos 是一个第三方的事务管理器(Transaction Manager),它是对Java Transaction API (JTA) 的实现,旨在提供可靠、高效的分布式事务处理能力。Atomikos 与 JTA 之间的关系可以描述为:实现的关系:

  • JTA:Java Transaction API 是一套标准的Java接口规范,定义了应用程序如何与事务管理器交互,以进行分布式事务的管理和控制。JTA规范由Java Community Process (JCP)制定,为Java开发者提供了一种与具体数据库或事务资源无关的、统一的事务处理编程模型。
  • Atomikos:作为一款第三方事务管理软件,Atomikos TransactionsEssentials 是 JTA 规范的一个具体实现。它实现了JTA所定义的接口,如下接口:
javax.transaction.UserTransaction;
javax.transaction.TransactionManager;

Atomikos 作为 JTA 实现,负责实际的事务管理任务,包括:

  • 事务协调:按照两阶段提交(2PC)协议或者其他分布式事务协议来协调参与事务的多个资源(如数据库、消息中间件)。
  • 事务生命周期管理:提供API供应用程序启动、提交、回滚事务,以及查询事务状态。
  • 故障恢复:在出现系统故障时,具备一定的事务恢复能力,保证事务的最终一致性。
  • 性能优化:可能包含针对特定环境的性能优化策略,如事务超时管理、并发控制、缓存机制等。


1.2 什么是JTA

Java Transaction API(JTA)是Java的应用程序接口(API),用于处理分布式事务,尤其是在企业级Java应用程序(Java EE)的环境中。JTA使得开发人员能够在一个跨多个数据库或事务资源(例如JMS消息队列)的操作中管理事务的完整生命周期,确保数据的一致性和完整性。

JTA的核心概念包括:

  1. 分布式事务:JTA支持跨越多个网络计算机资源的事务管理,允许在不同的数据库或事务资源之间同步提交或回滚事务。
  2. XA协议:JTA依赖于XA协议来协调不同事务资源之间的事务。XA是分布式事务处理的标准接口,定义了两阶段提交协议来确保所有参与事务的资源要么一起成功提交,要么一起回滚。
  3. 事务管理器(Transaction Manager):JTA事务管理器负责协调全局事务,监控事务边界,并决定何时提交或回滚事务。
  4. 资源管理器(Resource Manager):如JDBC XA兼容的数据库驱动程序(像MySQL,Oracle等都实现了XA协议),它们参与到全局事务中并能响应事务管理器的指令。

通过JTA,开发者无需直接编写复杂的事务管理代码,而是可以通过容器提供的事务服务或者编程方式来声明事务边界、设置事务属性以及控制事务的传播行为等。在Java EE应用服务器中,JTA通常与EJB(Enterprise JavaBeans)容器集成,使得EJB方法能够在无感知的情况下自动参与到事务管理中去。此外,Spring框架也提供了对JTA事务的支持,使得在非Java EE环境下也能方便地使用分布式事务处理功能。

以上是关于Atomikos与JTA的简单介绍,接下来将通过实际的案例来演示。

2. 实战案例

2.1 环境准备

数据库

创建2个数据库,如下:

不用创建表,因为我们将使用JPA,由JPA自动生成相关的表。

项目依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>

SpringBoot已经为我们集成了atomikos,所以这里只需要引入对应的starter即可。

2.2 核心配置类

定义domain对象

分别放到不同的package中,如下:

// pkg: com.pack.domain.customer
@Entity
@Table(name = "customer")
public class Customer {
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private Integer id;
  @Column(name = "name", nullable = false)
  private String name;
  @Column(name = "age", nullable = false)
  private Integer age;
}
// pkg: com.pack.domain.order
@Entity
@Table(name = "orders")
public class Order {
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private Integer id;
  @Column(name = "code", nullable = false)
  private Integer code;
  @Column(name = "quantity", nullable = false)
  private Integer quantity;
}

定义Repository接口

// pkg: com.pack.repository.customer
public interface CustomerRepository extends JpaRepository<Customer, Integer> {
}
// pkg: com.pack.repository.order
public interface OrderRepository extends JpaRepository<Order, Integer> {
}

定义Service

// pkg: com.pack.service.customer
@Service
public class CustomerService{


  @Resource
  private CustomerRepository customerRepository ;


  @Transactional
  public void save(Customer customer){
    this.customerRepository.save(customer) ;
  }
}
// pkg: com.pack.service.order
@Service
public class OrderService{


  @Resource
  private OrderRepository orderRepository ;


  @Transactional
  public void save(Order order){
    this.orderRepository.save(order) ;
    throw new RuntimeException("订单发生异常") ;
  }
}

接下来是重点了,需要分别对每一个数据源进行相应JPA的配置(如果你使用MyBatis也是同样的原理,对相应的如:SqlSessionFactory等进行配置)。

Customer相应配置

@ConfigurationProperties(prefix = "pack.datasource.customer")
public class CustomerDataSourceProperties {


  private String jdbcUrl ;
  private String username ;
  private String password ;
}
// 下面是针对JPA及XA数据源的配置
@Configuration
@DependsOn("transactionManager")      
// 设置jpa repository扫描的包 及相应的事务管理器
@EnableJpaRepositories(basePackages = "com.pack.repository.customer", entityManagerFactoryRef = "customerEntityManager", transactionManagerRef = "transactionManager")
@EnableConfigurationProperties(CustomerDataSourceProperties.class)
public class CustomerConfig {


  @Resource
  private JpaVendorAdapter jpaVendorAdapter;
  @Resource
  private CustomerDataSourceProperties properties ;


  // 数据源配置该数据源是由MySQL驱动程序提供的 
  @Bean(name = "customerDataSource", initMethod = "init", destroyMethod = "close")
  DataSource customerDataSource() throws Exception {
    MysqlXADataSource dataSource = new MysqlXADataSource();
    dataSource.setUrl(properties.getJdbcUrl()) ;
    dataSource.setUser(properties.getUsername()) ;
    dataSource.setPassword(properties.getPassword()) ;
    dataSource.setPinGlobalTxToPhysicalConnection(true) ;
    AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
    xaDataSource.setXaDataSource(dataSource);
    xaDataSource.setUniqueResourceName("xa-customer");
    return xaDataSource;
  }


  @Bean(name = "customerEntityManager")
  @DependsOn("transactionManager")
  LocalContainerEntityManagerFactoryBean customerEntityManager() throws Throwable {
    HashMap<String, Object> properties = new HashMap<String, Object>();
    properties.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
    // 设置事务类型为JTA
    properties.put("javax.persistence.transactionType", "JTA") ;
    // 指定数据库方言
    properties.put("hibernate.dialect", "org.hibernate.dialect.MySQL5InnoDBDialect") ;  


    LocalContainerEntityManagerFactoryBean entityManager = new LocalContainerEntityManagerFactoryBean();
    entityManager.setJtaDataSource(customerDataSource());
    entityManager.setJpaVendorAdapter(jpaVendorAdapter);
    // 配置JPA扫描的包
    entityManager.setPackagesToScan("com.pack.domain.customer");
    entityManager.setPersistenceUnitName("customerPersistenceUnit");
    entityManager.setJpaPropertyMap(properties);
    return entityManager;
  }


}

Order相应配置

@ConfigurationProperties(prefix = "pack.datasource.order")
public class OrderDataSourceProperties {
  private String jdbcUrl ;
  private String username ;
  private String password ;
}
@Configuration
@DependsOn("transactionManager")
@EnableJpaRepositories(basePackages = "com.pack.repository.order", entityManagerFactoryRef = "orderEntityManager", transactionManagerRef = "transactionManager")
@EnableConfigurationProperties(OrderDataSourceProperties.class)
public class OrderConfig {


  @Resource
  private JpaVendorAdapter jpaVendorAdapter;
  @Resource
  private OrderDataSourceProperties properties ;


  @Bean(name = "orderDataSource", initMethod = "init", destroyMethod = "close")
  DataSource customerDataSource() throws Exception {
    MysqlXADataSource dataSource = new MysqlXADataSource();
    dataSource.setUrl(properties.getJdbcUrl()) ;
    dataSource.setUser(properties.getUsername()) ;
    dataSource.setPassword(properties.getPassword());
    dataSource.setPinGlobalTxToPhysicalConnection(true) ;
    AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
    xaDataSource.setXaDataSource(dataSource);
    xaDataSource.setUniqueResourceName("xa-order");
    return xaDataSource;
  }


  @Bean(name = "orderEntityManager")
  @DependsOn("transactionManager")
  LocalContainerEntityManagerFactoryBean customerEntityManager() throws Throwable {
    HashMap<String, Object> properties = new HashMap<String, Object>();
    properties.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName()) ;
    properties.put("javax.persistence.transactionType", "JTA") ;
    properties.put("hibernate.dialect", "org.hibernate.dialect.MySQL5InnoDBDialect") ;


    LocalContainerEntityManagerFactoryBean entityManager = new LocalContainerEntityManagerFactoryBean();
    entityManager.setJtaDataSource(customerDataSource());
    entityManager.setJpaVendorAdapter(jpaVendorAdapter);
    entityManager.setPackagesToScan("com.pack.domain.order");
    entityManager.setPersistenceUnitName("orderPersistenceUnit");
    entityManager.setJpaPropertyMap(properties);
    return entityManager;
  }


}

主配置

@Configuration
@ComponentScan
@EnableTransactionManagement
public class MainConfig{


  @Bean
  JpaVendorAdapter jpaVendorAdapter(){
    HibernateJpaVendorAdapter hibernateJpaVendorAdapter = new HibernateJpaVendorAdapter();
    // 显示SQL
    hibernateJpaVendorAdapter.setShowSql(true);
    // 生成ddl语句
    hibernateJpaVendorAdapter.setGenerateDdl(true);
    // 数据库类型
    hibernateJpaVendorAdapter.setDatabase(Database.MYSQL);
    return hibernateJpaVendorAdapter;
  }


  @Bean(name = "userTransaction")
  UserTransaction userTransaction() throws Throwable {
    UserTransactionImp userTransactionImp = new UserTransactionImp();
    userTransactionImp.setTransactionTimeout(10000) ;
    return userTransactionImp;
  }


  @Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
  TransactionManager atomikosTransactionManager() throws Throwable {
    UserTransactionManager userTransactionManager = new UserTransactionManager();
    userTransactionManager.setForceShutdown(false);
    AtomikosJtaPlatform.transactionManager = userTransactionManager;
    return userTransactionManager;
  }


  @Bean(name = "transactionManager")
  @DependsOn({"userTransaction", "atomikosTransactionManager"})
  PlatformTransactionManager transactionManager() throws Throwable {
    UserTransaction userTransaction = userTransaction();
    AtomikosJtaPlatform.transaction = userTransaction;
    TransactionManager atomikosTransactionManager = atomikosTransactionManager();
    return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
  }
}

以上就是所有的核心配置类,接下来做测试

准备测试Service

@Service
public class BaseService{


  @Resource
  private CustomerService customerService ;
  @Resource
  private OrderService orderService ;


  // 正常执行
  @Transactional
  public void save(Customer customer, Order order){
    this.customerService.save(customer) ;
    this.orderService.save(order) ;
  }


  // 入口抛出异常(测试是否两个都进行了回滚)
  @Transactional
  public void saveLocalExecption(Customer customer, Order order){
    this.customerService.save(customer) ;
    this.orderService.save(order) ;
    throw new RuntimeException("LocalException 发生异常") ;
  }


  // 测试当Customer接口抛出了异常
  @Transactional
  public void saveCustomerException(Customer customer, Order order){
    this.customerService.save(customer) ;
    this.orderService.save(order) ;
  }


}

正常测试

@Test
public void testSaveNormal(){
  Customer customer = new Customer() ;
  customer.setAge(10) ;
  customer.setName("张三") ;


  Order order = new Order() ;
  order.setCode(10001) ;
  order.setQuantity(10) ;


  baseService.save(customer, order) ;
}

输出结果

成功提交事务,查看数据库都有数据。

异常测试1

@Test
public void testSaveLocalExecption(){
  Customer customer = new Customer() ;
  customer.setAge(10) ;
  customer.setName("张三") ;


  Order order = new Order() ;
  order.setCode(10001) ;
  order.setQuantity(10) ;


  baseService.saveLocalExecption(customer, order) ;
}

输出结果

两个事务都回滚,数据库没有insert数据。

异常测试2

与上面的结果相同。

到此通过Atomikos实现的分布式事务就成功完成了。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表