|
@@ -0,0 +1,197 @@
|
|
|
+> [TOC]
|
|
|
+
|
|
|
+# 1、使用注解形式配置多数据源
|
|
|
+
|
|
|
+```java
|
|
|
+@Documented
|
|
|
+@Retention(RetentionPolicy.RUNTIME)
|
|
|
+@Target({ElementType.TYPE, ElementType.METHOD})
|
|
|
+public @interface DBRouter {
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 分库分表字段
|
|
|
+ */
|
|
|
+ String key() default "UId";
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 是否分表
|
|
|
+ */
|
|
|
+ boolean splitTable() default false;
|
|
|
+
|
|
|
+}
|
|
|
+// 使用切面进行处理
|
|
|
+@Pointcut("@annotation(com.seamew.middleware.db.router.annotation.DBRouter)")
|
|
|
+public void aopPoint() {
|
|
|
+}
|
|
|
+
|
|
|
+@Around("aopPoint() && @annotation(dbRouter)")
|
|
|
+public Object doRouter(ProceedingJoinPoint jp, DBRouter dbRouter) throws Throwable {
|
|
|
+ String dbKey = dbRouter.key();
|
|
|
+ if (StringUtils.isBlank(dbKey)) {
|
|
|
+ throw new RuntimeException("annotation DBRouter key is null!");
|
|
|
+ }
|
|
|
+
|
|
|
+ // 路由属性
|
|
|
+ String dbKeyAttr = getAttrValue(dbKey, jp.getArgs());
|
|
|
+ // 路由策略
|
|
|
+ dbRouterStrategy.doRouter(dbKeyAttr);
|
|
|
+ // 返回结果
|
|
|
+ try {
|
|
|
+ return jp.proceed();
|
|
|
+ } finally {
|
|
|
+ dbRouterStrategy.clear();
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+# 2、使用@Transactional
|
|
|
+
|
|
|
+在基于事务@Transactional情况下,当我们在执行事务方法时,会通过AOP机制先执行`DataSourceTransactionManager`的doBegin()方法,该方法进一步调用`AbstractRoutingDataSource`的getConnection()方法,再调用`determineCurrentLookupKey()`决定当前线程使用哪个数据源。
|
|
|
+
|
|
|
+* DataSourceTransactionManager
|
|
|
+
|
|
|
+```java
|
|
|
+@Override
|
|
|
+protected void doBegin(Object transaction, TransactionDefinition definition) {
|
|
|
+ DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
|
|
|
+ Connection con = null;
|
|
|
+
|
|
|
+ try {
|
|
|
+ if (!txObject.hasConnectionHolder() ||
|
|
|
+ txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
|
|
|
+ // 这里会获取connection
|
|
|
+ Connection newCon = obtainDataSource().getConnection();
|
|
|
+ // 忽略大部分代码 .....
|
|
|
+ }
|
|
|
+```
|
|
|
+
|
|
|
+* AbstractRoutingDataSource
|
|
|
+
|
|
|
+```java
|
|
|
+@Override
|
|
|
+public Connection getConnection() throws SQLException {
|
|
|
+ return determineTargetDataSource().getConnection();
|
|
|
+}
|
|
|
+// 最后调用这个determineTargetDataSource方法获取数据源
|
|
|
+// 需要注意此时并没有对自定义注解进行处理
|
|
|
+protected DataSource determineTargetDataSource() {
|
|
|
+ Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
|
|
|
+ Object lookupKey = determineCurrentLookupKey();
|
|
|
+ DataSource dataSource = this.resolvedDataSources.get(lookupKey);
|
|
|
+ if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
|
|
|
+ dataSource = this.resolvedDefaultDataSource;
|
|
|
+ }
|
|
|
+ if (dataSource == null) {
|
|
|
+ throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
|
|
|
+ }
|
|
|
+ return dataSource;
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+# 3、造成失效原因
|
|
|
+
|
|
|
+如果使用事务mybatis会执行一下代码
|
|
|
+
|
|
|
+1. 获取sqlsession
|
|
|
+
|
|
|
+```java
|
|
|
+private class SqlSessionInterceptor implements InvocationHandler {
|
|
|
+ @Override
|
|
|
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
|
|
|
+ // 获取sqlsession
|
|
|
+ SqlSession sqlSession = getSqlSession(SqlSessionTemplate.this.sqlSessionFactory,
|
|
|
+ SqlSessionTemplate.this.executorType, SqlSessionTemplate.this.exceptionTranslator);
|
|
|
+ try {
|
|
|
+ Object result = method.invoke(sqlSession, args);
|
|
|
+ if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
|
|
|
+ // force commit even on non-dirty sessions because some databases require
|
|
|
+ // a commit/rollback before calling close()
|
|
|
+ sqlSession.commit(true);
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+```
|
|
|
+
|
|
|
+2. 反射执行方法
|
|
|
+
|
|
|
+```java
|
|
|
+@Override
|
|
|
+public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
|
|
|
+ Statement stmt = null;
|
|
|
+ try {
|
|
|
+ // 这一步就是获取mysql connection
|
|
|
+ Configuration configuration = ms.getConfiguration();
|
|
|
+ StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
|
|
|
+ stmt = prepareStatement(handler, ms.getStatementLog());
|
|
|
+ return handler.query(stmt, resultHandler);
|
|
|
+ } finally {
|
|
|
+ closeStatement(stmt);
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+3. 获取sql connnection
|
|
|
+
|
|
|
+```java
|
|
|
+ private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
|
|
|
+ Statement stmt;
|
|
|
+ // 直接获取缓存的connection
|
|
|
+ Connection connection = getConnection(statementLog);
|
|
|
+ stmt = handler.prepare(connection, transaction.getTimeout());
|
|
|
+ handler.parameterize(stmt);
|
|
|
+ return stmt;
|
|
|
+ }
|
|
|
+```
|
|
|
+
|
|
|
+**简单总结:**
|
|
|
+
|
|
|
+> 在开启事务后,便将默认连接放入缓存中,后续操作中直接复用了该连接,导致数据源切换失效。
|
|
|
+
|
|
|
+# 4、解决方法
|
|
|
+
|
|
|
+在事务开启前,实现数据源切换。改写决定数据源的核心方法`determineCurrentLookupKey`,先执行特定方法,将数据源放入`ThreadLocal`中,后续事务执行该方法中,从ThreadLocal中获取到要连接到数据源。
|
|
|
+
|
|
|
+这里用编程式事务举例:
|
|
|
+
|
|
|
+```java
|
|
|
+@Override
|
|
|
+protected Result grabActivity(PartakeReq partake, ActivityBillVO bill) {
|
|
|
+ try {
|
|
|
+ dbRouter.doRouter(partake.getUId());
|
|
|
+ return transactionTemplate.execute(status -> {
|
|
|
+ try {
|
|
|
+ // 执行sql 事务
|
|
|
+ insert();
|
|
|
+ updata();
|
|
|
+ return Result.buildResult(Constants.ResponseCode.INDEX_DUP);
|
|
|
+ }
|
|
|
+ return Result.buildSuccessResult();
|
|
|
+ });
|
|
|
+ } finally {
|
|
|
+ // 注意清理ThreadLocal缓存,不然会导致内存泄漏
|
|
|
+ dbRouter.clear();
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+```java
|
|
|
+@Override
|
|
|
+public void doRouter(String dbKeyAttr) {
|
|
|
+ int size = dbRouterConfig.getDbCount() * dbRouterConfig.getTbCount();
|
|
|
+
|
|
|
+ // 扰动函数;在 JDK 的 HashMap 中,对于一个元素的存放,需要进行哈希散列。而为了让散列更加均匀,所以添加了扰动函数。
|
|
|
+ int idx = (size - 1) & (dbKeyAttr.hashCode() ^ (dbKeyAttr.hashCode() >>> 16));
|
|
|
+
|
|
|
+ // 库表索引;相当于是把一个长条的桶,切割成段,对应分库分表中的库编号和表编号
|
|
|
+ int dbIdx = idx / dbRouterConfig.getTbCount() + 1;
|
|
|
+ int tbIdx = idx - dbRouterConfig.getTbCount() * (dbIdx - 1);
|
|
|
+
|
|
|
+ // 设置到 ThreadLocal
|
|
|
+ DBContextHolder.setDBKey(String.format("%02d", dbIdx));
|
|
|
+ DBContextHolder.setTBKey(String.format("%03d", tbIdx));
|
|
|
+ log.debug("数据库路由 dbIdx:{} tbIdx:{}", dbIdx, tbIdx);
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+# 5、参考文章
|
|
|
+
|
|
|
+[注意清理ThreadLocal缓存](../Java/JAVA高阶/JUC编程/Threadlocal.md)
|