分库分表中重要的一个模块就是数据切分,将数据根据一定的规则分布在多个DB中,那么这个过程中涉及到了路由,即根据SQL中分片键通过规则(分片算法)计算出某个DB节点,这个过程称为SQL路由。
sharding-JDBC路由入口统一在 shardingStatement
和 ShardingPreparedStatement
,分析 shardingStatement
源码
private void shard(final String sql) {
ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
//路由引擎,改引擎中处理非预编译SQL,所有的非预编译SQL路由都在这处理
SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine(runtimeContext.getRule(),
runtimeContext.getProps(), runtimeContext.getMetaData(), runtimeContext.getDatabaseType(), runtimeContext.getParseEngine());
//根据路由引擎执行路由
sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList());
}
复制代码
sqlRouteResult
是根据路由结果构建的数据结构,整个执行流程会将 sqlRouteResult
作为参数传递。
shard方法执行路由逻辑
public SQLRouteResult shard(final String sql, final List<Object> parameters) {
//拷贝预编译参数,防止在路由过程中对原始值修改
List<Object> clonedParameters = cloneParameters(parameters);
//route
SQLRouteResult result = executeRoute(sql, clonedParameters);
//根据路由结果构建可执行单元,这块涉及到了SQL改写,分表需要改写SQL中的tableName ,比如原始表名acct,改写后acct_1,acct_2等
result.getRouteUnits().addAll(HintManager.isDatabaseShardingOnly() ? convert(sql, clonedParameters, result) : rewriteAndConvert(sql, clonedParameters, result));
//日志的方式输出路由结果,包含SQL和db节点信息,方便问题追踪
if (shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW)) {
boolean showSimple = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SIMPLE);
SQLLogger.logSQL(sql, showSimple, result.getOptimizedStatement(), result.getRouteUnits());
}
return result;
}
复制代码
executeRoute方法
private SQLRouteResult executeRoute(final String sql, final List<Object> clonedParameters) {
//routingHook注入的钩子方法,获取路由过程中的信息,此处用到的是SPI扩展技术,用户可通过SPI将具体的钩子实现注入,实现自己的逻辑,比如获取路由执行时间等
routingHook.start(sql);
try {
//路由
SQLRouteResult result = route(sql, clonedParameters);
//执行钩子中finishSuccess方法
routingHook.finishSuccess(result, metaData.getTable());
return result;
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
//执行钩子中finishFailure方法
routingHook.finishFailure(ex);
throw ex;
}
}
复制代码
route方法
//抽象方法,实现类有两个 SimpleQueryShardingEngine和PreparedQueryShardingEngine,一个处理预编译路由,另外一个处理非预编译SQL路由
protected abstract SQLRouteResult route(String sql, List<Object> parameters);
复制代码
我们分析非预编译SQL路由 SimpleQueryShardingEngine
public SimpleQueryShardingEngine(final ShardingRule shardingRule, final ShardingProperties shardingProperties,
final ShardingMetaData metaData, final DatabaseType databaseType, final SQLParseEngine sqlParseEngine) {
super(shardingRule, shardingProperties, metaData);
//StatementRoutingEngine 路由引擎
routingEngine = new StatementRoutingEngine(shardingRule, metaData, databaseType, sqlParseEngine);
}
@Override
protected List<Object> cloneParameters(final List<Object> parameters) {
return Collections.emptyList();
}
@Override
protected SQLRouteResult route(final String sql, final List<Object> parameters) {
//执行路由
return routingEngine.route(sql);
}
复制代码
public SQLRouteResult route(final String logicSQL) {
//sql 解析
SQLStatement sqlStatement = shardingRouter.parse(logicSQL, false);
//shardingRouter.route 根据SQL解析结果路由
return masterSlaveRouter.route(shardingRouter.route(logicSQL, Collections.emptyList(), sqlStatement));
}
复制代码
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
OptimizedStatement optimizedStatement = ShardingOptimizeEngineFactory.newInstance(sqlStatement).optimize(shardingRule, shardingMetaData.getTable(), logicSQL, parameters, sqlStatement);
//是否需要将子查询分片值与外部查询分片值进行合并,关于sharding-JDB中子查询的分析,在我的另外一篇文章中已经分析,有兴趣的可以去看看
boolean needMergeShardingValues = isNeedMergeShardingValues(optimizedStatement);
if (optimizedStatement instanceof ShardingConditionOptimizedStatement && needMergeShardingValues) {
//对子查询的分片键进行检查,主要确定与外部是否同一个分片键
checkSubqueryShardingValues(optimizedStatement, ((ShardingConditionOptimizedStatement) optimizedStatement).getShardingConditions());
mergeShardingConditions(((ShardingConditionOptimizedStatement) optimizedStatement).getShardingConditions());
}
//路由,工厂模式RoutingEngineFactory创建路由算法,然后执行路由,返回RoutingResult
RoutingResult routingResult = RoutingEngineFactory.newInstance(shardingRule, shardingMetaData.getDataSource(), optimizedStatement).route();
if (needMergeShardingValues) {
//不支持跨分片子查询
Preconditions.checkState(1 == routingResult.getRoutingUnits().size(), "Must have one sharding with subquery.");
}
if (optimizedStatement instanceof ShardingInsertOptimizedStatement) {
setGeneratedValues((ShardingInsertOptimizedStatement) optimizedStatement);
}
SQLRouteResult result = new SQLRouteResult(optimizedStatement);
result.setRoutingResult(routingResult);
return result;
}
复制代码
主要是根据表配置的路由策略构建具体的路由算法实现类
shardingRule:
tables:
t_order:
actualDataNodes: ds_${0..1}.t_order
#表路由策略
tableStrategy:
#路由算法类型
inline:
#算法参数
shardingColumn: order_id
algorithmExpression: t_order_${order_id % 2}
databaseStrategy:
inline:
shardingColumn: order_id
algorithmExpression: ds_${order_id % 2}
复制代码
路由算法类型可有多种类型选择,比如complex,standard等,同理可以自己实现自定义算法,在算法参数中加入自定义路由实现类即可.
sharding-JDBC同时也支持多个字段路由(多维度路由)
sharding-JDBC SQL router相对还算简单,整体流程已经介绍,与预编译的SQL路由不同之处是,预编译需要提取?参数,有兴趣的可以翻阅源码,后续会对SQL结果集合并做分析,包括内存合并和流式合并,这块相对复杂些.