[TOC]
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface DBRouter {
    /**
     * 分库分表字段
     */
    String key() default "UId";
    /**
     * 是否分表
     */
    boolean splitTable() default false;
}
// 使用切面进行处理
@Pointcut("@annotation(com.seamew.middleware.db.router.annotation.DBRouter)")
public void aopPoint() {
}
@Around("aopPoint() && @annotation(dbRouter)")
public Object doRouter(ProceedingJoinPoint jp, DBRouter dbRouter) throws Throwable {
    String dbKey = dbRouter.key();
    if (StringUtils.isBlank(dbKey)) {
        throw new RuntimeException("annotation DBRouter key is null!");
    }
    // 路由属性
    String dbKeyAttr = getAttrValue(dbKey, jp.getArgs());
    // 路由策略
    dbRouterStrategy.doRouter(dbKeyAttr);
    // 返回结果
    try {
        return jp.proceed();
    } finally {
        dbRouterStrategy.clear();
    }
}
在基于事务@Transactional情况下,当我们在执行事务方法时,会通过AOP机制先执行DataSourceTransactionManager的doBegin()方法,该方法进一步调用AbstractRoutingDataSource的getConnection()方法,再调用determineCurrentLookupKey()决定当前线程使用哪个数据源。
DataSourceTransactionManager
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
    if (!txObject.hasConnectionHolder() ||
        txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
        // 这里会获取connection
        Connection newCon = obtainDataSource().getConnection();
        // 忽略大部分代码 .....
    }
AbstractRoutingDataSource
@Override
public Connection getConnection() throws SQLException {
return determineTargetDataSource().getConnection();
}
// 最后调用这个determineTargetDataSource方法获取数据源
// 需要注意此时并没有对自定义注解进行处理
protected DataSource determineTargetDataSource() {
Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
Object lookupKey = determineCurrentLookupKey();
DataSource dataSource = this.resolvedDataSources.get(lookupKey);
if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
    dataSource = this.resolvedDefaultDataSource;
}
if (dataSource == null) {
    throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
}
return dataSource;
}
如果使用事务mybatis会执行一下代码
获取sqlsession
private class SqlSessionInterceptor implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    // 获取sqlsession
    SqlSession sqlSession = getSqlSession(SqlSessionTemplate.this.sqlSessionFactory,
                                          SqlSessionTemplate.this.executorType, SqlSessionTemplate.this.exceptionTranslator);
    try {
        Object result = method.invoke(sqlSession, args);
        if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
            // force commit even on non-dirty sessions because some databases require
            // a commit/rollback before calling close()
            sqlSession.commit(true);
        }
        return result;
反射执行方法
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
Statement stmt = null;
try {
    // 这一步就是获取mysql connection
    Configuration configuration = ms.getConfiguration();
    StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
    stmt = prepareStatement(handler, ms.getStatementLog());
    return handler.query(stmt, resultHandler);
} finally {
    closeStatement(stmt);
}
}
获取sql connnection
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
// 直接获取缓存的connection
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection, transaction.getTimeout());
handler.parameterize(stmt);
return stmt;
}
简单总结:
在开启事务后,便将默认连接放入缓存中,后续操作中直接复用了该连接,导致数据源切换失效。
在事务开启前,实现数据源切换。改写决定数据源的核心方法determineCurrentLookupKey,先执行特定方法,将数据源放入ThreadLocal中,后续事务执行该方法中,从ThreadLocal中获取到要连接到数据源。
这里用编程式事务举例:
@Override
protected Result grabActivity(PartakeReq partake, ActivityBillVO bill) {
    try {
        dbRouter.doRouter(partake.getUId());
        return transactionTemplate.execute(status -> {
            try {
                // 执行sql 事务
                insert();
                updata();
                return Result.buildResult(Constants.ResponseCode.INDEX_DUP);
            }
            return Result.buildSuccessResult();
        });
    } finally {
        // 注意清理ThreadLocal缓存,不然会导致内存泄漏
        dbRouter.clear();
    }
}
@Override
public void doRouter(String dbKeyAttr) {
    int size = dbRouterConfig.getDbCount() * dbRouterConfig.getTbCount();
    // 扰动函数;在 JDK 的 HashMap 中,对于一个元素的存放,需要进行哈希散列。而为了让散列更加均匀,所以添加了扰动函数。
    int idx = (size - 1) & (dbKeyAttr.hashCode() ^ (dbKeyAttr.hashCode() >>> 16));
    // 库表索引;相当于是把一个长条的桶,切割成段,对应分库分表中的库编号和表编号
    int dbIdx = idx / dbRouterConfig.getTbCount() + 1;
    int tbIdx = idx - dbRouterConfig.getTbCount() * (dbIdx - 1);
    // 设置到 ThreadLocal
    DBContextHolder.setDBKey(String.format("%02d", dbIdx));
    DBContextHolder.setTBKey(String.format("%03d", tbIdx));
    log.debug("数据库路由 dbIdx:{} tbIdx:{}",  dbIdx, tbIdx);
}