spring boot+mybatis plus 实现动态数据源

开篇说明

  • 如果在这里获得过启发和思考,希望点赞支持!对于内容有不同的看法欢迎来信交流。
  • 技术栈 >> java
  • 邮箱 >> 15673219519@163.com

描述

由于项目开发中设计到游戏运营平台的搭建,游戏中每一个不同的区服使用的是不同的数据库存储数据。 如:
区服1:包括 game_1_data(游戏数据数据库),game_1_log(游戏日志数据库);
区服2:包括 game_2_data(游戏数据数据库),game_2_log(游戏日志数据库);
… 并且之后会持续增多
除了以上的数据库还包含一些,游戏全局的库等,以及平台本身的数据库;那么在单体项目中如何切换每一个请求应该查询那个数据库,成为一个难点。

  • 最终实现的效果如下
/**
 * 查询指定区服的聊天记录 game_{0}_log 数据库名称格式,
 * ChatMonitorSearchDTO.xyServerId 请求参数中指定的区服ID
 * 若ChatMonitorSearchDTO.xyServerId=1, 则查询数据库game_1_log
 */
@SelectDB(dbName = "game_{0}_log", serverFiled = "ChatMonitorSearchDTO.xyServerId")
public Future<IPage<ChatMonitorListVO>> pageList(ChatMonitorSearchDTO DTO) throws Exception{
    IPage<ChatMonitorListVO> page = new Page<>(DTO.getCurrent(), DTO.getSize());
    page = chatMonitorMapper.pageList(page, DTO);
    return new AsyncResult(page);
}

我的思路

  • 项目其中时初始化本平台的数据源;初始化成功后查询其他游戏服数据库的数据源进行初始化。全部添加到指定的 Map<Object, Object> dataSources = new HashMap<>()中。
  • 利用AbstractRoutingDataSource+ThreadLocal+AOP配合使用,确保可以修改每条线程的数据源。
  • 为了确保主线程中开启事务的情况下,依然能够切换数据源查询游戏库,@SelectDB会新开线程执行。
  • 在aop中实现,新服数据库添加的逻辑。确保开启新的区服后会新增game_x_data,game_x_log两个库,也能够正常查询。

实现步骤

  • 第一步:初始化数据源
/**
 * 初始化数据源
 */
@Component
public class JavaCodeDataSourceProvider implements ApplicationListener<ContextRefreshedEvent> {
    @Value("${spring.datasource.url}")
    private String url;
    @Value("${spring.datasource.username}")
    private String username;
    @Value("${spring.datasource.password}")
    private String password;

    @Autowired
    private DataSourceInfoServiceImpl dataSourceInfoService;

    // 初始化本平台的数据源
    @PostConstruct
    public void init() {
        DynamicDataSourceService.addDataSource(DynamicDataSource.DEFAULT_DB_KEY, url, username, password);
    }

    // 查询本平台数据库中,配置的游戏数据库的连接信息,并加载到 Map<Object, Object> dataSources = new HashMap<>()中
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        dataSourceInfoService.buildDynamicDataSourceFromDB();
    }
}
@Service
public class DataSourceInfoServiceImpl {
    @Autowired
    private DataSourceInfoMapper dataSourceInfoMapper;
    /**
     * 从数据库中查询配置的数据库链接信息,构建动态的数据源
     */
    public void buildDynamicDataSourceFromDB(){
        List<DataSourceInfo> dataSourceInfos = dataSourceInfoMapper.selectList(Wrappers.lambdaQuery(DataSourceInfo.class));
        for (DataSourceInfo info : dataSourceInfos){
            DynamicDataSourceService.addDataSource(info.getDbName(), info.getUrl(), info.getUsername(), info.getPassword());
        }
    }
}
/**
 * 维护动态数据源
 */
@Slf4j
public class DynamicDataSourceService {

    private static final Map<Object, Object> dataSources = new HashMap<>();
    private static final ThreadLocal<String> dbKeys = ThreadLocal.withInitial(() -> null);

    /**
     * 构建DataSource
     * @param url 数据库地址
     * @param username 用户名
     * @param password 用户密码
     * @return DataSource
     */
    public static DataSource buildDataSource(String url, String username, String password) {
        DataSourceBuilder<?> builder = DataSourceBuilder.create();
        builder.driverClassName("com.mysql.cj.jdbc.Driver");
        builder.username(username);
        builder.password(password);
        builder.url(url);
        return builder.build();
    }

    /**
     * 动态添加一个数据源
     * @param name       数据源的key
     * @param dataSource 数据源对象
     */
    public static void addDataSource(String name, DataSource dataSource) {
        DynamicDataSource dynamicDataSource = SpringUtils.getBean(DynamicDataSource.class);
        dataSources.put(name, dataSource);
        dynamicDataSource.setTargetDataSources(dataSources);
        dynamicDataSource.afterPropertiesSet();
        log.info("添加了数据源:{}", name);
    }

    /**
     * 动态添加一个数据源
     * @param dbName 数据源的key
     * @param url 数据库地址
     * @param username 用户名
     * @param password 用户密码
     */
    public static void addDataSource(String dbName, String url, String username, String password){
        DataSource dataSource = buildDataSource(url, username, password);
        addDataSource(dbName, dataSource);
    }

    /**
     * 是否存在数据源
     */
    public static boolean exist(String dbKey) {
        return dataSources.get(dbKey) != null;
    }

    /**
     * 切换数据源
     */
    public static void switchDb(String dbKey) {
        dbKeys.set(dbKey);
    }

    /**
     * 重置数据源
     */
    public static void resetDb() {
        dbKeys.remove();
    }

    /**
     * 获取当前数据源
     */
    public static String currentDb() {
        return dbKeys.get();
    }
}
  • 第二步:基本数据库配置 mybatisPlus
@Configuration
public class DynamicDataSourceConfig {
    /**
     * 动态数据源
     */
    @Bean
    public DynamicDataSource dynamicDataSource() {
        DynamicDataSource dataSource = new DynamicDataSource();
        Map<Object, Object> targetDataSources = new HashMap<>();
        dataSource.setTargetDataSources(targetDataSources);
        return dataSource;
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory() throws Exception {
        MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dynamicDataSource());
        sqlSessionFactoryBean.setTypeAliasesPackage("com.qykj.xyj.**.entity");
        sqlSessionFactoryBean.setTypeEnumsPackage("com.qykj.xiyouji.**.enums");
        PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        sqlSessionFactoryBean.setMapperLocations(resolver.getResources("classpath*:mapper/*/*Mapper*.xml"));

        GlobalConfig globalConfig = new GlobalConfig();
        // 配置自定义填充器 MyMetaObjectHandler
        globalConfig.setMetaObjectHandler(new MybatisPlusMetaObjectHandler() );
        sqlSessionFactoryBean.setGlobalConfig(globalConfig);

        // 逻辑删除配置
        GlobalConfig.DbConfig dbConfig = new GlobalConfig.DbConfig();
        dbConfig.setLogicDeleteField("del");
        dbConfig.setLogicDeleteValue("1");
        dbConfig.setLogicNotDeleteValue("0");
        globalConfig.setDbConfig(dbConfig);

        // 设置XML
        MybatisConfiguration configuration = new MybatisConfiguration();
        configuration.setDefaultScriptingLanguage(MybatisXMLLanguageDriver.class);
        configuration.setJdbcTypeForNull(JdbcType.NULL);
        // 设置sql日志
        configuration.setLogImpl(StdOutImpl.class);
        // 设置枚举处理器
        configuration.setDefaultEnumTypeHandler(EnumValueTypeHandler.class);
        sqlSessionFactoryBean.setConfiguration(configuration);

        // 配置分页插件
        sqlSessionFactoryBean.setPlugins(mybatisPlusInterceptor());

        // 配置事务管理器
        sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }

    /**
     * 配置分页插件
     */
    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor() {
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        PaginationInnerInterceptor paginationInnerInterceptor = new PaginationInnerInterceptor(DbType.MYSQL);
        paginationInnerInterceptor.setDialect(new MySqlDialect());
        interceptor.addInnerInterceptor(paginationInnerInterceptor);
        return interceptor;
    }
}
  • 第三步:基本数据库配置 AbstractRoutingDataSource
@Slf4j
public class DynamicDataSource extends AbstractRoutingDataSource {

    public static final String DEFAULT_DB_KEY = "game_base";
    
    @Override
    protected Object determineCurrentLookupKey() {
        String currentDb = DynamicDataSourceService.currentDb();
        log.info("currentDb:"+currentDb);
        if (currentDb == null) {
            return DEFAULT_DB_KEY;
        }
        return currentDb;
    }
}
  • 第四步:注解@SelectDB的编写

@Async(ThreadPoolConfig.THREAD_POOL),确保切换数据源之后为另一个线程。

@Target({ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Async(ThreadPoolConfig.THREAD_POOL)
public @interface SelectDB {

    // 数据库名称
    String dbName();
    
    // 查询参数中标识区服的字段
    String serverFiled() default "-1";
}
@Order(3)
@Slf4j
@Aspect
@Component
public class SelectDBAspect {

    @Value("${gjxy.db.url}")
    private String url;
    @Value("${gjxy.db.username}")
    private String username;
    @Value("${gjxy.db.password}")
    private String password;

    @Pointcut("@annotation(com.xxx.aspect.SelectDB)")
    public void pointcut(){}

    @Around("pointcut()")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        // 获取接口上 SelectDB 注解
        MethodSignature methodSignature = (MethodSignature) point.getSignature();
        Method method = methodSignature.getMethod();
        SelectDB annotation = method.getAnnotation(SelectDB.class);
        String dbName = annotation.dbName();
        String serverFiled = annotation.serverFiled();

        if("-1".equals(serverFiled)){
            // 切换到对应的库,不区分区服的库
            DynamicDataSourceService.switchDb(dbName);
        } else {
            // 切换到对应的库,根据区服切换到不同的库
            Object[] strArr = AnalyzeParamsUtils.analyzeParams(point, serverFiled);
            dbName = MessageFormat.format(dbName, strArr);
            boolean exist = DynamicDataSourceService.exist(dbName);
            if(exist){
                DynamicDataSourceService.switchDb(dbName);
            }else {
                // 确保开启新的区服后会新增game_x_data,game_x_log两个库,也能够正常查询
                String urlFormat = MessageFormat.format(url, dbName);
                DynamicDataSourceService.addDataSource(dbName, urlFormat, username, password);
                DynamicDataSourceService.switchDb(dbName);
            }
        }
        Object proceed = point.proceed();
        DynamicDataSourceService.resetDb();
        return proceed;
    }
}
  • AnalyzeParamsUtils.java 解析方法入参,获取实际值
public class AnalyzeParamsUtils {

    // 解析参数,赋予实际的数据值
    public static Object[] analyzeParams(JoinPoint point, String params) {
        if(params == null || params.length() == 0) return new Object[0];
        List<Object> list = new ArrayList<>();
        Arrays.stream(params.split("\\,")).filter(s -> s.length() > 0).forEach(s -> {
            try {
                list.add(getParamValue(point, s));
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        return list.toArray();
    }

    // 从参数列表中根据参数链获取值
    private static Object getParamValue(JoinPoint point, String params) throws Exception {
        String[] split = params.split("\\.");
        int length = split.length;

        MethodSignature methodSignature = (MethodSignature) point.getSignature();
        String[] argNames =  methodSignature.getParameterNames();
        Object[] argValues = point.getArgs();
        int argLength = argNames.length;

        for (int i = 0; i < argLength;i++){
            Object arg = argValues[i];
            final Class<?> clazz = arg.getClass();
            final String name = clazz.getSimpleName();
            if(name.equals(split[0])){ // 判断是否是指定类
                if(length == 1 ) {
                    ObjectMapper objectMapper = new ObjectMapper();
                    objectMapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
                    return objectMapper.writeValueAsString(arg);
                }else{
                    return getParamValue(arg, split[1]);
                }
            }else if(argNames[i].equals(split[0])){
                return arg;
            }
        }
        return new Object();
    }

    // 获取指定对象的指定字段的值
    private static Object getParamValue(Object argValue, String param) throws Exception {
        String getMethodName = "get" + firstUpperCase(param);
        Method declaredMethod = argValue.getClass().getMethod(getMethodName);
        return declaredMethod.invoke(argValue);
    }

    // 首字母大写转换
    private static String firstUpperCase(String field) {
        if (!StringUtils.isEmpty(field)) {
            char[] cs = field.toCharArray();
            cs[0] -= 32;
            return String.valueOf(cs);
        } else {
            return field;
        }
    }
}
  • 至此,动态数据源功能实现完成。核心就是利用AbstractRoutingDataSource+ThreadLocal+AOP配合使用,确保可以修改每条线程的数据源。
  • ThreadLocal中存放的就是,本次查询需要使用到的数据库名称,对应的就是Map<Object, Object> dataSources = new HashMap<>()中的key,value就是指定的数据源。
  • 一般设计到这种水平分库分表的情况,建议使用类似 sharding-jdbc这种优秀的第三方库去实现。我这里没有使用的原因是因为,sharding-jdbc设计到一个分片策略的问题。游戏中某些操作将会打破这个规则,如合服时两个水平库将会进行合并。会导致数据规则错乱。无法正确路由到指定的库,导致查询不到数据,或所有库查询。
  • 本例中这种方案,考虑到游戏服无法进行修改。做出的妥协方案。缺点:每次查询只能查询一个区服的数据。若需要多个区服查询,则需要复杂的数据汇总逻辑。

版权声明:本文为qq_39529562原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。