写在文章开头
阅读大量的源码设计对于我们日常开发思维都会有着质的提升,因为笔者之前写过的一篇多数据源实践,于是便考虑到了多数据源的问题,所以笔者参考MyBatis-Plus源码设计
总结了一套针对数据源事务管理的一些实现思路,希望对你有所帮助。
Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili 。
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
魔改思路
Spring
多数据源框架在进行数据库交互时会调用determineCurrentLookupKey
获取数据源的key
进行动态切换数据源,所以我们继承了AbstractRoutingDataSource
重写数据源切换的逻辑。
public class DynamicDataSource extends AbstractRoutingDataSource {
//......
//重写多数据源切换的逻辑
@Override
protected Object determineCurrentLookupKey() {
return DynamicDataSourceHolder.getDynamicDataSourceKey();
}
//......
}
重写的数据源切换的逻辑其本质就是在不同的服务上
上定义其对应的数据源ID
的注解标识与其匹配的数据源:
@Ds(DB3)
@Service
public class CarService {
//......
}
然后通过AOP
动态获取这个数据源的ID
将其存入ThreadLocal
中:
//环绕通知
@Around("dynamicDataSourcePointCut()")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
//获取数据源的key
String key = getDefineAnnotation(joinPoint).value();
//将数据源设置为该key的数据源(存入ThreadLocal中)
DynamicDataSourceHolder.setDynamicDataSourceKey(key);
try {
return joinPoint.proceed();
} finally {
//移除key
DynamicDataSourceHolder.removeDynamicDataSourceKey();
}
}
如此一来,上述重写的数据源切换的逻辑determineCurrentLookupKey
就可以通过个人封装的ThreadLocal
切换数据源:
@Override
protected Object determineCurrentLookupKey() {
return DynamicDataSourceHolder.getDynamicDataSourceKey();
}
基于determineCurrentLookupKey
这个key
的返回值,AbstractRoutingDataSource
根据这个lookupKey拿到对应的数据源dataSource
。
protected DataSource determineTargetDataSource() {
Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
//我们重写的determineCurrentLookupKey会得到key值
Object lookupKey = determineCurrentLookupKey();
//基于这个key得到数据源
DataSource dataSource = this.resolvedDataSources.get(lookupKey);
//......
return dataSource;
}
这样一来,我们的持久层Mapper
就会通过这个datasource
得到对应的connection
完成sql操作
:
@Override
public Connection getConnection() throws SQLException {
return determineTargetDataSource().getConnection();
}
总结一下思路就是不同的线程通过注解获取数据源信息,通过AOP
动态切换数据源的key
值,从而让Spring
基于这个key
值完成数据源切换:
通过这一点我们可知,只要能够拿到每次切换数据源的connection
,只有所有的connection
都执行成功,我们再进行一次commit
,反之rollback
就可以实现多数据源事务了。
方案落地
定义多数据源事务注解
我们有了大概的思路,首先自然是先定义一个注解,后续所有的方法可以通过这个注解标识为需要进行多数据源事务管理。方便后续我们后续通过切面进行标识和管理:
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DSTransactional {
}
基于装饰器模式封装连接
我们在上文的源码解读中提到通过多数据源的key
最终可以拿到一个数据源的connection
信息。所以我们不妨继承这个Connection
接口,通过装饰器模式封装从数据源中拿到的Connection
,再重写其提交和回滚等逻辑,将所有sql操作的提交和回滚操作的主动权掌握在我们自己手中。
代码如下所示,可以看到笔者通过继承Connection
接口拿到Connection
所有的行为,将数据源的key和数据源的连接作为成员变量,并将提交和回滚做了空实现,真正的提交和回滚交由notify
的布尔值决定是否提交当前事务。
@Slf4j
@Data
public class ConnectionProxy implements Connection {
private Connection connection;
private String ds;
public ConnectionProxy(Connection connection, String ds) {
this.connection = connection;
this.ds = ds;
}
//通过notify的布尔值决定是否提交当前事务
public void notify(Boolean commit) {
try {
if (commit) {
this.connection.commit();
} else {
this.connection.rollback();
}
this.connection.close();
} catch (Exception e) {
log.error("事务操作失败,失败原因:{}",e.getMessage(),e);
}
}
public void commit() throws SQLException {
}
public void rollback() throws SQLException {
}
public void close() throws SQLException {
}
//......
}
然后我们封装一个ThreadLocal
缓存这个连接代理对象ConnectionProxy
及其对应的数据源的key
值:
public class ConnectionHolder {
private static ThreadLocal<Map<String, ConnectionProxy>> CONNECT_HOLDER = ThreadLocal.withInitial(ConcurrentHashMap::new);
//将ConnectionProxy和其数据源key都存到当前线程的threadLocalMap中
@SneakyThrows
public static void putConnection(String ds, ConnectionProxy connectionProxy) {
if (!CONNECT_HOLDER.get().containsKey(ds)) {
connectionProxy.setAutoCommit(false);
}
CONNECT_HOLDER.get().put(ds, connectionProxy);
}
//获取当前线程对应数据源key的连接代理对象
public static ConnectionProxy getConnection(String ds) {
return CONNECT_HOLDER.get().get(ds);
}
//获取当前线程下所有的连接代理对象
public static Map<String, ConnectionProxy> connectionProxyMap() {
return CONNECT_HOLDER.get();
}
}
重写getConnection
我们继承AbstractRoutingDataSource
重写了获取数据源的逻辑,而本次因为需要管理Connection
所以我们也重写getConnection
方法,确保拿到Connection
之后将其封装为connectionProxy
并缓存到ThreadLocal
中:
/**
* 集成抽象类AbstractRoutingDataSource,实现自定义数据源获取逻辑determineCurrentLookupKey
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DynamicDataSource extends AbstractRoutingDataSource {
//备份所有数据源信息,
private Map<Object, Object> defineTargetDataSources;
/**
* 返回当前线程需要用到的数据源bean
*/
@Override
protected Object determineCurrentLookupKey() {
return DynamicDataSourceHolder.getDynamicDataSourceKey();
}
@Override
public Connection getConnection() throws SQLException {
//拿到数据源的key值,
String ds = DynamicDataSourceHolder.getDynamicDataSourceKey();
ConnectionProxy connectionProxy = ConnectionHolder.getConnection(ds);
if (connectionProxy != null) {
return connectionProxy;
}
//拿到connection对象将其封装为ConnectionProxy 存入当前线程的threadLocal中
Connection conn = super.getConnection();
ConnectionProxy proxy = new ConnectionProxy(conn, ds);
ConnectionHolder.putConnection(ds, proxy);
return proxy;
}
}
切面提交或者回滚事务
最终我们通过一个切面,感知当前方法是否包含DSTransactional
注解,如果包含则只要当前接口抛出异常则将state
设置为false
,调用notify
将操作回滚,反之提交所有操作。
@Aspect
@Component
public class DSTransactionalAspect {
//作用在方法上
@Pointcut("@annotation(com.sharkChili.annotation.DSTransactional)")
public void dsTransactionalPointCut() {
}
//环绕通知
@Around("dsTransactionalPointCut()")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
boolean state = true;
try {
Object proceed = joinPoint.proceed();
return proceed;
} catch (Throwable throwable) {
//只要报错就将state设置为false,在finally语句中将连接代理对象的操作回滚
state = false;
} finally {
if (!state) {
log.info("回滚所有事务");
ConnectionHolder.connectionProxyMap().entrySet().forEach(e->e.getValue().notify(false));
} else {
log.info("提交所有事务");
ConnectionHolder.connectionProxyMap().entrySet().forEach(e->e.getValue().notify(true));
}
}
return true;
}
}
测试
这里笔者给出对应测试代码,在所有sql
操作完成后设置一个错误:
@PostMapping("/orderCar")
@DSTransactional
public Boolean orderCar(@RequestBody OrderParams params) {
//在主库中查询汽车信息列表
TUser user = userService.getUserInfo(params.getUid());
if (user == null) {
throw new RuntimeException("用户不存在");
}
Car car = null;
//在新的数据源中查询用户信息
car = carService.getCarInfo(params.getCid());
if (car == null) {
throw new RuntimeException("汽车不存在");
}
//下单
Order order = new Order();
order.setUid(user.getId());
order.setCid(car.getId());
order.setTotal(car.getPrice());
orderService.saveOrderInfo(order);
//更新余额
user.setTotal(user.getTotal() - order.getTotal());
userService.update(user);
//设置错误
System.out.println(1 / 0);
return true;
}
最终在事务失败后触发事务回滚
2024-02-12 17:03:55.702 INFO 9716 --- [io-18080-exec-1] c.s.aspect.DSTransactionalAspect : 回滚所有事务
遇到的问题
笔者在功能开发过程中,遇到了多数据源死循环问题,如下所示,可以看到对应持久层依赖于DataSourceInitializerInvoker
,而DataSourceInitializerInvoker
最终又会走回dynamicDataSource
造成死循环:
Description:
The dependencies of some of the beans in the application context form a cycle:
testController (field private com.sharkChili.service.UserService com.sharkChili.controller.TestController.userService)
↓
userService
↓
TUserMapper defined in file [F:\github\learnExample\multiTransactional\target\classes\com\sharkChili\mapper\TUserMapper.class]
↓
sqlSessionFactory defined in class path resource [com/baomidou/mybatisplus/autoconfigure/MybatisPlusAutoConfiguration.class]
┌─────┐
| dynamicDataSource defined in class path resource [com/sharkChili/config/DynamicDataSourceConfig.class]
↑ ↓
| master defined in class path resource [com/sharkChili/config/DynamicDataSourceConfig.class]
↑ ↓
| org.springframework.boot.autoconfigure.jdbc.DataSourceInitializerInvoker
└─────┘
解决办法即破除DataSourceInitializerInvoker
到dynamicDataSource
这一圈循环,根据自动装配来源定位到DataSourceInitializerInvoker
由DataSourceAutoConfiguration
导入,所有我们只需排除这个装配类即可。
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
小结
这套方案实际是大名鼎鼎的MyBatis-Plus
多数据源所提供的思路,其核心代码如下,本质上和笔者所仿写的思路差不多,当相对健壮一些,感兴趣的读者可以自行查阅了解:
@Slf4j
public class DynamicLocalTransactionAdvisor implements MethodInterceptor {
@Override
public Object invoke(MethodInvocation methodInvocation) throws Throwable {
if (!StringUtils.isEmpty(TransactionContext.getXID())) {
return methodInvocation.proceed();
}
boolean state = true;
Object o;
//生成事务id并缓存至当前线程的ThreadLocal
String xid = UUID.randomUUID().toString();
TransactionContext.bind(xid);
try {
o = methodInvocation.proceed();
} catch (Exception e) {
//当前切面操作失败则将state 设置为false
state = false;
throw e;
} finally {
//根据state 决定提交还是回滚所有事务
ConnectionFactory.notify(state);
//移除当前线程的缓存的值,避免内存泄漏
TransactionContext.remove();
}
return o;
}
}
我是 sharkchili ,CSDN Java 领域博客专家,开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。 因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
参考
MyBatis-Plus:https://baomidou.com/ springboot多数据源死循环,springboot动态据源死循环:https://www.cnblogs.com/fanshuyao/p/12765558.html 动态数据源循环依赖问题 :https://blog.csdn.net/m0_37607945/article/details/111682777
本文使用 markdown.com.cn 排版