工作中偶尔会碰到需要统一修改SQL的情况,例如有以下表结构:
CREATE TABLE `test_user` ( `id` int(11) NOT NULL AUTO_INCREMENT, `account` varchar(70) NOT NULL COMMENT '账号', `user_name` varchar(60) NOT NULL COMMENT '姓名', `age` int(11) NOT NULL COMMENT '年龄', `sex` bit(1) NOT NULL COMMENT '性别:0-男,1-女', `create_time` timestamp NOT NULL DEFAULT '2019-01-01 00:00:00' COMMENT '创建时间', `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY `uk_account` (`account`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用户信息表';
假设有如下Mapper SQL:
insert into `test_user`(`account`, `user_name`, `age`, `sex`, `create_time`) values ('test1', 'test_user_1', 1, 0, now()) on duplicate key update `user_name` = 'test_user_1', `age` = 1, `sex` = 0;
在Service层代码中通过判断Mapper返回的影响行数是否等于1来识别SQL是否执行成功。但假如 duplicate key update
设置的字段值和数据库中的记录值完全一致,则 mysql
不会执行update,因此在JDBC返回的影响行数会为0,导致Service层逻辑错误。
解决方法很简单,只需在 duplicate key update
中加上 update_time = now()
即可,但如果这种语句广泛存在,那么最简单的方法就是通过SQL Rewrite来实现。
系统使用 Mybatis 作为ORM, alibaba druid 作为数据库连接池。
Mybatis提供了plugin机制来修改SQL,例如 Mybatis-PageHelper 就是使用plugin机制修改SQL添加分页和Count语句。
Druid提供了Filter机制来修改SQL,例如 EncodingConvertFilter 就是使用了Filter机制在实际执行前执行了编码转换。
既然以上两者都能做到修改SQL,那么我们该选择在什么时候执行修改呢?其实这两者并没有什么显著的优劣区别,我个人来看有以下两点区别:
要改写SQL,首先得先解析SQL,分析SQL的语义来判断是否需要改写以及改写哪一部分,而词法分析历来是非常耗时的,因此SQL Parser框架很重要。Java生态中较为流行的SQL Parser有以下几种:
其实说到SQL Rewrite,我们很容易就想到数据库中间件的分库分表,因此我们在选择SQL Parser时完全可以参考那些知名的数据库中间件。 Apache Sharding Sphere(原当当Sharding-JDBC) 、 Mycat 都是国内目前大量使用的开源数据库中间件,这两者都使用了alibaba druid的SQL Parser模块,并且Mycat还开源了他们在选型时的对比分析 Mycat路由新解析器选型分析与结果.docx 。
注意: Apache Sharding Sphere 在1.5.x版本后改用自己研发的SQL Parser,理由是因为Sharding Sphere并不需要完整的SQL AST,因此改用自研的SQL Parser以降低SQL解析完整性为代价提升分库分表效率,详见 深度认识 Sharding-JDBC:做最轻量级的数据库中间层 。
综上所述,我们可以放心的选用alibaba druid提供的SQL Parser,唯一的问题就是如何使用druid SQL Parser。druid官方并没有详细的关于SQL Parser和Visitor的API文档说明(再次吐槽一下国内开源项目在文档和代码注释上的不完善,druid源码基本没有注释),因此我们只能从其他相关文档,以及已有的Visitor中参考,以下是druid官方的全部关于SQL Parser和Visitor的文档:
在Demo中实现了Mybatis Plugin以及Druid Filter两种模式,实现的功能很简单,就是在开篇中的 insert ... on duplicate key update
sql中加上 update_time = now()
。
Demo地址为 mybatis-plugin-or-druid-filter-rewrite-sql 。
在Demo中使用了H2模拟Mysql,H2的建表语句参考
src/test/resources/schema-h2.sql
。
Plugin代码是
src/main/java/com/github/larva/zhang/problems/SimpleRewriteSqlMybatisPlugin.java
。
@Slf4j @Intercepts({@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class})}) public class SimpleRewriteSqlMybatisPlugin implements Interceptor { private final SimpleAppendUpdateTimeVisitor visitor = new SimpleAppendUpdateTimeVisitor(); @Override public Object intercept(Invocation invocation) throws Throwable { Object[] args = invocation.getArgs(); MappedStatement mappedStatement = (MappedStatement) args[0]; SqlCommandType sqlCommandType = mappedStatement.getSqlCommandType(); if (sqlCommandType != SqlCommandType.INSERT) { // 只处理insert return invocation.proceed(); } BoundSql boundSql = mappedStatement.getBoundSql(args[1]); String sql = boundSql.getSql(); List<SQLStatement> sqlStatements = SQLUtils.parseStatements(sql, JdbcConstants.MYSQL); if (CollectionUtils.isNotEmpty(sqlStatements)) { for (SQLStatement sqlStatement : sqlStatements) { sqlStatement.accept(visitor); } } if (visitor.getAndResetRewriteStatus()) { // 改写了SQL,需要替换MappedStatement String newSql = SQLUtils.toSQLString(sqlStatements, JdbcConstants.MYSQL); log.info("rewrite sql, origin sql: [{}], new sql: [{}]", sql, newSql); BoundSql newBoundSql = new BoundSql(mappedStatement.getConfiguration(), newSql, boundSql.getParameterMappings(), boundSql.getParameterObject()); // copy原始MappedStatement的各项属性 MappedStatement.Builder builder = new MappedStatement.Builder(mappedStatement.getConfiguration(), mappedStatement.getId(), new WarpBoundSqlSqlSource(newBoundSql), mappedStatement.getSqlCommandType()); builder.cache(mappedStatement.getCache()).databaseId(mappedStatement.getDatabaseId()) .fetchSize(mappedStatement.getFetchSize()) .flushCacheRequired(mappedStatement.isFlushCacheRequired()) .keyColumn(StringUtils.join(mappedStatement.getKeyColumns(), ',')) .keyGenerator(mappedStatement.getKeyGenerator()) .keyProperty(StringUtils.join(mappedStatement.getKeyProperties(), ',')) .lang(mappedStatement.getLang()).parameterMap(mappedStatement.getParameterMap()) .resource(mappedStatement.getResource()).resultMaps(mappedStatement.getResultMaps()) .resultOrdered(mappedStatement.isResultOrdered()) .resultSets(StringUtils.join(mappedStatement.getResultSets(), ',')) .resultSetType(mappedStatement.getResultSetType()).statementType(mappedStatement.getStatementType()) .timeout(mappedStatement.getTimeout()).useCache(mappedStatement.isUseCache()); MappedStatement newMappedStatement = builder.build(); // 将新生成的MappedStatement对象替换到参数列表中 args[0] = newMappedStatement; } return invocation.proceed(); } /** * 生成代理类然后添加到{@link InterceptorChain}中 * * Mybatis的{@link Executor}依赖以下几个组件: * <ol> * <li>{@link StatementHandler} 负责创建JDBC {@link java.sql.Statement}对象</li> * <li>{@link ParameterHandler} 负责将实际参数填充到JDBC {@link java.sql.Statement}对象中</li> * <li>{@link ResultSetHandler} 负责JDBC {@link java.sql.Statement#execute(String)} * 后返回的{@link java.sql.ResultSet}的处理</li> * </ol> * 因为此Plugin只对Executor生效所以只代理{@link Executor}对象 * * @param target * @return */ @Override public Object plugin(Object target) { if (target instanceof Executor) { return Plugin.wrap(target, this); } return target; } @Override public void setProperties(Properties properties) { } static class WarpBoundSqlSqlSource implements SqlSource { private final BoundSql boundSql; public WarpBoundSqlSqlSource(BoundSql boundSql) { this.boundSql = boundSql; } @Override public BoundSql getBoundSql(Object parameterObject) { return boundSql; } } }
使用时只需声明Mybatis Configuration Bean时添加该Plugin实例到Interceptor列表中即可,参考
src/test/java/com/github/larva/zhang/problems/mybatis/TestMybatisPluginRewriteSqlConfig.java
。
@Bean @Scope(scopeName = ConfigurableBeanFactory.SCOPE_PROTOTYPE) public org.apache.ibatis.session.Configuration mybatisConfiguration() { org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration(); // 各项属性设置 ... // 使用Mybatis Plugin机制改写SQL configuration.addInterceptor(mybatisPlugin()); return configuration; } @Bean public SimpleRewriteSqlMybatisPlugin mybatisPlugin() { return new SimpleRewriteSqlMybatisPlugin(); }
Filter代码是
src/main/java/com/github/larva/zhang/problems/SimpleRewriteSqlDruidFilter.java
。
@Slf4j public class SimpleRewriteSqlDruidFilter extends FilterAdapter { private final SimpleAppendUpdateTimeVisitor visitor = new SimpleAppendUpdateTimeVisitor(); @Override public boolean statement_execute(FilterChain chain, StatementProxy statement, String sql) throws SQLException { String dbType = chain.getDataSource().getDbType(); List<SQLStatement> sqlStatements = SQLUtils.parseStatements(sql, dbType); sqlStatements.forEach(sqlStatement -> sqlStatement.accept(visitor)); if (visitor.getAndResetRewriteStatus()) { // 改写了SQL,需要替换 String newSql = SQLUtils.toSQLString(sqlStatements, dbType); log.info("rewrite sql, origin sql: [{}], new sql: [{}]", sql, newSql); return super.statement_execute(chain, statement, newSql); } return super.statement_execute(chain, statement, sql); } @Override public PreparedStatementProxy connection_prepareStatement(FilterChain chain, ConnectionProxy connection, String sql, int autoGeneratedKeys) throws SQLException { List<SQLStatement> sqlStatements = SQLUtils.parseStatements(sql, JdbcConstants.MYSQL); sqlStatements.forEach(sqlStatement -> sqlStatement.accept(visitor)); if (visitor.getAndResetRewriteStatus()) { // 改写了SQL,需要替换 String newSql = SQLUtils.toSQLString(sqlStatements, JdbcConstants.MYSQL); log.info("rewrite sql, origin sql: [{}], new sql: [{}]", sql, newSql); return super.connection_prepareStatement(chain, connection, newSql, autoGeneratedKeys); } return super.connection_prepareStatement(chain, connection, sql, autoGeneratedKeys); } }
该Filter支持在 Statement
和 PreparedStatement
两种模式下执行的SQL Rewrite,但是缺少对其他类型的SQL的支持。
相较于Mybatis Plugin不好的一点是不论是什么SQL都需要先经过SQL Parser解析AST,当然这点也可以通过在 prepareStatement_execute
重写SQL而非 connection_prepareStatement
阶段。
prepareStatement_execute
阶段重写需要重新生成 PreparedStatementProxy
并且重设JdbcParameters,这点又比 connection_prepareStatement
阶段重写SQL要麻烦。
使用时只需在Druid DataSource实例声明时加入到Filter列表中即可,用法类型Druid的WallFilter。参考
src/test/java/com/github/larva/zhang/problems/druid/DruidFilterRewriteSqlConfig.java
。
@Bean(initMethod = "init", destroyMethod = "close") public DruidDataSource dataSource(@Value("${spring.datasource.url}") String url, @Value("${spring.datasource.username}") String username, @Value("${spring.datasource.password}") String password) throws SQLException { DruidDataSource druidDataSource = new DruidDataSource(); // 各项属性设置 ... // 添加改写SQL的Filter druidDataSource.setProxyFilters(Collections.singletonList(simpleRewriteSqlDruidFilter())); return druidDataSource; } @Bean public FilterAdapter simpleRewriteSqlDruidFilter() { return new SimpleRewriteSqlDruidFilter(); }
从上述的Plugin和Filter代码中都可以看到,实际的SQL改写是交给了
src/main/java/com/github/larva/zhang/problems/SimpleAppendUpdateTimeVisitor.java
。
@Slf4j public class SimpleAppendUpdateTimeVisitor extends MySqlASTVisitorAdapter { private static final ThreadLocal<Boolean> REWRITE_STATUS_CACHE = new ThreadLocal<>(); private static final String UPDATE_TIME_COLUMN = "update_time"; @Override public boolean visit(MySqlInsertStatement x) { boolean hasUpdateTimeCol = false; // duplicate key update得到的都是SQLBinaryOpExpr List<SQLExpr> duplicateKeyUpdate = x.getDuplicateKeyUpdate(); if (CollectionUtils.isNotEmpty(duplicateKeyUpdate)) { for (SQLExpr sqlExpr : duplicateKeyUpdate) { if (sqlExpr instanceof SQLBinaryOpExpr && ((SQLBinaryOpExpr) sqlExpr).conditionContainsColumn(UPDATE_TIME_COLUMN)) { hasUpdateTimeCol = true; break; } } if (!hasUpdateTimeCol) { // append update time column String tableAlias = x.getTableSource().getAlias(); StringBuilder setUpdateTimeBuilder = new StringBuilder(); if (!StringUtils.isEmpty(tableAlias)) { setUpdateTimeBuilder.append(tableAlias).append('.'); } setUpdateTimeBuilder.append(UPDATE_TIME_COLUMN).append(" = now()"); SQLExpr sqlExpr = SQLUtils.toMySqlExpr(setUpdateTimeBuilder.toString()); duplicateKeyUpdate.add(sqlExpr); // 重写状态记录 REWRITE_STATUS_CACHE.set(Boolean.TRUE); } } return super.visit(x); } /** * 返回重写状态并重置重写状态 * * @return 重写状态,{@code true}表示已重写,{@code false}表示未重写 */ public boolean getAndResetRewriteStatus() { boolean rewriteStatus = Optional.ofNullable(REWRITE_STATUS_CACHE.get()).orElse(Boolean.FALSE); // reset rewrite status REWRITE_STATUS_CACHE.remove(); return rewriteStatus; } }
本文由博客一文多发平台 OpenWrite 发布!