掘金 后端 ( ) • 2024-04-26 09:55

theme: smartblue

Spring + MyBatis 基本是现在 Java Web 开发的标配了,MyBatis 提供简单易用的 ORM 能力,Spring 提供容器管理立即及极强的扩展能力,两者结合简直是所向披靡。在使用关系数据库的时候,事务的支持至关重要,在使用 JDBC 的时候,我们可以通过获取 Connection,并且设置 Connection.setAutoCommit(false) 来手动控制事务的开启,那么 Spring 控制的事务是如何与 MyBatis 结合并正确作用的呢?

这里只分析简单事务也就是 DataSourceTransactionManager。

TransactionInterceptor

首先肯定得找到事务入口,看过 Spring 源码的同学一定都能找 Spring TX 的入口就是在TxAdviceBeanDefinitionParser。这里将解析 TX 的配置,生成 TransactionInterceptor 对象,这个就是一个普通的切面类,只要符合 AOP 规则的调用都会进入此切面。

在 invoke 方法中最重要的一段代码,会调用 invokeWithinTransaction() 方法,这里就是开启、处理事务的核心过程了。

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final TransactionAspectSupport.InvocationCallback invocation) throws Throwable {
    // 获取配置的TransactionAttribute信息
    final TransactionAttribute txAttr = this.getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
    final PlatformTransactionManager tm = this.determineTransactionManager(txAttr);
    final String joinpointIdentification = this.methodIdentification(method, targetClass);
    if (txAttr != null && tm instanceof CallbackPreferringPlatformTransactionManager) {
        ... ....  // CallbackPreferringPlatformTransactionManager 暂时不需要分析
    } else {
        //开启一个新的事务
        TransactionAspectSupport.TransactionInfo txInfo = this.createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal = null;
        try {
            // 被代理对象的原始代码逻辑执行
            retVal = invocation.proceedWithInvocation();
        } catch (Throwable var15) {
            // 发生异常时候对异常的处理
            this.completeTransactionAfterThrowing(txInfo, var15);
            throw var15;
        } finally {
            // 清理TransactionInfo信息
            this.cleanupTransactionInfo(txInfo);
        }
		//提交事务
        this.commitTransactionAfterReturning(txInfo);
        return retVal;
    }
}

createTransactionIfNecessary()

首先看下开启事务过程,也就是 createTransactionIfNecessary() 方法:

protected TransactionAspectSupport.TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
    ... ....
    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            status = tm.getTransaction((TransactionDefinition)txAttr);
        } else if (this.logger.isDebugEnabled()) {
            this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
        }
    }
    return this.prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status);
}

可以看到主要就是通过 TransactionManager 来获取一个事务,这里的 TransactonManager 就是 PlatformTransactionManager。

PlatformTransactionManager#getTransaction()

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    Object transaction = doGetTransaction();
    if (definition == null) {
        definition = new DefaultTransactionDefinition();
    }
    // 这个判断很重要,是否已经存在的一个transaction
    if (isExistingTransaction(transaction)) {
        // 如果是存在的事务做一些必要的处置
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }
    ... ...
    // 如果是事务传播级别是 PROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,
    // PROPAGATION_NESTED 这三种类型将开启一个新的事务
    else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
             definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
             definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        SuspendedResourcesHolder suspendedResources = suspend(null);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // 开启新事物
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
        ... ...
    }
    else {
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    }
}

doGetTransaction()

这段代码是比较核心的一段代码,首先这里将执行 doGetTransaction() 方法来获取一个 Transaction。

protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    // 这一行代码中 TransactionSynchronizationManager 很重要
    // 是对connection的核心获取、持有、删除等
    ConnectionHolder conHolder =
        (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
    // 这里不论获取到或者获取不到都将此设置newConnectionHolder为false
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

这段代码中主要是根据 this.dataSource 来获取 ConnectionHolder,ConnectionHolder 是放在TransactionSynchronizationManager 的 ThreadLocal 中持有的,如果是第一次来获取,肯定得到的是 null。

接着代码往下将执行到 isExistingTransaction(),这里主要是判断依据是:

txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive()

如果是第一次开启事务这里判断结果必然是 false,否则将是 true。

我们这里先讨论第一次进入的情况,也就是判断结果为 false 的时候。如果判断结果为 false,将继续往下执行到判断事务 Propagation 的时候了,如果 Propagation 为:ROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED 中的一个将开启一个新事物。创建一个新的DefaultTransactionStatus ,并且将 newTransaction 设置为 true,这个状态很重要,因为后面不论回滚、提交都是根据这个属性来判断是否在这个 TransactionStatus 上来进行。

doBegin()

接着将执行 doBegin() 方法:

protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;
    try {
        if (txObject.getConnectionHolder() == null ||
            txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            // 从dataSource 中获取一个 Connection
            Connection newCon = this.dataSource.getConnection();
            // 为当前 Transaction 设置 ConnectionHolder
            // 并且设置 newConnectionHolder 为 true,标识这是一个新的 connection
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }
        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();
        // 这里主要是根据 definition 对 connection 进行一些设置
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        // 开启事务,设置autoCommit为false
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            con.setAutoCommit(false);
        }
        // 这里设置 transactionActive 为true
        // 还记得前面判断是否存在的 transaction 吧?就是根据这个
        txObject.getConnectionHolder().setTransactionActive(true);
        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }
        if (txObject.isNewConnectionHolder()) {
            // 这里将当前的 connection  放入TransactionSynchronizationManager 中持有
            // 如果下次调用就可以判断为已有的事务
            TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
        }
    }

是不有了熟悉的代码?这里其实主要就是从 DataSource 中获取一个新的 Connection,并创建一个ConnectionHolder 来持有这个 Connection,并将 ConnectionHolder 放入 TransactionManager 中持有。记得前面 doGetTransaction() 方法吧,如果同一个线程,再此执行的话就会获取到同一个 ConnectionHolder,再执行 isExistingTransaction 方法也可以判定为是已有的 Transaction。

并且在这里将调用 con.setAutoCommit(false) 真正的开启事务。

prepareSynchronization()

我们接着看 getTransaction 方法中调用的 prepareSynchronization() 方法,这段代码比较简单,主要是对TransactionSynchronizationManager 的一系列设置。

prepareTransactionInfo()

返回后将继续执行 createTransactionIfNecessary() 方法中的 prepareTransactionInfo() 方法。

protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
            TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {
    TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
    if (txAttr != null) {
        txInfo.newTransactionStatus(status);
    }
    txInfo.bindToThread();
    return txInfo;
}

private void bindToThread() {
    this.oldTransactionInfo = transactionInfoHolder.get();
    transactionInfoHolder.set(this);
}

这里其实也比较简单,主要生成一个 TransactionInfo 并绑定到当前线程的 ThreadLocal,通过TransactionAspectSupport.currentTransactionStatus() 可以获取当前的 Transaction 状态。

invocation.proceedWithInvocation()

prepareTransactionInfo() 方法执行完成后,就完成了 Transaction 的创建过程,将返回上层代码,执行 invocation.proceedWithInvocation() 方法也就是执行被代理对象的实际业务代码了。

cleanupTransactionInfo()

执行完毕后,如果存在异常将执行回滚操作方法 completeTransactionAfterThrowing,最后将进入 finally 代码块,将执 cleanupTransactionInfo() 方法,这里将调用 TransactionInfo#restoreThreadLocalStatus()。

private void restoreThreadLocalStatus() {
    transactionInfoHolder.set(this.oldTransactionInfo);
}

这里就是将 TransactionInfo 进行重置工作,让它恢复到前一个状态。

commitTransactionAfterReturning()

如果没有发生回滚,将执行提交操作方法 commitTransactionAfterReturning(),我们一起来分析下提交过程吧,回滚的过程操作与提交过程类似。

protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
    if (txInfo != null && txInfo.hasTransaction()) {
        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
}

可以看到也没有做其他的操作,只是调用了 TransactionManager#commit() 方法,在 commit() 方法中,经过判断将执行 processCommit() 方法。

TransactionManager#processCommit()

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;
        try {
            prepareForCommit(status);
            triggerBeforeCommit(status);
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;
            boolean globalRollbackOnly = false;
            if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                globalRollbackOnly = status.isGlobalRollbackOnly();
            }
            if (status.hasSavepoint()) {
                status.releaseHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                doCommit(status);
            }
            if (globalRollbackOnly) {
                throw new UnexpectedRollbackException(
                    "Transaction silently rolled back because it has been marked as rollback-only");
            }
        }
        ... ...
        try {
            triggerAfterCommit(status);
        }
        finally {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }

    }
    finally {
        cleanupAfterCompletion(status);
    }
}

首先将执行一些提交前的准备工作,这里将进行是否有 savepoint 判断,如果有的话将释放 savePoint,即getConnectionHolderForSavepoint().getConnection().releaseSavepoint((Savepoint) savepoint)。

接着就判断是否是新的 Transaction:DefaultTransactionStatus#isNewTransaction(),如果是的话将执行 doCommit() 方法。

doCommit()

protected void doCommit(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    try {
        con.commit();
    }
    catch (SQLException ex) {
        throw new TransactionSystemException("Could not commit JDBC transaction", ex);
    }
}

其实也就是调用了 Connection#commit() 方法,也是 JDBC 事务控制的本质。

cleanupAfterCompletion()

最后无论成功与否都将调用 finally 块中的 cleanupAfterCompletion() 方法。

private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    status.setCompleted();
    if (status.isNewSynchronization()) {
        // TransactionSynchronizationManager 清理工作
        TransactionSynchronizationManager.clear();
    }
    if (status.isNewTransaction()) {
      	// 这个比较重要
        doCleanupAfterCompletion(status.getTransaction());
    }
    if (status.getSuspendedResources() != null) {
        resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources());
    }
}

首先对 TransactionSynchronizationManager 进行一系列清理工作,然后就将执行doCleanupAfterCompletion() 方法。

doCleanupAfterCompletion()
protected void doCleanupAfterCompletion(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    if (txObject.isNewConnectionHolder()) {
        // 从 TransactionSynchronizationManager 中解绑相应的 connectionHolder
        TransactionSynchronizationManager.unbindResource(this.dataSource);
    }
    Connection con = txObject.getConnectionHolder().getConnection();
    try {
        // 对获取到的 Connection 进行一些还原
        if (txObject.isMustRestoreAutoCommit()) {
            con.setAutoCommit(true);
        }
        DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
    }
    catch (Throwable ex) {
    }
    if (txObject.isNewConnectionHolder()) {
        // 如果是 newConnection 将这个链接关闭,如果是连接池将还给连接池
        DataSourceUtils.releaseConnection(con, this.dataSource);
    }
    // 这里将置 transactionActive 为 false
    txObject.getConnectionHolder().clear();
}

其实就是将 TransactionSynchronizationManager 中持有的 ConnectionHolder 释放,并且还原当前Connection 的状态,将 Connection 交还连接池,然后将对当前的 Transaction 进行清理,包括设置 transactionActive 为 false 等。

事务传播过程

事务是怎么传播的呢?其实上面已经说了,在 PlatformTransactionManager#getTransaction() 的时候,将判断是否是一个已经存在的事务。

PlatformTransactionManager#getTransaction()

// 判断是否已经存在 Transaction
if (isExistingTransaction(transaction)) {
    return handleExistingTransaction(definition, transaction, debugEnabled);
}


protected boolean isExistingTransaction(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive());
}

isExistingTransaction() 这个方法主要是用来判断当前线程是否已经存在一个 Transaction,且需要是活跃的,如果是的话进入执行 handleExistingTransaction() 方法:

handleExistingTransaction()

private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {
	// 事务传播级别为不开启,经抛出异常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
            "Existing transaction found for transaction marked with propagation 'never'");
    }
    // 事务传播级别为非事务方式执行,以非事务方式运行,如果存在事务,将当前事务挂起
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(
            definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }
    // 事务传播级别为开启新事务,将创建一个新的事务,与第一次开启事务的过程一致
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
        catch (Error beginErr) {
            resumeAfterBeginException(transaction, suspendedResources, beginErr);
            throw beginErr;
        }
    }
	// 事务传播级别为嵌套事务,则在嵌套事务中执行
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                "Transaction manager does not allow nested transactions by default - " +
                "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (useSavepointForNestedTransaction()) {
            DefaultTransactionStatus status =
                prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        }
        else {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, true, newSynchronization, debugEnabled, null);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
    }
    ... ...
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

首先需要根据 TransactionDefinition#getPropagationBehavior() 方法来获取当前事务传播属性,比如说如果是 TransactionDefinition.PROPAGATION_REQUIRES_NEW 的时候,就需要新建一个事务,细看其中的代码,是否完全跟之前分析第一次事务开启的时候一样?

如果都不算上面判断的事务传播级别将执行到最后,也就是当是 TransactionDefinition.PROPAGATION_REQUIRED 的时候,将初始化一个 TransactionStatus,并且将此 TransactionStatus 的 newTransaction 属性置为 false。

其后的调用过程跟上篇分析一致,而提交或者回滚的时候都会根据 TransactionStatus 进行一些判断。

至此整个 Spring 的事务处理过程也就结束了。