转载

sharding-JDBC源码分析(三)SQL路由

SQL router

分库分表中重要的一个模块就是数据切分,将数据根据一定的规则分布在多个DB中,那么这个过程中涉及到了路由,即根据SQL中分片键通过规则(分片算法)计算出某个DB节点,这个过程称为SQL路由。

The source code

sharding-JDBC路由入口统一在 shardingStatementShardingPreparedStatement ,分析 shardingStatement 源码

    private void shard(final String sql) {
        ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
        //路由引擎,改引擎中处理非预编译SQL,所有的非预编译SQL路由都在这处理
        SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine(runtimeContext.getRule(), 
                runtimeContext.getProps(), runtimeContext.getMetaData(), runtimeContext.getDatabaseType(), runtimeContext.getParseEngine());
        //根据路由引擎执行路由
        sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList());
    }
复制代码

sqlRouteResult 是根据路由结果构建的数据结构,整个执行流程会将 sqlRouteResult 作为参数传递。

shard方法执行路由逻辑

    public SQLRouteResult shard(final String sql, final List<Object> parameters) {
        //拷贝预编译参数,防止在路由过程中对原始值修改
        List<Object> clonedParameters = cloneParameters(parameters);
        //route
        SQLRouteResult result = executeRoute(sql, clonedParameters);
        //根据路由结果构建可执行单元,这块涉及到了SQL改写,分表需要改写SQL中的tableName ,比如原始表名acct,改写后acct_1,acct_2等
        result.getRouteUnits().addAll(HintManager.isDatabaseShardingOnly() ? convert(sql, clonedParameters, result) : rewriteAndConvert(sql, clonedParameters, result));
        //日志的方式输出路由结果,包含SQL和db节点信息,方便问题追踪
        if (shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW)) {
            boolean showSimple = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SIMPLE);
            SQLLogger.logSQL(sql, showSimple, result.getOptimizedStatement(), result.getRouteUnits());
        }
        return result;
    }
复制代码

executeRoute方法

    private SQLRouteResult executeRoute(final String sql, final List<Object> clonedParameters) {
        //routingHook注入的钩子方法,获取路由过程中的信息,此处用到的是SPI扩展技术,用户可通过SPI将具体的钩子实现注入,实现自己的逻辑,比如获取路由执行时间等
        routingHook.start(sql);
        try {
            //路由
            SQLRouteResult result = route(sql, clonedParameters);
            //执行钩子中finishSuccess方法
            routingHook.finishSuccess(result, metaData.getTable());
            return result;
            // CHECKSTYLE:OFF
        } catch (final Exception ex) {
            // CHECKSTYLE:ON
            //执行钩子中finishFailure方法
            routingHook.finishFailure(ex);
            throw ex;
        }
    }
复制代码

route方法

//抽象方法,实现类有两个 SimpleQueryShardingEngine和PreparedQueryShardingEngine,一个处理预编译路由,另外一个处理非预编译SQL路由
protected  abstract SQLRouteResult route(String sql, List<Object> parameters);

复制代码

我们分析非预编译SQL路由 SimpleQueryShardingEngine

    public SimpleQueryShardingEngine(final ShardingRule shardingRule, final ShardingProperties shardingProperties, 
                                     final ShardingMetaData metaData, final DatabaseType databaseType, final SQLParseEngine sqlParseEngine) {
        super(shardingRule, shardingProperties, metaData);
        //StatementRoutingEngine 路由引擎 
        routingEngine = new StatementRoutingEngine(shardingRule, metaData, databaseType, sqlParseEngine);
    }
    
    @Override
    protected List<Object> cloneParameters(final List<Object> parameters) {
        return Collections.emptyList();
    }
    
    @Override
    protected SQLRouteResult route(final String sql, final List<Object> parameters) {
        //执行路由
        return routingEngine.route(sql);
    }
复制代码
   public SQLRouteResult route(final String logicSQL) {
       //sql 解析
        SQLStatement sqlStatement = shardingRouter.parse(logicSQL, false);
       //shardingRouter.route 根据SQL解析结果路由
        return masterSlaveRouter.route(shardingRouter.route(logicSQL, Collections.emptyList(), sqlStatement));
    }
复制代码
    public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
        OptimizedStatement optimizedStatement = ShardingOptimizeEngineFactory.newInstance(sqlStatement).optimize(shardingRule, shardingMetaData.getTable(), logicSQL, parameters, sqlStatement);
        //是否需要将子查询分片值与外部查询分片值进行合并,关于sharding-JDB中子查询的分析,在我的另外一篇文章中已经分析,有兴趣的可以去看看
        boolean needMergeShardingValues = isNeedMergeShardingValues(optimizedStatement);
        if (optimizedStatement instanceof ShardingConditionOptimizedStatement && needMergeShardingValues) {
            //对子查询的分片键进行检查,主要确定与外部是否同一个分片键
            checkSubqueryShardingValues(optimizedStatement, ((ShardingConditionOptimizedStatement) optimizedStatement).getShardingConditions());
            mergeShardingConditions(((ShardingConditionOptimizedStatement) optimizedStatement).getShardingConditions());
        }
        //路由,工厂模式RoutingEngineFactory创建路由算法,然后执行路由,返回RoutingResult
        RoutingResult routingResult = RoutingEngineFactory.newInstance(shardingRule, shardingMetaData.getDataSource(), optimizedStatement).route();
        if (needMergeShardingValues) {
            //不支持跨分片子查询
            Preconditions.checkState(1 == routingResult.getRoutingUnits().size(), "Must have one sharding with subquery.");
        }
        if (optimizedStatement instanceof ShardingInsertOptimizedStatement) {
            setGeneratedValues((ShardingInsertOptimizedStatement) optimizedStatement);
        }
        SQLRouteResult result = new SQLRouteResult(optimizedStatement);
        result.setRoutingResult(routingResult);
        return result;
    }
复制代码

主要是根据表配置的路由策略构建具体的路由算法实现类

shardingRule:
  tables:
    t_order:
      actualDataNodes: ds_${0..1}.t_order
      #表路由策略
      tableStrategy:
      #路由算法类型
        inline:
          #算法参数
          shardingColumn: order_id
          algorithmExpression: t_order_${order_id % 2}
      databaseStrategy:
        inline:
          shardingColumn: order_id
          algorithmExpression: ds_${order_id % 2}
复制代码

路由算法类型可有多种类型选择,比如complex,standard等,同理可以自己实现自定义算法,在算法参数中加入自定义路由实现类即可.

sharding-JDBC同时也支持多个字段路由(多维度路由)

conclusion

sharding-JDBC SQL router相对还算简单,整体流程已经介绍,与预编译的SQL路由不同之处是,预编译需要提取?参数,有兴趣的可以翻阅源码,后续会对SQL结果集合并做分析,包括内存合并和流式合并,这块相对复杂些.

原文  https://juejin.im/post/5eef5c1f6fb9a058a57b058e
正文到此结束
Loading...