掘金 后端 ( ) • 2024-04-06 12:03

事务基本流程

本人开发中遇到Transaction加多数据源问题。虽然换成DSTransaction成功解决了该问题。但是还是很好奇为什么会失效。之前到处百度,永远都是什么代理直走了Transcation的,不走DS的。讲了也白讲根本不到点子上,还到处一模一样的文章。所以我自己debugger分析,并分享出来个人的见解。不一定准确,如有问题,请点出指教。 前面是事务中相关知识点,只挑后面有用的讲,后面则是失效的原因。

首先我们知道Spring事务其实是一个AOP,在Bean初始化的时候就已经形成了代理。那么我们直接看下他的具体实现,他的拦截器TransactionInterceptor的invoke方法,动态代理常规的操作。核心方法是invokeWithinTranscation。

@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
    //核心执行方法
    return this.invokeWithinTransaction(invocation.getMethod(), targetClass, new TransactionAspectSupport.CoroutinesInvocationCallback() {
        @Nullable
        public Object proceedWithInvocation() throws Throwable {
            return invocation.proceed();
        }

        public Object getTarget() {
            return invocation.getThis();
        }

        public Object[] getArguments() {
            return invocation.getArguments();
        }
    });
}

核心执行逻辑

我们继续看下去,invokeWithTranscation方法,咱们直接跳过部分其他方法感兴趣可以自己研究下,看下几个比较关键的东西。 1.取得TranscationManager,这个是个核心的类 2.createTranscationIfNecessary 看方法名大概是获得事务的信息

@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, InvocationCallback invocation) throws Throwable {
//获取事务信息,比如事务的配置传播属性等等
    TransactionAttributeSource tas = this.getTransactionAttributeSource();
    TransactionAttribute txAttr = tas != null ? tas.getTransactionAttribute(method, targetClass) : null;
    //决定何种TranscationManager
    TransactionManager tm = this.determineTransactionManager(txAttr);
    if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {...} else {
    //封装成PlatformTransactionManager
        PlatformTransactionManager ptm = this.asPlatformTransactionManager(tm);
        //生成唯一标识名称(具体没有细究,看了下就是方法名和目标类名称拼接)
        String joinpointIdentification = this.methodIdentification(method, targetClass, txAttr);
        if (txAttr != null && ptm instanceof CallbackPreferringPlatformTransactionManager) {...} else {
            //核心1
            TransactionInfo txInfo = this.createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
            Object retVal;
            try {
            //继续执行invoke方法
                retVal = invocation.proceedWithInvocation();
            } catch (Throwable var20) {
                this.completeTransactionAfterThrowing(txInfo, var20);
                throw var20;
            } finally {
            //清楚事务信息
                this.cleanupTransactionInfo(txInfo);
            }

            if (retVal != null && vavrPresent && TransactionAspectSupport.VavrDelegate.isVavrTry(retVal)) {
                TransactionStatus status = txInfo.getTransactionStatus();
                if (status != null && txAttr != null) {
                    retVal = TransactionAspectSupport.VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                }
            }
            //提交事务
            this.commitTransactionAfterReturning(txInfo);
            return retVal;
        }
    }
}

事务的核心类:TransactionManager

直接点进TransactionManager并查看他的类图:

image.png 他有很多实现类,具体涉及的JdbcTransactionManager,DataSourceTransactionManager。上述代码中有一步就是转换为PlatFormTransactionManager接口。看一下PlatFormTransactionManager接口中的方法。PS:TransactionManager里面是没有方法的。

public interface PlatformTransactionManager extends TransactionManager {
    TransactionStatus getTransaction(@Nullable TransactionDefinition var1) throws TransactionException;

    void commit(TransactionStatus var1) throws TransactionException;

    void rollback(TransactionStatus var1) throws TransactionException;
}

可以得出,这个涉及事务的核心方法,获取事务状态,提交事务,回滚事务。大概这是一个事务极其重要的类,负责几个核心的操作。我们先了解下,后面看他怎么用。

核心执行逻辑中的createTransactionIfNecessary的getTransaction方法:创建事务信息

在上面了解TranscationManager,暂时不知道他具体用处。我们回到核心执行逻辑里的createTransactionIfNecessary。方法如下:

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
//看上边获得属性里是否有名称,没有就返回默认名称。
    if (txAttr != null && ((TransactionAttribute)txAttr).getName() == null) {
        txAttr = new DelegatingTransactionAttribute((TransactionAttribute)txAttr) {
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
        //获取事务
            status = tm.getTransaction((TransactionDefinition)txAttr);
        } else if (this.logger.isDebugEnabled()) {
            this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
        }
    }

    return this.prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status);
}

这里面比较简单,没有具体代码,都在getTransaction里面。发现这里使用了TranscationManager方法的GetTransaction方法了。我们基本上都是用JdbcTranscationManager,而JdbcTranscationManaer是继承DataSourceTransactionManager,然后又继承AbstractPlatformTransactionManager。这个方法直接去了AbstractPlatformTransactionManager的getTransaction方法。我们这边不会去仔细看嵌套事务处理方法。这个是事务的代码。我们直接读取对数据源切换失效的核心代码。如下

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    TransactionDefinition def = definition != null ? definition : TransactionDefinition.withDefaults();
    //获取事务
    Object transaction = this.doGetTransaction();
    boolean debugEnabled = this.logger.isDebugEnabled();
    //根据transaction是否存在,判断是否有嵌套事务
    if (this.isExistingTransaction(transaction)) {
    //处理嵌套事务
        return this.handleExistingTransaction(def, transaction, debugEnabled);
    } else if (def.getTimeout() < -1) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    } else if (def.getPropagationBehavior() == 2) {
        throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
    } else if (def.getPropagationBehavior() != 0 && def.getPropagationBehavior() != 3 && def.getPropagationBehavior() != 6) {
        if (def.getIsolationLevel() != -1 && this.logger.isWarnEnabled()) {
            this.logger.warn("Custom isolation level specified but no actual transaction initiated; isolation level will effectively be ignored: " + def);
        }

        boolean newSynchronization = this.getTransactionSynchronization() == 0;
        return this.prepareTransactionStatus(def, (Object)null, true, newSynchronization, debugEnabled, (Object)null);
    } else {
        SuspendedResourcesHolder suspendedResources = this.suspend((Object)null);
        if (debugEnabled) {
            this.logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
        }

        try {
            return this.startTransaction(def, transaction, debugEnabled, suspendedResources);
        } catch (Error | RuntimeException var7) {
            this.resume((Object)null, suspendedResources);
            throw var7;
        }
    }
}

doGetTransaction

继续看下这个方法。

protected Object doGetTransaction() {
    //事务相关属性设置
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(this.isNestedTransactionAllowed());
    //获取ConnectionHolder
    ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(this.obtainDataSource());
    //将获取的ConnectionHolder保存到事务的对象里,首次获得Resource肯定是null,后面才开始加载先跳过
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

顾名思义,就是获得事务的实际操作,前两行都是事务相关属性操作。后面两行比较重要,我们先了解下这边出现的两个类。

ConnectionHolder和DataSourceTransactionObject

这边涉及到ConnectionHolder和DataSourceTransactionObject两个属性类。我们看下这个ConnectionHolder类里面有啥:

public class ConnectionHolder extends ResourceHolderSupport {
    public static final String SAVEPOINT_NAME_PREFIX = "SAVEPOINT_";
    @Nullable
    private ConnectionHandle connectionHandle;
    @Nullable
    private Connection currentConnection;
    private boolean transactionActive;
    @Nullable
    private Boolean savepointsSupported;
    private int savepointCounter;

我只截取他的属性。这边出现Connection类。他就是java.sql下的,就是数据库连接的那个Connection。这个类大概率是做保存数据源连接的。毕竟大家都知道事务是由数据库完成的,当然得通过这个链接去执行。

继续看下DataSourceTransactionObject类,这边一样截取重要属性。其实不难得出他就是里面缓存一个ConnectionHolder,和相关属性记录。

private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {
    private boolean newConnectionHolder;
    private boolean mustRestoreAutoCommit;

    private DataSourceTransactionObject() {
    }

    public void setConnectionHolder(@Nullable ConnectionHolder connectionHolder, boolean newConnectionHolder) {
        super.setConnectionHolder(connectionHolder);
        this.newConnectionHolder = newConnectionHolder;
    }

TransactionSynchronizationManager(数据源切换核心1)

返回doGetTransaction方法中的TransactionSynchronizationManager.getResource( this.obtainDataSource())方法。 直观的看出来其实他就是对本地线程的一个Map进行操作。 对于Reousrce的代码很简单,无非是存取,但是他很重要,他是影响数据源切换的。

public abstract class TransactionSynchronizationManager {
    private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class);
    private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal("Transactional resources");

@Nullable
public static Object getResource(Object key) {
    //不知道是干啥的,反正就是key的处理
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    //具体获取MAP中的key为actualKey的ConnectionHolder对象。
    Object value = doGetResource(actualKey);
    if (value != null && logger.isTraceEnabled()) {
        logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
    }

    return value;
}
//没什么好说的,就是普通的map取值等等
@Nullable
private static Object doGetResource(Object actualKey) {
    Map<Object, Object> map = (Map)resources.get();
    if (map == null) {
        return null;
    } else {
        Object value = map.get(actualKey);
        if (value instanceof ResourceHolder && ((ResourceHolder)value).isVoid()) {
            map.remove(actualKey);
            if (map.isEmpty()) {
                resources.remove();
            }

            value = null;
        }

        return value;
    }
}
//赋值操作
public static void bindResource(Object key, Object value) throws IllegalStateException {
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    Assert.notNull(value, "Value must not be null");
    Map<Object, Object> map = resources.get();
    // set ThreadLocal Map if none found
    if (map == null) {
       map = new HashMap<>();
       resources.set(map);
    }
    Object oldValue = map.put(actualKey, value);
    // Transparently suppress a ResourceHolder that was marked as void...
    if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
       oldValue = null;
    }
    if (oldValue != null) {
       throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
             actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
    }
    if (logger.isTraceEnabled()) {
       logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +
             Thread.currentThread().getName() + "]");
    }
}

obtainDataSource

就这两方法,默认返回的是数据源。 这个方法用到次数很多。还是得注意下的。 其实他也没什么,他主要是获得的DynamicRoutingDataSource类。 比如我们这边用的Mybatis-plus配置多数据源,他就是DynamicRoutingDataSource,然后里面有多个数据源他会保存map参数,key为配置的数据源名称。咱们只需要知道她这边返回的就整个DynamicRoutingDataSource对象。

@Nullable
public DataSource getDataSource() {
    return this.dataSource;
}

protected DataSource obtainDataSource() {
    DataSource dataSource = this.getDataSource();
    Assert.state(dataSource != null, "No DataSource set");
    return dataSource;
}

继续看,毕竟咱们这个主要是讲解数据源失效。看下DynamicRoutingDataSource。

public class DynamicRoutingDataSource extends AbstractRoutingDataSource implements InitializingBean, DisposableBean {
    private static final Logger log = LoggerFactory.getLogger(DynamicRoutingDataSource.class);
    private static final String UNDERLINE = "_";
    private final Map<String, DataSource> dataSourceMap = new ConcurrentHashMap();
    private final Map<String, GroupDataSource> groupDataSources = new ConcurrentHashMap();
    @Autowired
    private List<DynamicDataSourceProvider> providers;
    private Class<? extends DynamicDataSourceStrategy> strategy = LoadBalanceDynamicDataSourceStrategy.class;
    private String primary = "master";
    private Boolean strict = false;
    private Boolean p6spy = false;
    private Boolean seata = false;

    public DynamicRoutingDataSource() {
    }

    protected String getPrimary() {
        return this.primary;
    }

doGetTransaction方法总结

看整个流程得出:该方法返回的是DataSourceTransactionObject。这里面其实就是暂存了一个ConnectionHolder,而ConnectionHolder中拥有这此数据源的链接。

startTransaction 启动事务线程(数据源切换核心2)

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
       boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
// 判断是否需要开启事务同步 
// 默认是需要开启的
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    // 创建事务状态DefaultTransactionStatus 
    // 事务状态主要包含事务对象transaction和被挂起事务的数据suspendedResources
    DefaultTransactionStatus status = newTransactionStatus(
          definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    // 实际的开启事务并将开启的事务与当前线程绑定
    doBegin(transaction, definition);
    // 将事务的一些信息与当前线程绑定
    prepareSynchronization(status, definition);
    return status;
}

doBegin(数据源切换核心3)

跟踪doBegin方法,他所在的是DataSourceTransactionManager类中。直接把代码搬出来。我们只关注我这边标记的重点1,重点2,重点3。其他剩余的内容大概是给它赋值

protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject)transaction;
    Connection con = null;

    try {
        if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
        //上文介绍过,重点1.this.obtainDataSource() 这个就是获得DynamicRoutingDataSource,看后文解释
            Connection newCon = this.obtainDataSource().getConnection();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
        //重要2 刚刚看过txObject属性,就是将这个ConnectionHolder保存到里面。
        //上一次出现是null的,这次不为空了,保存进去
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        txObject.setReadOnly(definition.isReadOnly());
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }

            con.setAutoCommit(false);
        }

        this.prepareTransactionalConnection(con, definition);
        txObject.getConnectionHolder().setTransactionActive(true);
        int timeout = this.determineTimeout(definition);
        if (timeout != -1) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        if (txObject.isNewConnectionHolder()) {
        //重点3.绑定
            TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
        }

    } catch (Throwable var7) {
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, this.obtainDataSource());
            txObject.setConnectionHolder((ConnectionHolder)null, false);
        }

        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
    }
}
重点1.this.obtainDataSource().getConnection();

注意上文讲过这个是DynamicRoutingDataSource,补充下是 com.baomidou.dynamic.datasource这个包下的数据源类。这个getConnection方法在父类AbstractRoutingDataSource中,小伙伴们千万不要搞错。咱们看他的实现

public Connection getConnection() throws SQLException {
//首次为空,就是LocalThread本店变量存储
    String xid = TransactionContext.getXID();
    if (StringUtils.isEmpty(xid)) {
    //获取数据源中的Connection,我们知道我们的DataSource有多个数据源
        return this.determineDataSource().getConnection();
    } else {
        String ds = DynamicDataSourceContextHolder.peek();
        ds = StringUtils.isEmpty(ds) ? this.getPrimary() : ds;
        ConnectionProxy connection = ConnectionFactory.getConnection(xid, ds);
        return (Connection)(connection == null ? this.getConnectionProxy(xid, ds, this.determineDataSource().getConnection()) : connection);
    }
}

继续下一步获取数据源。我们看下第一行,这个并不陌生这个操作是切换动态数据源。可以自己查看下DS原理。如果需要讲解,后续我在贴出来吧。应该大家都知道。毕竟各位遇到失效的时候肯定会看下DS的原理。

public DataSource determineDataSource() {
    String dsKey = DynamicDataSourceContextHolder.peek();
    return this.getDataSource(dsKey);
}

这里有一点注意,我们业务中分情况,如下。

所以第一次的时候DynamicDataSourceContextHolder.peek()一定是空!!! 这点记住很重要

   以下是业务中多数据源和事务的应用场景,大概这3种。
   1.给Service层加了Transaction注解,里面调用mapper加了DS。
   2. Controller加Transaction注解,里面Service加DS。
   3.同方法两个一起加

为什么我会说一定为空呢。首先我们知道这个aop他是有顺序的,我们自己写的时候也是可以给他指定Order去的。默认Transaction的优先级肯定比DS高。开始处理Transaction的AOP时候,DS的AOP压根还没开始。所以这个peek不到数据,本身就没开始往里面存。

既然首次为空,则进入determinePrimaryDataSource()

public DataSource getDataSource(String ds) {

    if (StringUtils.isEmpty(ds)) {
        return this.determinePrimaryDataSource();
    } else if (!this.groupDataSources.isEmpty() && this.groupDataSources.containsKey(ds)) {
        log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
        return ((GroupDataSource)this.groupDataSources.get(ds)).determineDataSource();
    } else if (this.dataSourceMap.containsKey(ds)) {
        log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
        return (DataSource)this.dataSourceMap.get(ds);
    } else if (this.strict) {
        throw new CannotFindDataSourceException("dynamic-datasource could not find a datasource named " + ds);
    } else {
        return this.determinePrimaryDataSource();
    }
}

不难看出来,或默认取的master返回。this.primary值就是master。

private DataSource determinePrimaryDataSource() {
    log.debug("dynamic-datasource switch to the primary datasource");
    DataSource dataSource = (DataSource)this.dataSourceMap.get(this.primary);
    if (dataSource != null) {
        return dataSource;
    } else {
        GroupDataSource groupDataSource = (GroupDataSource)this.groupDataSources.get(this.primary);
        if (groupDataSource != null) {
            return groupDataSource.determineDataSource();
        } else {
            throw new CannotFindDataSourceException("dynamic-datasource can not find primary datasource");
        }
    }
}

重点2.事务保存数据源连接

doBegin执行一个保存操作,把数据源连接封装进txObject中,这段上文有过解释。就是把这个数据源连接封装包装成ConnectionHolder并存进去事务信息里面去。

txObject.setConnectionHolder(new ConnectionHolder(newCon), true);

重复一下,第一次出现保存连接的操作是保存为空的,之前里面什么都没有。而这次保存的却是主数据源连接(master)。小伙伴们应该注意到这也是可能出现数据源无法切换的原因。我们继续往下看。

重点3.TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());

看下两个参数。第一个参数获取整个数据源,第二个获得数据源连接(master)。

重要点:这时候Resource有值了Key是DynamicRoutingDataSource,VALUE是Master的数据源

public static void bindResource(Object key, Object value) throws IllegalStateException {
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    Assert.notNull(value, "Value must not be null");
    //resources是TransactionSynchronizationManager的本地线程变量。之前一直是空没有保存的。这边把ConnectionHolder给缓存起来。
    Map<Object, Object> map = resources.get();
    // set ThreadLocal Map if none found
    if (map == null) {
       map = new HashMap<>();
       resources.set(map);
    }
    Object oldValue = map.put(actualKey, value);
    // Transparently suppress a ResourceHolder that was marked as void...
    if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
       oldValue = null;
    }
    if (oldValue != null) {
       throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
             actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
    }
    if (logger.isTraceEnabled()) {
       logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +
             Thread.currentThread().getName() + "]");
    }
}
doBegin总结

这边主要是获取,缓存数据源的操作。请仔细查看doBegin的实现。他很重要,在后面解释数据源无法切换是关键代码。

这边可以推断出一个疑问:为什么我的数据源切换失效时,我一个操作里有多个数据源切换,我第一步使用slave,再用slave2,明明没有使用master,但是他还是默认给的master数据源。 总结下,这段代码里的3步大概就是如下

重点1.获取数据源,但是我也解释了,他只会获得master数据源。

重点2.给事务保存数据源。

重点3.缓存数据源去TranscationManager

后续其他操作

其他后续事务操作,我们就不追究了,毕竟不是主要讲事务的。比如提交事务,回滚事务等等操作。读者有需要请自己去查看源码。也没多少了。

分割---------一下是正式解析为什么会失效-以上是事务相关的内容的补习

2.1 SqlSessionInterceptor

之前介绍的事务操作,熟悉后我们继续看。建议不熟练的按照我的方法里面每个都打断点,根据断点一步步看。没有类的上下文,光看真的很难。

这边开始我们去Mybatis查询逻辑去看下。和事务一样,他也是要AOP去处理数据源。

关注一下org.mybatis.spring.SqlSessionTemplate.SqlSessionInterceptor,内部类不要进入错误的同名类,是这个SqlSessionTemplate下的,不要断点打错。

继承InvocationHandler可知,他还是动态代理的。

private class SqlSessionInterceptor implements InvocationHandler {
    private SqlSessionInterceptor() {
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            //切换重点1: 获取SqlSession
        SqlSession sqlSession = SqlSessionUtils.getSqlSession(SqlSessionTemplate.this.sqlSessionFactory, SqlSessionTemplate.this.executorType, SqlSessionTemplate.this.exceptionTranslator);

        Object unwrapped;
        try {
        //执行后面的查询方法
            Object result = method.invoke(sqlSession, args);
            if (!SqlSessionUtils.isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
                sqlSession.commit(true);
            }

            unwrapped = result;
        } catch (Throwable var11) {
            unwrapped = ExceptionUtil.unwrapThrowable(var11);
            if (SqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
                SqlSessionUtils.closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
                sqlSession = null;
                Throwable translated = SqlSessionTemplate.this.exceptionTranslator.translateExceptionIfPossible((PersistenceException)unwrapped);
                if (translated != null) {
                    unwrapped = translated;
                }
            }

            throw (Throwable)unwrapped;
        } finally {
            if (sqlSession != null) {
                SqlSessionUtils.closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
            }

        }

        return unwrapped;
    }
}

方法还是清晰简单的。获取SqlSession然后去执行查询方法,然后最后关闭SqlSession。直接看重点1处的代码。

2.1.1 SqlSession的获取()

这一步解释完大家就知道为什么会失效了。直接进入方法,一看。好家伙就是你害我苦苦寻找。幕后真凶来了。直接TransactionSynchronizationManager.getResource(sessionFactory)获取的。这是不是有点眼熟,但是又有点陌生。

public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) {
    Assert.notNull(sessionFactory, "No SqlSessionFactory specified");
    Assert.notNull(executorType, "No ExecutorType specified");
    // 获取之前往事务管理器保存的SqlSessionHolder,
    SqlSessionHolder holder = (SqlSessionHolder)TransactionSynchronizationManager.getResource(sessionFactory);
    SqlSession session = sessionHolder(executorType, holder);
    //存在则返回给这个SqlSession
    if (session != null) {
        return session;
    } else {
        LOGGER.debug(() -> {
            return "Creating a new SqlSession";
        });
        //不存在则新建一个,并且之后会保存在TransactionSynchronizationManager中。所以首次会运行这段内容,后面则会运行上面那个
        session = sessionFactory.openSession(executorType);
        registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);
        return session;
    }
}

这个Key好像也不对啊,我们之前key是DataSource的,但这边变成SqlSessionFactory。但是不要慌,没有就是null,事实断点调试下来,首次进入的时候就是null。那他进入else的分支下。我们看下else分支的代码。

2.1.2 真相大白-重点代码

我们点击openSession方法,最终会调用下面的这个方法。

private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
  Transaction tx = null;
  try {
    final Environment environment = configuration.getEnvironment();
    final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
    //重点代码
    tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
    final Executor executor = configuration.newExecutor(tx, execType);
    return new DefaultSqlSession(configuration, executor, autoCommit);
  } catch (Exception e) {
    closeTransaction(tx); // may have fetched a connection so lets call close()
    throw ExceptionFactory.wrapException("Error opening session.  Cause: " + e, e);
  } finally {
    ErrorContext.instance().reset();
  }
}

看下创建了个默认的SqlSession,他根据executor来创建,继续 他是和Transaction有关。看下这个Transaction怎么创建的。先断点定位了transactionFactory是SpringManagedTransactionFactory。跟进他创建Transaction方法。

public class SpringManagedTransaction implements Transaction {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpringManagedTransaction.class);
    private final DataSource dataSource;
    private Connection connection;
    private boolean isConnectionTransactional;
    private boolean autoCommit;

    public SpringManagedTransaction(DataSource dataSource) {
        Assert.notNull(dataSource, "No DataSource specified");
        this.dataSource = dataSource;
    }

    public Connection getConnection() throws SQLException {
        if (this.connection == null) {
            this.openConnection();
        }

        return this.connection;
    }

    private void openConnection() throws SQLException {
        this.connection = DataSourceUtils.getConnection(this.dataSource);
        this.autoCommit = this.connection.getAutoCommit();
        this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
        LOGGER.debug(() -> {
            return "JDBC Connection [" + this.connection + "] will" + (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring";
        });
    }

初始化很简单,直接把数据源放进去。然后我们返回2.1中的代码,会把这个传递到SqlSessionIntercetptor中的invoke方法,去执行下一个方法。

2.1.3 最终问题-DS切换失效的原因

但是我这边为什么把OpenConnection方法拿出来呢?这是因为在跟踪后面的代码里,发现sql执行前,会调用Tranaction的getConnection方法。而对应的Transaction刚好是这个SpringManagedTransaction.然后根据这个Connection去查询数据。所以这边也摆着失效的原因。 这边会出现两个分支一个针对有事务的时候,也就是获取了ConnectionHolder.一个是不存在事务的时候。记得之前事务拦截器分析的时候,我们默认会给里面Resource赋一个默认数据源。所以说只要你方法开启了事务,里面是有值的。如果不开启事务那他就是空的

拥有事务处理的时候逻辑

public static Connection doGetConnection(DataSource dataSource) throws SQLException {
    Assert.notNull(dataSource, "No DataSource specified");
    //实际查询前会执行这个代码,如果没有事务这个就是null 如果有事务就是存在的,直接获取master数据源。这边我们之前分析了在2.1.1的时候存入TransactionSynchronizationManager了
    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
    if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
       conHolder.requested();
       if (!conHolder.hasConnection()) {
          logger.debug("Fetching resumed JDBC Connection from DataSource");
          conHolder.setConnection(fetchConnection(dataSource));
       }
       return conHolder.getConnection();
    }
    // Else we either got no holder or an empty thread-bound holder here.
    //这里是分支 就是没有事务的时候执行方法
    logger.debug("Fetching JDBC Connection from DataSource");
    //重点:去获得数据源连接
    Connection con = fetchConnection(dataSource);

    if (TransactionSynchronizationManager.isSynchronizationActive()) {
       try {
          // Use same Connection for further JDBC actions within the transaction.
          // Thread-bound object will get removed by synchronization at transaction completion.
          ConnectionHolder holderToUse = conHolder;
          if (holderToUse == null) {
             holderToUse = new ConnectionHolder(con);
          }
          else {
             holderToUse.setConnection(con);
          }
          holderToUse.requested();
          TransactionSynchronizationManager.registerSynchronization(
                new ConnectionSynchronization(holderToUse, dataSource));
          holderToUse.setSynchronizedWithTransaction(true);
          if (holderToUse != conHolder) {
             TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
          }
       }
       catch (RuntimeException ex) {
          // Unexpected exception from external delegation call -> close Connection and rethrow.
          releaseConnection(con, dataSource);
          throw ex;
       }
    }

    return con;
}

没有事务的运行逻辑

private static Connection fetchConnection(DataSource dataSource) throws SQLException {
    Connection con = dataSource.getConnection();
    if (con == null) {
       throw new IllegalStateException("DataSource returned null from getConnection(): " + dataSource);
    }
    return con;
}
进入这个方法
public Connection getConnection() throws SQLException {
    String xid = TransactionContext.getXID();
    if (StringUtils.isEmpty(xid)) {
        return this.determineDataSource().getConnection();
    } else {
        String ds = DynamicDataSourceContextHolder.peek();
        ds = StringUtils.isEmpty(ds) ? this.getPrimary() : ds;
        ConnectionProxy connection = ConnectionFactory.getConnection(xid, ds);
        return (Connection)(connection == null ? this.getConnectionProxy(xid, ds, this.determineDataSource().getConnection()) : connection);
    }
}
调用DynamicRoutingDataSource,获得了DS中配置的数据源,可以明确看出来这边DS是生效了,后面根据这个数据源获得Connection去执行查询。
public DataSource determineDataSource() {
    String dsKey = DynamicDataSourceContextHolder.peek();
    return this.getDataSource(dsKey);
}

DataSourceUtils.isConnectionTransactional

但是我们这边关注的是getConnection方法,因为我们迟早会调用这个获取Connection方法。 好了找到了,他此时获取的Resource不就是Key为DataSource吗?不就是master吗?原来如此。

public static boolean isConnectionTransactional(Connection con, @Nullable DataSource dataSource) {
    if (dataSource == null) {
       return false;
    }
    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
    return (conHolder != null && connectionEquals(conHolder, con));
}

补充:

debug执行链

1712322109781.png

1712322146797.png