掘金 后端 ( ) • 2024-05-17 10:27

title: Spring事务源码分析(二):事务管理器的运行原理 date: 2023-05-11 tags: Spring源码分析 categories: Spring源码分析

概述

spring-tx 模块为 Spring 框架提供了强大而灵活的事务管理功能。在这个系列的文章中,我们将深入源码探讨 Spring 框架事务部分的内部工作原理和关键组件。 一般来说,当我们使用 Spring 事务的功能时,实际上分为两种用法:

  • 编程式事务:即通过 TransactionTemplate 显式的在代码中声明事务;
  • 声明式事务:即使用 @Transcational 注解将方法操作声明在事务中,是最常用的手段;

不过,虽然两者的生效方式方式不同——前者依靠具体的组件,后者依靠 AOP 代理增强——但是实际上都是基于事务管理 TranscationManager 实现。

在前文,我们对 Spring 事务机制中的一些基本概念和组件有了基本的了解,而在本篇文章,笔者将从编程式事务的入口 TranscationTemplate 开始,自下而上的一步一步的研究 Spring 事务的运行原理。

1.TranscationTemplate

image.png

TranscationTemplate 是编程式事务的核心入口,通过上图,我们可以对 TranscationTemplate 有一个基本的认识。

1.1.事务定义

TranscationTemplate 实现了 TransactionDefinition 接口,所以它本身就是一个事务定义对象。我们可以通过 set 方法设置修改一些事务的配置,比如通过它开启是事务的是否只读、设置隔离级别是什么或者传播方式等等:

// 是否只读
transactionTemplate.setReadOnly(false);
// 超时时间
transactionTemplate.setTimeout(30);
// 隔离级别
transactionTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_DEFAULT);
// 传播行为
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);

当开启一个事务的时候,将根据它本身的属性来配置事务。

1.2.事务操作

TransactionDefinition 接口外,它还实现了 TransactionOperations 接口。

TransactionOperations 的核心方法其实就是 <T> T execute(TransactionCallback<T> action) ,它允许用户传入一个接受 TransactionStatus ,并且有返回特定值的回调方法:

public interface TransactionOperations {

	// 执行事务操作,有返回值
	@Nullable
	<T> T execute(TransactionCallback<T> action) throws TransactionException;
	
	// 执行事务操作,没有返回值
	default void executeWithoutResult(Consumer<TransactionStatus> action) throws TransactionException {
		execute(status -> {
			action.accept(status);
			return null;
		});
	}
}

@FunctionalInterface
public interface TransactionCallback<T> {
	@Nullable
	T doInTransaction(TransactionStatus status);
}

1.2.执行流程

看完了接口,现在我们关注一下 TransactionTemplate 本身。

我们会发现,TransactionTemplate 的成员变量只有一个平台事务管理器 PlatformTransactionManager ,这几乎是明示我们真正的核心逻辑都在这里面实现,而实际也确实如此:

  1. 它先将自己作为一个 TransactionDefinition ,传入内部的事务管理器并获得 TransactionStatus
  2. 然后执行用户传入的回调方法;
  3. 当回调方法发生异常时,调用管理器的 rollback 方法提交 status ,并由管理器完成真正的回滚操作;
  4. 当回调方法顺利执行后,调用管理器的 commit 提交该 status ,最后由该管理器根据事务状态完成真正的提交或者回滚操作;
public <T> T execute(TransactionCallback<T> action) throws TransactionException {
		Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");

		if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
			return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
		}
		else {
			// 获取事务对象
			TransactionStatus status = this.transactionManager.getTransaction(this);
			T result;
			try {
				// 执行用户的回调方法
				result = action.doInTransaction(status);
			}
			catch (RuntimeException | Error ex) {
				// Transactional code threw application exception -> rollback
				rollbackOnException(status, ex);
				throw ex;
			}
			catch (Throwable ex) {
				// Transactional code threw unexpected exception -> rollback
				rollbackOnException(status, ex);
				throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
			}
			// 调用 commit 方法
			this.transactionManager.commit(status);
			return result;
		}
	}

// 输出一下日志,调用 rollback 方法
private void rollbackOnException(TransactionStatus status, Throwable ex) throws TransactionException {
	Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");

	logger.debug("Initiating transaction rollback on application exception", ex);
	try {
		this.transactionManager.rollback(status);
	}
	catch (TransactionSystemException ex2) {
		logger.error("Application exception overridden by rollback exception", ex);
		ex2.initApplicationException(ex);
		throw ex2;
	}
	catch (RuntimeException | Error ex2) {
		logger.error("Application exception overridden by rollback exception", ex);
		throw ex2;
	}
}

2.事务的开始

到这里为止,我们已经基本搞清楚了 TransactionTemplate 在整个流程中做了什么,要继续深入研究,就得看看 TransactionManager 了。

DataSourceTransactionManager 为例,当调用事务管理器的 getTransaction 方法时,根据事务的传播行为,将会决定当前的操作到底在怎样的情况中执行:

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
			throws TransactionException {

		// 如果没有事务定义就创建一个默认的事务定义
		// Use defaults if no transaction definition given.
		TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

		//获取事务
		Object transaction = doGetTransaction();
		boolean debugEnabled = logger.isDebugEnabled();

		// 1、是否已经有事务
		if (isExistingTransaction(transaction)) {
			// Existing transaction found -> check propagation behavior to find out how to behave.
			return handleExistingTransaction(def, transaction, debugEnabled);
		}

		// 确认事务的超时时间大于 -1
		// Check definition settings for new transaction.
		if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
			throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
		}

		// 2、如果没有事务

		// 如果传播行为是 MANDATORY,即当前没有事务就抛异常
		// No existing transaction found -> check propagation behavior to find out how to proceed.
		if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
			throw new IllegalTransactionStateException(
					"No existing transaction found for transaction marked with propagation 'mandatory'");
		}
		// 如果有必要,就创建一个新事务
		// 1.REQUIRED:没有就创建
		// 2.REQUIRES_NEW:总是创建一个新事务
		// 3.NESTED:总是创建一个新的嵌套事务
		else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
			// 创建挂起资源对象以保存 REQUIRES_NEW 和 NESTED 模式要挂起的已有资源
			SuspendedResourcesHolder suspendedResources = suspend(null);
			if (debugEnabled) {
				logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
			}
			try {
				// 开启一个新事务
				return startTransaction(def, transaction, debugEnabled, suspendedResources);
			}
			catch (RuntimeException | Error ex) {
				resume(null, suspendedResources);
				throw ex;
			}
		}
		else {

			// 如果用户指定了不创建事务,则此时隔离级别无效
			// 1.SUPPORTS:有事务就加入,没有就算了,到这一步必然是没有事务的;
			// 2.NOT_SUPPORTED:总是忽略当前事务,以非事务的方式进行;
			// 3.NEVER:有事务抛异常
			// Create "empty" transaction: no actual transaction, but potentially synchronization.
			if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
				logger.warn("Custom isolation level specified but no actual transaction initiated; " +
						"isolation level will effectively be ignored: " + def);
			}
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
		}
	}

到这里,我们可以看到,根据当前是否有事务,大概分为了两个分支:

  • 如果有事务:就按有事务的方式根据对应的传播行为进行处理;
  • 如果没有事务:
    1. 隔离级别是 MANDATORY 就直接抛异常;
    2. 隔离级别是 REQUIREDREQUIRES_NEW 或者 NESTED 就开启一个新事务;
    3. 隔离级别是 SUPPORTSNOT_SUPPORTEDNEVER 就以非事务方式进行;

其中我们需要关注几个步骤中的核心方法:

  • doGetTransaction:创建一个事务对象;
  • isExistingTransaction & handleExistingTransaction :是否有事务,如果有事务怎么办;
  • suspend :创建用于处理资源挂起的资源占位符,用于在 REQUIRES_NEWNESTED 模式下挂起已开启的事务;
  • startTransaction :开启一个新事务;
  • prepareTransactionStatus :在不开事务的情况执行,用于开启新事务并刷新当前事务状态;

接下里,我们将围绕上述逻辑中涉及到的几个关键操作,比如:挂起旧事务,创建新事务以及开启事务等,而不会再线性的围绕 getTransaction 方法展开。

2.1.获取事务对象

doGetTransaction方法被用于创建一个事务对象,在 AbstractPlatformTransactionManager 它是一个抽象方法,子类 DataSourceTransactionManager 在实现的方法中返回一个内部类对象 DataSourceTransactionObject

protected Object doGetTransaction() {
	DataSourceTransactionObject txObject = new DataSourceTransactionObject();
	// 如果支持嵌套事务,那么说明支持保存点机制
	txObject.setSavepointAllowed(isNestedTransactionAllowed());
	// 从 TransactionSynchronizationManager 根据当前的数据源,获取已经存着的数据库链接
	ConnectionHolder conHolder =
			(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
	txObject.setConnectionHolder(conHolder, false);
	return txObject;
}

// 该方法实际上即获取当前数据源
protected DataSource obtainDataSource() {
	DataSource dataSource = getDataSource(); // 此处直接获取 DataSourceTransactionManager 的成员变量 DataSource
	Assert.state(dataSource != null, "No DataSource set");
	return dataSource;
}

在这一步,它创建了一个 DataSourceTransactionObject

private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {
	
	// 是否是新创建的连接,即不是从上下文获取的旧连接
	private boolean newConnectionHolder;
	// 是否自动提交
	private boolean mustRestoreAutoCommit;

	public void setConnectionHolder(@Nullable ConnectionHolder connectionHolder, boolean newConnectionHolder) {
		super.setConnectionHolder(connectionHolder);
		this.newConnectionHolder = newConnectionHolder;
	}

	public boolean isNewConnectionHolder() {
		return this.newConnectionHolder;
	}

	public void setMustRestoreAutoCommit(boolean mustRestoreAutoCommit) {
		this.mustRestoreAutoCommit = mustRestoreAutoCommit;
	}

	public boolean isMustRestoreAutoCommit() {
		return this.mustRestoreAutoCommit;
	}

	public void setRollbackOnly() {
		getConnectionHolder().setRollbackOnly();
	}

	@Override
	public boolean isRollbackOnly() {
		return getConnectionHolder().isRollbackOnly();
	}

	// 当刷新时,触发 TransactionSynchronization 回调接口的 flush 方法
	@Override
	public void flush() {
		if (TransactionSynchronizationManager.isSynchronizationActive()) {
			TransactionSynchronizationUtils.triggerFlush();
		}
	}
}

我们不必过多的关注其实现,知道它包装了一层 ConnectionHolder ——而 ConnectionHolder 又包装了从数据库获得的 Connection ——然后附加了一些诸如管理保存点之类的功能即可。

这一步的重点在于,如果之前上下文中已有旧的链接可用,就直接拿它,如果没有就空着。

2.2.当没有事务时

当创建了事务对象 DataSourceTransactionObject 以后,将会调用 isExistingTransaction 判断是否已有事务:

@Override
protected boolean isExistingTransaction(Object transaction) {
	DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
	return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}

这个方法同样实现自 AbstractPlatformTransactionManager ,逻辑很简单:DataSourceTransactionObject 中是否已经有已经开启(处于)事务的 ConnectionHolder ? 如果当前已有事务,则会调用 handleExistingTransaction 方法进一步处理:

private TransactionStatus handleExistingTransaction(
		TransactionDefinition definition, Object transaction, boolean debugEnabled)
		throws TransactionException {

	// 1、如果有事务,那传播级别必然不可能是 NEVER
	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
		throw new IllegalTransactionStateException(
				"Existing transaction found for transaction marked with propagation 'never'");
	}
	
	// 2、如果传播级别是 NOT_SUPPORTED,那么就先挂起当前事务
	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
		if (debugEnabled) {
			logger.debug("Suspending current transaction");
		}
		// 挂起事务
		Object suspendedResources = suspend(transaction);
		boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
		return prepareTransactionStatus(
				definition, null, false, newSynchronization, debugEnabled, suspendedResources);
	}

	// 3、如果传播级别是 REQUIRES_NEW,那挂起已有 事务,开一个新的
	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
		if (debugEnabled) {
			logger.debug("Suspending current transaction, creating new transaction with name [" +
					definition.getName() + "]");
		}
		SuspendedResourcesHolder suspendedResources = suspend(transaction);
		try {
			return startTransaction(definition, transaction, debugEnabled, suspendedResources);
		}
		catch (RuntimeException | Error beginEx) {
			resumeAfterBeginException(transaction, suspendedResources, beginEx);
			throw beginEx;
		}
	}

	// 4、如果传播级别是 NESTED,那么就开启一个嵌套的事务
	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
		if (!isNestedTransactionAllowed()) {
			throw new NestedTransactionNotSupportedException(
					"Transaction manager does not allow nested transactions by default - " +
					"specify 'nestedTransactionAllowed' property with value 'true'");
		}
		if (debugEnabled) {
			logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
		}

		// 如果支持保存点,那么嵌套事务就用保存点来完成
		if (useSavepointForNestedTransaction()) {
			// Create savepoint within existing Spring-managed transaction,
			// through the SavepointManager API implemented by TransactionStatus.
			// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
			DefaultTransactionStatus status =
					prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
			status.createAndHoldSavepoint();
			return status;
		}
		// 如果不支持,那么就直接开一个新事务
		else {
			// Nested transaction through nested begin and commit/rollback calls.
			// Usually only for JTA: Spring synchronization might get activated here
			// in case of a pre-existing JTA transaction.
			return startTransaction(definition, transaction, debugEnabled, null);
		}
	}

	// 5、如果是:
	// SUPPORTS: 有就加入,没有就算了;
	// REQUIRED:有就加入,没有就开启
	// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
	if (debugEnabled) {
		logger.debug("Participating in existing transaction");
	}
	// 当前事务是否有效
	if (isValidateExistingTransaction()) {
		// 当前事务定义的隔离界别必须与已有事务的隔离级别一致
		if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
			Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
			if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
				Constants isoConstants = DefaultTransactionDefinition.constants;
				throw new IllegalTransactionStateException("Participating transaction with definition [" +
						definition + "] specifies isolation level which is incompatible with existing transaction: " +
						(currentIsolationLevel != null ?
								isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
								"(unknown)"));
			}
		}
		// 当前事务定义是只读,那么已有的事务也必须是只读
		if (!definition.isReadOnly()) {
			if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
				throw new IllegalTransactionStateException("Participating transaction with definition [" +
						definition + "] is not marked as read-only but existing transaction is");
			}
		}
	}
	boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
	return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

2.3.挂起已有事务

AbstractPlatformTransactionManagersuspend 方法中,将会临时性的”清空“当时事务,并将其转移到 SuspendedResourcesHolder 中:

protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
		// 如果已经有注册到上下文的 TransactionSynchronization
		if (TransactionSynchronizationManager.isSynchronizationActive()) {
			// 调用移除上下文中的 TransactionSynchronization,并调用它们的 suspend 方法
			List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
			try {
				Object suspendedResources = null;
				if (transaction != null) {
					// 清空 DataSourceTransactionObject 中的 ConnectionHolder,并将其从上下文中当前事务的“资源”中移除
					suspendedResources = doSuspend(transaction);
				}
				// 移除事务名称
				String name = TransactionSynchronizationManager.getCurrentTransactionName();
				TransactionSynchronizationManager.setCurrentTransactionName(null);
				// 移除只读标识
				boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
				TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
				// 移除隔离级别
				Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
				TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
				// 重置事务激活标志
				boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
				TransactionSynchronizationManager.setActualTransactionActive(false);
				// 把上述所有重置的信息都转移到 SuspendedResourcesHolder
				return new SuspendedResourcesHolder(
						suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
			}
			catch (RuntimeException | Error ex) {
				// 调用 TransactionSynchronization 的 resume 方法
				// doSuspend failed - original transaction is still active...
				doResumeSynchronization(suspendedSynchronizations);
				throw ex;
			}
		}
		// 如果没有注册到上下文的 TransactionSynchronization,但是当前事务又不为空,那么只需要清空事务资源就可以了
		else if (transaction != null) {
			// Transaction active but no synchronization active.
			Object suspendedResources = doSuspend(transaction);
			return new SuspendedResourcesHolder(suspendedResources);
		}
		// 啥都没有
		else {
			// Neither transaction nor synchronization active.
			return null;
		}
	}

// 移除 TransactionSynchronizationManager 中的事务资源
protected Object doSuspend(Object transaction) {
	DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
	txObject.setConnectionHolder(null);
	return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

// 调用 TransactionSynchronization 的 suspend 方法
private List<TransactionSynchronization> doSuspendSynchronization() {
	List<TransactionSynchronization> suspendedSynchronizations =
			TransactionSynchronizationManager.getSynchronizations();
	for (TransactionSynchronization synchronization : suspendedSynchronizations) {
		synchronization.suspend();
	}
	TransactionSynchronizationManager.clearSynchronization();
	return suspendedSynchronizations;
}

// 调用 TransactionSynchronization 的 resume 方法
private void doResumeSynchronization(List<TransactionSynchronization> suspendedSynchronizations) {
	TransactionSynchronizationManager.initSynchronization();
	for (TransactionSynchronization synchronization : suspendedSynchronizations) {
		synchronization.resume();
		TransactionSynchronizationManager.registerSynchronization(synchronization);
	}
}

suspend 方法主要干了这样一件事:

  • 如果当前上下文中已经注册有TransactionSynchronization
    1. 把这些 TransactionSynchronization 回调从上下文移除,并调用它们的 suspend 方法;
    2. 清空当前事务对象 DataSourceTransactionObject 持有的 ConnectionHolder
    3. 清空当前上下文中的当前线程相关的全部事务资源,即所有 DataSource 涉及的 ConnectionHolder
    4. 清空上下文中其他的当前事务的信息;
    5. 如果上述过程中抛出了异常,那么就调用被移除的 TransactionSynchronization  的 resume 方法,然后将它们重新注册回上下文;
  • 如果当前上下文中没有注册 TransactionSynchronization ,但是已经开启了事务,那只需要清空当前事务对象 DataSourceTransactionObject 持有的 ConnectionHolder 即可;
  • 如果当前上下文中没有注册 TransactionSynchronization ,也没有开启事务,那什么都不需要做;

在这个方法中,主要做了“挂起当前事务”的逻辑,显而易见的:

  • 事务其实就是上下文的事务信息,因此“挂起”其实就是将他们从上下文移除;
  • “挂起”需要能还原,因此从上下文移除的信息不会真的就扔了,而是转移到了挂起资源持有者 SuspendedResourcesHolder 中;

也就是说,如果事务被挂起了,那么在当前事务结束前

2.4.创建新事务

prepareTransactionStatus 这个方法用于开启一个新事务:

  1. 使用 newTransactionStatus 创建一个新的 DefaultTransactionStatus
  2. 使用 prepareSynchronization 根据当前新事务的定义,重新初始化上下文中的事务信息;
protected final DefaultTransactionStatus prepareTransactionStatus(
		TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
		boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
	// 创建一个新的 TransactionStatus
	DefaultTransactionStatus status = newTransactionStatus(
			definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
	// 根据给定的事务定义重新初始化上下文中的事务状态
	prepareSynchronization(status, definition);
	return status;
}

// 创建一个新的 TransactionStatus
protected DefaultTransactionStatus newTransactionStatus(
		TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
		boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {

	// 是否要开启新事务,要求上下文中确实没有激活的事务的时候才可以开启
	// 如果前一步操作是挂起,或者已经确认了没有事务,那么这里就是 true
	boolean actualNewSynchronization = newSynchronization &&
			!TransactionSynchronizationManager.isSynchronizationActive();
	// 创建一个新的 TransactionStatus
	return new DefaultTransactionStatus(
			transaction, newTransaction, actualNewSynchronization,
			definition.isReadOnly(), debug, suspendedResources);
}

// 根据给定的事务定义,重新初始化上下文中的事务信息
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
	if (status.isNewSynchronization()) {
		// 设置事务激活标志位
		TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
		// 设置事务的隔离级别
		TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
				definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
						definition.getIsolationLevel() : null);
		// 设置事务的只读标志位
		TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
		// 设置事务的名称
		TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
		// 初始化 TransactionSynchronization 集合
		TransactionSynchronizationManager.initSynchronization();
	}
}

2.5.开启新事物

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
		boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

	boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
	// 创建一个新事务
	DefaultTransactionStatus status = newTransactionStatus(
			definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
	doBegin(transaction, definition);
	// 使用新事务的定义重新初始化上下文中的当前事务信息
	prepareSynchronization(status, definition);
	return status;
}

这里我们重点关注 doBegin

protected void doBegin(Object transaction, TransactionDefinition definition) {
	DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
	Connection con = null;

	try {
		// 如果当前事务没有绑定连
		if (!txObject.hasConnectionHolder() ||
				txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
			Connection newCon = obtainDataSource().getConnection();
			if (logger.isDebugEnabled()) {
				logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
			}
			// 为当前事务对象绑定一个新的数据库链接
			txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
		}
		
		// 标记当前链接已经与当前的事务同步
		txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
		con = txObject.getConnectionHolder().getConnection();

		// 初始化事务的隔离界别和只读标志位
		Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
		txObject.setPreviousIsolationLevel(previousIsolationLevel);
		txObject.setReadOnly(definition.isReadOnly());

		// 如果有必要,关闭自动提交
		// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
		// so we don't want to do it unnecessarily (for example if we've explicitly
		// configured the connection pool to set it already).
		if (con.getAutoCommit()) {
			txObject.setMustRestoreAutoCommit(true);
			if (logger.isDebugEnabled()) {
				logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
			}
			con.setAutoCommit(false);
		}
		
		// 如果是只读的,那么执行“SET TRANSACTION READ ONLY” SQL 语句将事务标记为只读
		prepareTransactionalConnection(con, definition);
		// 标记当前事务已经激活
		txObject.getConnectionHolder().setTransactionActive(true);
		
		// 设置事务的超时时间
		int timeout = determineTimeout(definition);
		if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
			txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
		}

		// 将该连接(资源)绑定到上下文
		// Bind the connection holder to the thread.
		if (txObject.isNewConnectionHolder()) {
			TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
		}
	}

	catch (Throwable ex) {
		// 发生异常的时候释放连接
		if (txObject.isNewConnectionHolder()) {
			DataSourceUtils.releaseConnection(con, obtainDataSource());
			txObject.setConnectionHolder(null, false);
		}
		throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
	}
}

doBegin 方法中,管理器真正的开启了一个新事物——即从 DataSource 中真正的拿了一个 Connection 出来,并绑定到当前事务对象持有的 ConnectionHolder 与上下文中,然后做了一些关于是否只读和超时时间的初始化处理。

4.事务的完成

通过 TransactionManager ,我们成功的获取了一个新的事务,当我们执行完操作后,需要调用 commit 方法将事务提交:

public final void commit(TransactionStatus status) throws TransactionException {
	// 如果此时事务已经标记为完成则直接报错
	if (status.isCompleted()) {
		throw new IllegalTransactionStateException(
				"Transaction is already completed - do not call commit or rollback more than once per transaction");
	}

	// 如果本地事务标记为回滚,则回滚事务
	DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
	if (defStatus.isLocalRollbackOnly()) {
		if (defStatus.isDebug()) {
			logger.debug("Transactional code has requested rollback");
		}
		processRollback(defStatus, false);
		return;
	}

	// 如果全局事务已经回滚,但是本地(局部)事务还没回滚,那么回滚该局部事务
	if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
		if (defStatus.isDebug()) {
			logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
		}
		processRollback(defStatus, true);
		return;
	}

	// 提交事务
	processCommit(defStatus);
}

这个方法主要走了三条逻辑:

  • 如果本地事务标记为回滚,那就回滚;
  • 如果全局事务已经标记为回滚了,但是需要随着全局事务一起回滚的本地事务未能回滚,则回滚事务;
  • 如果本地事务不需要回滚,那么就提交;

4.1.提交事务

我们先关注事务的提交,也就是 processCommit 中的逻辑:

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
	try {
		boolean beforeCompletionInvoked = false;

		try {
			boolean unexpectedRollback = false;
			// 触发同步器的 beforeCommit 和 beforeCompletion 回调方法
			prepareForCommit(status); // 空实现
			triggerBeforeCommit(status);
			triggerBeforeCompletion(status);
			beforeCompletionInvoked = true;

			// 是嵌套事务(保存点),且全局事务已经标记为回滚,那么就通过 ConnectionHolder 中的 Connection 释放保存点
			if (status.hasSavepoint()) {
				if (status.isDebug()) {
					logger.debug("Releasing transaction savepoint");
				}
				unexpectedRollback = status.isGlobalRollbackOnly();
				status.releaseHeldSavepoint();
			}
			// 不是全新的事务,如果全局事务已经标记为回滚,那就提交本地事务
			else if (status.isNewTransaction()) {
				if (status.isDebug()) {
					logger.debug("Initiating transaction commit");
				}
				unexpectedRollback = status.isGlobalRollbackOnly();
				// 调用 Connection.commit 提交事务
				doCommit(status);
			}
			else if (isFailEarlyOnGlobalRollbackOnly()) {
				unexpectedRollback = status.isGlobalRollbackOnly();
			}

			// Throw UnexpectedRollbackException if we have a global rollback-only
			// marker but still didn't get a corresponding exception from commit.
			if (unexpectedRollback) {
				throw new UnexpectedRollbackException(
						"Transaction silently rolled back because it has been marked as rollback-only");
			}
		}

		// 1、因为内外层事务回滚行为不一致抛出的移除
		catch (UnexpectedRollbackException ex) {
			// can only be caused by doCommit
			triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
			throw ex;
		}
		// 2、调用 doCommit 提交失败
		catch (TransactionException ex) {
			// can only be caused by doCommit
			// 如果设置了当提交失败时回滚,那就回滚,否则调用同步器的 afterCompletion 回调
			if (isRollbackOnCommitFailure()) {
				doRollbackOnCommitException(status, ex);
			}
			else {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
			}
			throw ex;
		}
		// 3、因为其他的原因失败
		catch (RuntimeException | Error ex) {
			if (!beforeCompletionInvoked) {
				triggerBeforeCompletion(status);
			}
			doRollbackOnCommitException(status, ex);
			throw ex;
		}

		// 触发同步器的 afterCommit 和 afterCompletion 回调方法
		// Trigger afterCommit callbacks, with an exception thrown there
		// propagated to callers but the transaction still considered as committed.
		try {
			triggerAfterCommit(status);
		}
		finally {
			triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
		}

	}
	finally {
		// 把挂起的事务重新放回上下文
		cleanupAfterCompletion(status);
	}
}

private void triggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) {
	// 如果是全新的事务
	if (status.isNewSynchronization()) {
		// 调用全部同步器的 afterCompletion 的回调方法,并将其从上下文情况
		List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
		TransactionSynchronizationManager.clearSynchronization();
		if (!status.hasTransaction() || status.isNewTransaction()) {
			if (status.isDebug()) {
				logger.trace("Triggering afterCompletion synchronization");
			}
			// No transaction or new transaction for the current scope ->
			// invoke the afterCompletion callbacks immediately
			invokeAfterCompletion(synchronizations, completionStatus);
		}
		else if (!synchronizations.isEmpty()) { 
			// JTA 管理器重写了相关的逻辑,默认情况下依然只是调用 TransactionSynchronization 的 afterCompletion 的回调方法
			// Existing transaction that we participate in, controlled outside
			// of the scope of this Spring transaction manager -> try to register
			// an afterCompletion callback with the existing (JTA) transaction.
			registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations);
		}
	}
}

protected final void invokeAfterCompletion(List<TransactionSynchronization> synchronizations, int completionStatus) {
	TransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus);
}

private void triggerAfterCommit(DefaultTransactionStatus status) {
	// 如果是全新的事务,就调用全部的同步器的afterCommit方法
	if (status.isNewSynchronization()) {
		if (status.isDebug()) {
			logger.trace("Triggering afterCommit synchronization");
		}
		TransactionSynchronizationUtils.triggerAfterCommit();
	}
}

暂且忽略提交异常时回滚相关的逻辑,这个方法大体逻辑如下:

  1. 依次触发 TransactionSynchronizationbeforeCommitbeforeCompletion 回调方法;
  2. 如果事务是一个嵌套事务(存在保存点),并且全局事务已经标记为回滚,则通过 ConnectionHolder 中的连接释放保存点;
  3. 如果事务是一个全新的事务,则调用 doCommit 通过ConnectionHolder 中的连接进行事务提交;
  4. 完成上述步骤后,如果发现全局事务已经被设置为回滚,那么显然上述提交(释放保存点、提交本地事务)是与全局的操作不一致的,那么直接抛出 UnexpectedRollbackException 异常;
  5. 如果上述过程中抛出了异常,那么就需要根据情况回滚当前事务,或者设置全局事务的回滚标志位;
  6. 完成上述操作后,依次触发 TransactionSynchronizationafterCommit (仅在正常提交时)和 afterCompletion 回调方法;

4.2.事务的回滚

当本地事务的回滚标志位 isLocalRollbackOnlytrue  时,管理器将调用 processRollback 方法回滚事务:

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
	try {
		boolean unexpectedRollback = unexpected;

		try {
			// 触发同步器的 beforeCompletion ************************回调
			triggerBeforeCompletion(status);

			// 如果是基于保存点的本地事务,就直接回滚到保存点
			if (status.hasSavepoint()) {
				if (status.isDebug()) {
					logger.debug("Rolling back transaction to savepoint");
				}
				status.rollbackToHeldSavepoint();
			}
			// 如果是独立的本地事务,直接将其回滚
			else if (status.isNewTransaction()) {
				if (status.isDebug()) {
					logger.debug("Initiating transaction rollback");
				}
				doRollback(status);
			}
			else {
				// 如果当前事务的更大的事务中的一部分
				// Participating in larger transaction
				if (status.hasTransaction()) {
					if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
						if (status.isDebug()) {
							logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
						}
						// 设置回滚标志位
						doSetRollbackOnly(status);
					}
					else {
						if (status.isDebug()) {
							logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
						}
					}
				}
				else {
					logger.debug("Should roll back transaction but cannot - no transaction available");
				}
				// Unexpected rollback only matters here if we're asked to fail early
				if (!isFailEarlyOnGlobalRollbackOnly()) {
					unexpectedRollback = false;
				}
			}
		}
		catch (RuntimeException | Error ex) {
			// 若发生一次,则调用同步器的 afterCompletion 回调
			triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
			throw ex;
		}

		triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

		// Raise UnexpectedRollbackException if we had a global rollback-only marker
		if (unexpectedRollback) {
			throw new UnexpectedRollbackException(
					"Transaction rolled back because it has been marked as rollback-only");
		}
	}
	finally {
		cleanupAfterCompletion(status);
	}
}

processRollbackprocessCommit 的逻辑也是基本一致,都是区分三者:

  • 如果是基于保存点的本地事务,那么就回滚至上一保存点;
  • 如果是不是基于保存点的本地事务,那么直接回滚事务;
  • 如果全局事务还未回滚,那么就为全局事务设置一个回滚标志位;

不过与提交事务不同,提交的时候提交失败可以回滚,如果回滚失败那只能抛异常了。

4.3.事务完成后操作

不管提交或者回滚,当事务操作完成后,都会调用 cleanupAfterCompletion 方法:

private void cleanupAfterCompletion(DefaultTransactionStatus status) {
	status.setCompleted();
	// 如果当前事务是独立事务,那么完成后清空上下文的事务信息
	if (status.isNewSynchronization()) {
		TransactionSynchronizationManager.clear();
	}
	if (status.isNewTransaction()) {
		doCleanupAfterCompletion(status.getTransaction());
	}
	// 如果有被挂起的事务,那么就将其还原
	if (status.getSuspendedResources() != null) {
		if (status.isDebug()) {
			logger.debug("Resuming suspended transaction after completion of inner transaction");
		}
		Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
		resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
	}
}

此处的 doCleanupAfterCompletion 也是一个抽象方法,在 DataSourceTransactionManager 中,它的实现如下:

protected void doCleanupAfterCompletion(Object transaction) {
	DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

	// 如果当前是一个独立事务,那么就将 Connection 从上下文中解绑
	// Remove the connection holder from the thread, if exposed.
	if (txObject.isNewConnectionHolder()) {
		TransactionSynchronizationManager.unbindResource(obtainDataSource());
	}

	// 重新设置该 Connection 的各项配置
	// Reset connection.
	Connection con = txObject.getConnectionHolder().getConnection();
	try {
		if (txObject.isMustRestoreAutoCommit()) {
			con.setAutoCommit(true);
		}
		DataSourceUtils.resetConnectionAfterTransaction(
				con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
	}
	catch (Throwable ex) {
		logger.debug("Could not reset JDBC Connection after transaction", ex);
	}

	if (txObject.isNewConnectionHolder()) {
		if (logger.isDebugEnabled()) {
			logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
		}
		DataSourceUtils.releaseConnection(con, this.dataSource);
	}

	txObject.getConnectionHolder().clear();
}

总结

有了事务上下文 TransactionSynchronizationManager 后,我们可以在不同的方法堆栈间传递事务信息,并且根据需要,我们可以临时的将上下文中的事务挂起或移除,从而改变事务的传播行为。

总的来说,这些传播行为基本可以分为七种:

传播行为 说明 PROPAGATION_REQUIRED 如果当前有事务就加入,没有就新开一个事务。 PROPAGATION_SUPPORTS 如果当前有事务就加入,没有事务就以非事务方式执行。 PROPAGATION_MANDATORY 如果当前有事务就加入,如果当前没有事务就抛出异常。 PROPAGATION_REQUIRES_NEW 总是新开一个事务,如果当前有事务就挂起。 PROPAGATION_NOT_SUPPORTED 总是以非事务方式执行,如果当前有事务就挂起。 PROPAGATION_NEVER 总是以非事务方式执行,如果当前有事务就抛出异常。 PROPAGATION_NESTED 如果当前存在事务,则开启一个嵌套事务执行,如果当前没有事务,则开启一个事务

它们本质上都是在操作事务管理器时,针对上下文中事务状态的不同处理方案。