掘金 后端 ( ) • 2024-03-07 13:57

@[TOC]

一、认识2PC - 两阶段提交

1、理论

理论性的东西,懒得再打一遍了,贴在这了: 分布式事务详解【分布式事务的几种解决方案】彻底搞懂分布式事务

关键的两张图: 下图展示了2PC的两个阶段,分成功和失败两个情况说明: 成功情况: 在这里插入图片描述

失败情况: 在这里插入图片描述

2、手撸XA-两阶段提交

(1)时序图

在这里插入图片描述

(2)代码实例

import com.mysql.cj.jdbc.JdbcConnection;
import com.mysql.cj.jdbc.MysqlXAConnection;
import com.mysql.cj.jdbc.MysqlXid;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

@SpringBootTest
public class MysqlXaTest {

    @Test
    public void testXa() {
        try {
            //获取员工库的连接以及资源管理器
            JdbcConnection employeeConnection = (JdbcConnection) DriverManager.getConnection("jdbc:mysql://localhost:3306/employee", "root", "rootroot");
            MysqlXAConnection employeeXAConnection = new MysqlXAConnection(employeeConnection, true);
            XAResource employeeXaResource = employeeXAConnection.getXAResource();

            //获取的员工薪资库的连接以及资源管理器
            JdbcConnection salaryConnection = (JdbcConnection) DriverManager.getConnection("jdbc:mysql://localhost:3306/salary", "root", "rootroot");
            MysqlXAConnection salaryXAConnection = new MysqlXAConnection(salaryConnection, true);
            XAResource salaryXaResource = salaryXAConnection.getXAResource();


            // 全局事务id
            byte[] gtrid = "g00003".getBytes();
            // 分支事务id
            byte[] bqual = "b00001".getBytes();
            // 标识,一般是个固定值
            int formatId = 1;

            //开启员工插入的分支事务
            Xid employeeXid = new MysqlXid(gtrid, bqual, formatId);
            employeeXaResource.start(employeeXid, XAResource.TMNOFLAGS);
            PreparedStatement preparedStatement = employeeConnection.prepareStatement("insert into employee (name, sex, level) values ('小10', '女', '7')");
            preparedStatement.execute();
            employeeXaResource.end(employeeXid, XAResource.TMSUCCESS);

            //开启员工薪资的分支事务
            byte[] salaryBqual = "b00002".getBytes();
            Xid salaryXid = new MysqlXid(gtrid, salaryBqual, formatId);
            salaryXaResource.start(salaryXid, XAResource.TMNOFLAGS);
            PreparedStatement salaryPreparedStatement = salaryConnection.prepareStatement("insert into employee_salary (employee_id, salary) values ('12', 7000)");
            salaryPreparedStatement.execute();
            salaryXaResource.end(salaryXid, XAResource.TMSUCCESS);

            //第一阶段-准备阶段
            int employeePrepareResult = employeeXaResource.prepare(employeeXid);
            int salaryPrepareResult = salaryXaResource.prepare(salaryXid);

            //第二阶段-根据准备阶段的结果。判断是要执行commit还是rollback
            if (employeePrepareResult == XAResource.XA_OK && salaryPrepareResult == XAResource.XA_OK) {
                employeeXaResource.commit(employeeXid, false);
                salaryXaResource.commit(salaryXid, false);
            } else {
                employeeXaResource.rollback(employeeXid);
                salaryXaResource.rollback(salaryXid);
            }
        } catch (SQLException | XAException e) {
            throw new RuntimeException(e);
        }
    }
}

3、认识JTA

JTA(Java Transaction API):是Java平台上一个标准API,用于管理和控制分布式事务的执行流程。

核心类: javax.transaction.UserTransaction:暴露给应用使用,用来启动、提交、回滚事务。 javax.transaction.TransactionManager:提供给事务管理器的接口,用于协调和控制分布式事务的执行过程。 javax.transaction.XAResource:表示一个资源管理器,用于管理和操作资源。 javax.transaction.Xid:用于唯一标识一个分布式事务。

4、今天的主角:Atomikos

Atomikos是一个开源的事务管理器,用于管理和控制分布式事务的执行过程。提供了一个可靠的、高性能的事务管理解决方案,可以与多种应用程序和数据库集成。

简单理解就是,Atomikos是可以集成在我们Java代码里面,和我们的业务代码绑定到同一个Java进程里面的一个事务管理器的框架,可以帮助我们业务程序去自行实现分布式事务。

Atomikos特点:支持分布式事务、支持多种web服务器、支持多种数据库、支持XA协议、提供高性能的事务管理。

Atomikos可以解决,在同一个应用下,连接多个数据库,实现分布式事务。

5、2PC存在的问题

1、TM单点问题。TM挂掉之后,无法回滚和提交。 2、资源锁定的问题。资源锁定之后,TM挂掉无法回滚和提交。 3、性能瓶颈。资源锁定时间长。 4、数据不一致问题。commit时成功状态不一致就会造成数据不一致。

在这里插入图片描述

二、Atomikos使用

1、依赖+配置

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
server.port=8080

spring.employee-datasource.driverClassName = com.mysql.jdbc.Driver
spring.employee-datasource.jdbc-url = jdbc:mysql://localhost:3306/employee?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
spring.employee-datasource.username = root
spring.employee-datasource.password = rootroot

spring.salary-datasource.driverClassName = com.mysql.jdbc.Driver
spring.salary-datasource.jdbc-url = jdbc:mysql://localhost:3306/salary?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
spring.salary-datasource.username = root
spring.salary-datasource.password = rootroot

logging.level.com.atomikos = debug

2、定义AtomikosDataSourceBean数据源

import com.atomikos.jdbc.AtomikosDataSourceBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;
import java.util.Properties;

@Configuration
public class AtomikosDataSourceConfig {

    @Value("${spring.employee-datasource.jdbc-url}")
    private String employeeUrl;

    @Value("${spring.employee-datasource.username}")
    private String employeeUser;

    @Value("${spring.employee-datasource.password}")
    private String employeePassword;

    @Value("${spring.salary-datasource.jdbc-url}")
    private String salaryUrl;

    @Value("${spring.salary-datasource.username}")
    private String salaryUser;

    @Value("${spring.salary-datasource.password}")
    private String salaryPassword;

    /**
     * 定义两个数据源,分别对应两个数据库
     */
    @Bean(name = "employeeDataSource")
    public DataSource employeeDataSource(){
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setUniqueResourceName("employeeDataSource");
        atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");

        Properties properties = new Properties();
        properties.setProperty("URL", employeeUrl);
        properties.setProperty("user", employeeUser);
        properties.setProperty("password", employeePassword);
        atomikosDataSourceBean.setXaProperties(properties);
        return atomikosDataSourceBean;
    }

    /**
     * 定义两个数据源,分别对应两个数据库
     */
    @Bean(name = "salaryDataSource")
    public DataSource salaryDataSource(){
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setUniqueResourceName("salaryDataSource");
        atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");

        Properties properties = new Properties();
        properties.setProperty("URL", salaryUrl);
        properties.setProperty("user", salaryUser);
        properties.setProperty("password", salaryPassword);
        atomikosDataSourceBean.setXaProperties(properties);
        return atomikosDataSourceBean;
    }
}

3、定义事务管理器JtaTransactionManager

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

@Configuration
public class AtomikosConfig {

    // JTA的事务管理
    @Bean(name = "userTransaction")
    public UserTransaction userTransaction() {
        return new UserTransactionImp();
    }

    @Bean(name = "atomikosTransactionManager")
    public TransactionManager atomikosTransactionManager() {
        return new UserTransactionManager();
    }

    /**
     * 事务管理器
     */
    @Bean(name = "platformTransactionManager")
    @DependsOn({"userTransaction", "atomikosTransactionManager"})
    public PlatformTransactionManager transactionManager() {
        UserTransaction userTransaction = userTransaction();
        TransactionManager transactionManager = atomikosTransactionManager();
        return new JtaTransactionManager(userTransaction, transactionManager);
    }
}

4、MyBatis配置

import lombok.SneakyThrows;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

@Configuration
@MapperScan(basePackages = "com.example.distributetransaction.dao", sqlSessionFactoryRef = "sqlSessionFactoryEmployee")
public class EmployeeMybatisConfig {

    @SneakyThrows
    @Bean
    public SqlSessionFactory sqlSessionFactoryEmployee(@Qualifier("employeeDataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);
        return factoryBean.getObject();
    }
}

import lombok.SneakyThrows;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

@Configuration
@MapperScan(basePackages = "com.example.distributetransaction.dao1", sqlSessionFactoryRef = "sqlSessionFactorySalary")
public class SalaryMybatisConfig {

    @SneakyThrows
    @Bean
    public SqlSessionFactory sqlSessionFactorySalary(@Qualifier("salaryDataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);
        return factoryBean.getObject();
    }
}

5、验证

@Transactional(rollbackFor = Exception.class)
public String join(EmployeeEntity employeeEntity) {
    //第一步,插入员工基础信息
    employeeDao.insertEmployee(employeeEntity);
    //第二步,插入员工薪资
    employeeSalaryDao.insertEmployeeSalary(employeeEntity.getId(), employeeEntity.getSalary());

    int i = 1 / 0;
    return "员工入职成功";
}

三、Atomikos源码分析

1、@Transactional入口:TransactionInterceptor创建事务流程

  • (1)Spring事务入口:@Transactional
  • (2)TransactionInterceptor#invoke:Spring事务的代理拦截方法
  • (3)TransactionAspectSupport#determineTransactionManager:确定事务管理器=>我们创建的JtaTransactionManager
  • (4)TransactionAspectSupport#createTransactionIfNecessary:创建事务
  • (5)AbstractPlatformTransactionManager#getTransaction:获取事务
  • (6)JtaTransactionManager#doGetTransaction:获取事务,拿到JtaTransactionObject,里面封装了UserTransactionImp
  • (7)JtaTransactionManager获取我们配置的UserTransactionImp 在这里插入图片描述 在这里插入图片描述

在这里插入图片描述 JtaTransactionManager#doGetTransaction:获取事务 在这里插入图片描述 在这里插入图片描述

2、启动事务

  • (1)从AbstractPlatformTransactionManager#handleExistingTransaction调用AbstractPlatformTransactionManager#startTransaction开启事务
  • (2)调用JtaTransactionManager#doBegin开启事务
  • (3)调用JtaTransactionManager#doJtaBegin开启事务
  • (4)调用UserTransactionImp#begin开启事务
  • (5)最终调用的是UserTransactionManager#begin开启事务
  • (6)调用TransactionManagerImp#begin()开启事务
  • (7)调用CompositeTransactionManagerImp#createCompositeTransaction创建分布式事务 在这里插入图片描述 在这里插入图片描述 在这里插入图片描述 在这里插入图片描述

3、小总结:启动全局事务流程图

在这里插入图片描述

4、分支事务,业务流程执行过程

在这里插入图片描述

5、事务提交与回滚

在这里插入图片描述 在这里插入图片描述