springboot|spring boot多数据源动态切换, 多数据源事务管理

1 . 项目目标

  1. 实现 不同数据源的切换 (使用AbstractRoutingDataSource)
  2. 不同数据源之间,事物管理,多个数据源之间同时commit或者同时rollback
  3. 兼容不同的连接池(dbpc,druid)
  4. 兼容mybatis,JPA 等不同的方式 (spring- boot -starter)
以上就是该项目实现的所有功能,因代码量和篇幅的问题,本文只写大概的实现思路,中间遇到的坑,以及我封装的 spring-starter 的用法
如果有其他实现上的问题可以加QQ随意聊.
话不多数先贴github
github
gitee


2. 项目简介 本框架用 spring-starter的模式封装, 可以直接使用以下maven坐标快速使用
io.github.cao2068959 multidatasource-spring-starter 1.1.0

其他 mysql驱动这个不用我多说了吧.
然后会根据是否引入了 mybatis-starter和JPA-starter 来自动创建数据源,自动配置类如下,同时也是项目的入口
springboot|spring boot多数据源动态切换, 多数据源事务管理
文章图片


在 application.yaml 中配置 对应的数据源
springboot|spring boot多数据源动态切换, 多数据源事务管理
文章图片

这配置了2个数据源 .第一个 使用了 druid连接池 , 第二个使用了 spring data 默认的形式
然后在项目中就可以直接使用 下图为 mybatis的方式
springboot|spring boot多数据源动态切换, 多数据源事务管理
文章图片

用 @DataSource 来指定使用什么连接池.
如果是 JPA的形式.则
@Repository @DataSource("userdb") public interface UserDao extends JpaRepository, JpaMulti {}

注意这里 需要继承 接口 JpaMulti 不然aop不能正确扫描到
然后是事物,我这里多个数据源,我想让他同时提交,获取异常后同时回滚,这里使用了一个注解
@TransactionMulti 来指定要事物管理的数据源

@TransactionMulti public void setUser(){ Random random = new Random(); //使用了storedb数据源的写操作 userMapper.setEmployee(random.nextInt(10000),"小绿","12345",11,new Date(),new Date()); //使用了userdb数据源的读操作 userMapper.setUser(random.nextInt(10000),"小红","12345",14,1,new Date(),new Date()); int i = 2 /0; }


这里的 setEmployee 和 setUser 分别连接的是不同的数据源. 抛出异常后,2个数据都没有插入数据库
以上就是本项目的基本功能,以下是基本实现思路

3 . 自动数据源切换 这里 使用了 类AbstractRoutingDataSource 这是一个模版类,主要实现方法determineCurrentLookupKey()
这个数据源里有个 map 存入了自定义的多个数据源,在调用 getConnect 方法的时候,会去根据 自定义 determineCurrentLookupKey() 方法去获取对应的key,然后拿到正真的数据源.
public class DataSourceRouting extends AbstractRoutingDataSource {ThreadLocal threadLocal = new ThreadLocal<>(); //把当前事物下的连接塞入,用于事物处理 ThreadLocal connectionThreadLocal = new ThreadLocal<>(); //这里只是留一个备份,切换数据源的时候,如果没有对应ke就直接异常,真正调用会传给AbstractRoutingDataSource处理 //这里只读,没有线程安全问题 Map dataSourceMap = new HashMap<>(); @Override protected Object determineCurrentLookupKey() { String currentName = threadLocal.get(); //没有时,拿第一个 if(currentName == null){ currentName = dataSourceMap.keySet().iterator().next(); }return currentName; } }

然后是用aop来扫描 @DataSourse 注解,从而动态切换数据源
@Aspect public class DataSourceChangeAop {@Autowired private DataSourceRouting dataSourceRouting; @Pointcut("@annotation(chy.frame.multidatasourcespringstarter.annotation.DataSource)") public void annotationPointcut(){}@Pointcut("this(chy.frame.multidatasourcespringstarter.annotation.JpaMulti)") public void interfacePoint(){}@Before("annotationPointcut()") public void beforMethod(JoinPoint point){ MethodSignature methodSignature = (MethodSignature) point.getSignature(); Method method = methodSignature.getMethod(); DataSource annotation = method.getAnnotation(DataSource.class); String value = https://www.it610.com/article/annotation.value(); //切换了数据源 dataSourceRouting.changeDataSource(value); }@Before("interfacePoint()") public void interfacePointBefore(JoinPoint point) throws Exception { //获取代理对象上所有的接口 Class[] interfaces = point.getTarget().getClass().getInterfaces(); //扫描上面的DataSource 注解 for (Class anInterface : interfaces) { DataSource annotation = anInterface.getAnnotation(DataSource.class); if(annotation == null){ continue; }String value = https://www.it610.com/article/annotation.value(); //切换了数据源 dataSourceRouting.changeDataSource(value); return; } //dataSourceRouting.changeDataSource(value); }@After("annotationPointcut() ||interfacePoint() ") public void After(JoinPoint point) throws Exception { dataSourceRouting.clearThreadLocal(); }}

4 . 事物管理 spring 的事物管理器DataSourceTransactionManager
springboot|spring boot多数据源动态切换, 多数据源事务管理
文章图片

但是绑定到当前线程中后,每次拿connect就 不会调用 determineCurrentLookupKey() 方法去获取 不同的数据源从而拿到不同的connect,而是直接去 拿这里绑定的 connect, 所以使用 原生的事物管理器,并不能完成我们需要的功能.
所以我决定直接在aop中拿 connect来开启事物
以下是 事物管理的 aop
@Aspect public class MultiTransactionManagerAop {@Autowired DataSourceRouting dataSourceRouting; @Pointcut("@annotation(chy.frame.multidatasourcespringstarter.annotation.TransactionMulti)") public void annotationPointcut() { }@Around("annotationPointcut()") public void roundExecute(ProceedingJoinPoint joinpoint) throws Throwable { MethodSignature methodSignature = (MethodSignature) joinpoint.getSignature(); Method method = methodSignature.getMethod(); TransactionMulti annotation = method.getAnnotation(TransactionMulti.class); int transactionType = annotation.transactionType(); //开启事务 dataSourceRouting.beginTransaction(transactionType); //正真执行了 方法 joinpoint.proceed(); //提交事务 dataSourceRouting.commitTransaction(); }@AfterThrowing(pointcut = "annotationPointcut()", throwing = "e") public void handleThrowing(JoinPoint joinPoint, Exception e) {//controller类抛出的异常在这边捕获 try { //回滚事物 dataSourceRouting.rollbackTransaction(); } catch (SQLException e1) { e1.printStackTrace(); } }}

同时这里有 1个问题要解决
1 . 因为现在事物是我自己管理,但是mybatis 每次拿完 connect就会自动 调用close 和 commit方法.这样导致我自己操作的事物失效.所以我要先让mybatis不能自作主张帮我关闭connect
解决方法: spring data获取connect的时候,给他包装类(或者代理类),覆盖了原来的close和commit方法
类class DataSourceRouting extends AbstractRoutingDataSource 中覆盖
/** * 如果 在connectionThreadLocal 中有 说明开启了事物,就从这里面拿 * * @return * @throws SQLException */ @Override public Connection getConnection() throws SQLException { Optional currentTransactionCarrier = getCurrentTransactionCarrier(); if (currentTransactionCarrier.isPresent()) { TransactionCarrier transactionCarrier = currentTransactionCarrier.get(); //开了事物 那么从 currentTransactionCarrier中去获取对应的 connect; String currentName = (String) determineCurrentLookupKey(); Optional transactionConnect = transactionCarrier.getConnect(currentName); //使用了已经开启了事务的connect; if (transactionConnect.isPresent()) { return transactionConnect.get(); } //开启事物后第一次获取connect, 那么先获取一个新的 connect Connection connection = new ConnectWarp(determineTargetDataSource().getConnection()); //把新获取到的 connection 放入 transactionCarrier中,后续再次获取就能直接拿到 transactionCarrier.addTransactionConnect(currentName, connection); return connection; } else { //没开事物 直接走 return determineTargetDataSource().getConnection(); } }

而这个包装里中
springboot|spring boot多数据源动态切换, 多数据源事务管理
文章图片

这样只有我手动 调用 commit(true) 和 close(true) 才会正真提交和关闭连接
提交事务和回滚事务的主逻辑如下:
/** * 提交事物 * * @throws SQLException */ public void commitTransaction() throws SQLException { getCurrentTransactionCarrier().orElseThrow(() -> new SqlTransactionException("当前线程中事物没有开启")) .commitTransaction(); //提交事物后清理释放资源 clearTransaction(); }public void commitTransaction() throws SQLException { for (Map.Entry connectionEntry : transactionConnects.entrySet()) { Connection connection = connectionEntry.getValue(); if (!(connection instanceof ConnectWarp)){ continue; } ConnectWarp connectWarp = (ConnectWarp) connection; connectWarp.commit(true); connectWarp.close(true); } }/** * 撤销事物 * * @throws SQLException */ public void rollbackTransaction() throws SQLException { getCurrentTransactionCarrier().orElseThrow(() -> new SqlTransactionException("当前线程中事物没有开启")) .rollbackTransaction(); //提交事物后清理释放资源 clearTransaction(); }public void rollbackTransaction() throws SQLException { for (Map.Entry connectionEntry : transactionConnects.entrySet()) { Connection connection = connectionEntry.getValue(); if (!(connection instanceof ConnectWarp)){ continue; } ConnectWarp connectWarp = (ConnectWarp) connection; connectWarp.rollback(); connectWarp.close(true); }}

【springboot|spring boot多数据源动态切换, 多数据源事务管理】

    推荐阅读