public class JdbcTransactionFactory implements TransactionFactory { public JdbcTransactionFactory() { } public void setProperties(Properties props) { } public Transaction newTransaction(Connection conn) { return new JdbcTransaction(conn); } public Transaction newTransaction(DataSource ds, TransactionIsolationLevel level, boolean autoCommit) { return new JdbcTransaction(ds, level, autoCommit); } }
public class DefaultSqlSessionFactory implements SqlSessionFactory { public SqlSession openSession(ExecutorType execType, Connection connection) { return this.openSessionFromConnection(execType, connection); } private SqlSession openSessionFromConnection(ExecutorType execType, Connection connection) {//分为两种创建(dataSource、fromConnection) DefaultSqlSession var8; try { boolean autoCommit; try { autoCommit = connection.getAutoCommit(); } catch (SQLException var13) { autoCommit = true; } Environment environment = this.configuration.getEnvironment(); TransactionFactory transactionFactory = this.getTransactionFactoryFromEnvironment(environment); Transaction tx = transactionFactory.newTransaction(connection); Executor executor = this.configuration.newExecutor(tx, execType); var8 = new DefaultSqlSession(this.configuration, executor, autoCommit); } catch (Exception var14) { throw ExceptionFactory.wrapException("Error opening session. Cause: " + var14, var14); } finally { ErrorContext.instance().reset(); } return var8; }
public class DefaultSqlSession implements SqlSession { public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) { List var5; try { MappedStatement ms = this.configuration.getMappedStatement(statement); var5 = this.executor.query(ms, this.wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER); } catch (Exception var9) { throw ExceptionFactory.wrapException("Error querying database. Cause: " + var9, var9); } finally { ErrorContext.instance().reset(); } return var5; } public int update(String statement, Object parameter) { int var4; try { this.dirty = true; MappedStatement ms = this.configuration.getMappedStatement(statement); var4 = this.executor.update(ms, this.wrapCollection(parameter)); } catch (Exception var8) { throw ExceptionFactory.wrapException("Error updating database. Cause: " + var8, var8); } finally { ErrorContext.instance().reset(); } return var4; } public void commit(boolean force) { try { this.executor.commit(this.isCommitOrRollbackRequired(force)); this.dirty = false; } catch (Exception var6) { throw ExceptionFactory.wrapException("Error committing transaction. Cause: " + var6, var6); } finally { ErrorContext.instance().reset(); } } public void rollback(boolean force) { try { this.executor.rollback(this.isCommitOrRollbackRequired(force)); this.dirty = false; } catch (Exception var6) { throw ExceptionFactory.wrapException("Error rolling back transaction. Cause: " + var6, var6); } finally { ErrorContext.instance().reset(); } }
public class SimpleExecutor extends BaseExecutor { public SimpleExecutor(Configuration configuration, Transaction transaction) { super(configuration, transaction); } public int doUpdate(MappedStatement ms, Object parameter) throws SQLException { Statement stmt = null; int var6; try { Configuration configuration = ms.getConfiguration(); StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, (ResultHandler)null, (BoundSql)null); stmt = this.prepareStatement(handler, ms.getStatementLog()); var6 = handler.update(stmt); } finally { this.closeStatement(stmt); } return var6; } public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException { Statement stmt = null; List var9; try { Configuration configuration = ms.getConfiguration(); StatementHandler handler = configuration.newStatementHandler(this.wrapper, ms, parameter, rowBounds, resultHandler, boundSql); stmt = this.prepareStatement(handler, ms.getStatementLog()); var9 = handler.query(stmt, resultHandler); } finally { this.closeStatement(stmt); } return var9; } private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException { Connection connection = this.getConnection(statementLog); Statement stmt = handler.prepare(connection, this.transaction.getTimeout()); handler.parameterize(stmt); return stmt; }
public class SimpleExecutor extends BaseExecutor { public Transaction getTransaction() { if (this.closed) { throw new ExecutorException("Executor was closed."); } else { return this.transaction; } } public void close(boolean forceRollback) { try { try { this.rollback(forceRollback); } finally { if (this.transaction != null) { this.transaction.close(); } } } catch (SQLException var11) { log.warn("Unexpected exception on closing transaction. Cause: " + var11); } finally { this.transaction = null; this.deferredLoads = null; this.localCache = null; this.localOutputParameterCache = null; this.closed = true; } } public void commit(boolean required) throws SQLException { if (this.closed) { throw new ExecutorException("Cannot commit, transaction is already closed"); } else { this.clearLocalCache(); this.flushStatements(); if (required) { this.transaction.commit(); } } } public void rollback(boolean required) throws SQLException { if (!this.closed) { try { this.clearLocalCache(); this.flushStatements(true); } finally { if (required) { this.transaction.rollback(); } } } } public void clearLocalCache() { if (!this.closed) { this.localCache.clear(); this.localOutputParameterCache.clear(); } }
public class JdbcTransaction implements Transaction { public JdbcTransaction(Connection connection) { this.connection = connection; } public Connection getConnection() throws SQLException { if (this.connection == null) { this.openConnection(); } return this.connection; } public void commit() throws SQLException { if (this.connection != null && !this.connection.getAutoCommit()) { if (log.isDebugEnabled()) { log.debug("Committing JDBC Connection [" + this.connection + "]"); } this.connection.commit(); } } public void rollback() throws SQLException { if (this.connection != null && !this.connection.getAutoCommit()) { if (log.isDebugEnabled()) { log.debug("Rolling back JDBC Connection [" + this.connection + "]"); } this.connection.rollback(); } } public void close() throws SQLException { if (this.connection != null) { this.resetAutoCommit(); if (log.isDebugEnabled()) { log.debug("Closing JDBC Connection [" + this.connection + "]"); } this.connection.close(); } } }
public class ConnectionImpl extends ConnectionPropertiesImpl implements Connection { public void commit() throws SQLException { synchronized(this.getMutex()) { this.checkClosed(); try { if (this.connectionLifecycleInterceptors != null) { IterateBlock iter = new IterateBlock(this.connectionLifecycleInterceptors.iterator()) { void forEach(Object each) throws SQLException { if (!((ConnectionLifecycleInterceptor)each).commit()) { this.stopIterating = true; } } }; iter.doForAll(); if (!iter.fullIteration()) { return; } } if (this.autoCommit && !this.getRelaxAutoCommit()) { throw SQLError.createSQLException("Can't call commit when autocommit=true"); } if (this.transactionsSupported) { if (this.getUseLocalSessionState() && this.versionMeetsMinimum(5, 0, 0) && !this.io.inTransactionOnServer()) { return; } this.execSQL((StatementImpl)null, "commit", -1, (Buffer)null, 1003, 1007, false, this.database, (Field[])null, false); } } catch (SQLException var8) { if ("08S01".equals(var8.getSQLState())) { throw SQLError.createSQLException("Communications link failure during commit(). Transaction resolution unknown.", "08007"); } throw var8; } finally { this.needsPing = this.getReconnectAtTxEnd(); } } } }
当 MyBatis 与不同的应用结合时,需要不同的事务管理机制。与 Spring 结合时,由 Spring 来管理事务;单独使用时需要自行管理事务,在容器里运行时可能由容器进行管理。
MyBatis 用 Enviroment 来表示运行环境,其封装了三个属性:
public class Configuration { // 一个 MyBatis 的配置只对应一个环境 protected Environment environment; // 其他属性 ..... } public final class Environment { private final String id; private final TransactionFactory transactionFactory; private final DataSource dataSource; }
MyBatis 把事务管理抽象出 Transaction
接口,由 TransactionFactory
public interface Transaction { Connection getConnection() throws SQLException; void commit() throws SQLException; void rollback() throws SQLException; void close() throws SQLException; Integer getTimeout() throws SQLException; } public interface TransactionFactory { void setProperties(Properties props); Transaction newTransaction(Connection conn); Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit); }
Executor 的实现持有一个 SqlSession 实现,事务控制是委托给 SqlSession 的方法来实现的。
public abstract class BaseExecutor implements Executor { protected Transaction transaction; public void commit(boolean required) throws SQLException { if (closed) { throw new ExecutorException("Cannot commit, transaction is already closed"); } clearLocalCache(); flushStatements(); if (required) { transaction.commit(); } } public void rollback(boolean required) throws SQLException { if (!closed) { try { clearLocalCache(); flushStatements(true); } finally { if (required) { transaction.rollback(); } } } } // 省略其他方法、属性 }
与 Spring 集成时,通过 SqlSessionFactoryBean 来初始化 MyBatis 。
protected SqlSessionFactory buildSqlSessionFactory() throws IOException { Configuration configuration; XMLConfigBuilder xmlConfigBuilder = null; if (this.configuration != null) { configuration = this.configuration; if (configuration.getVariables() == null) { configuration.setVariables(this.configurationProperties); } else if (this.configurationProperties != null) { configuration.getVariables().putAll(this.configurationProperties); } } else if (this.configLocation != null) { xmlConfigBuilder = new XMLConfigBuilder(this.configLocation.getInputStream(), null, this.configurationProperties); configuration = xmlConfigBuilder.getConfiguration(); } else { configuration = new Configuration(); configuration.setVariables(this.configurationProperties); } if (this.objectFactory != null) { configuration.setObjectFactory(this.objectFactory); } if (this.objectWrapperFactory != null) { configuration.setObjectWrapperFactory(this.objectWrapperFactory); } if (this.vfs != null) { configuration.setVfsImpl(this.vfs); } if (hasLength(this.typeAliasesPackage)) { String[] typeAliasPackageArray = tokenizeToStringArray(this.typeAliasesPackage, ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS); for (String packageToScan : typeAliasPackageArray) { configuration.getTypeAliasRegistry().registerAliases(packageToScan, typeAliasesSuperType == null ? Object.class : typeAliasesSuperType); } } if (!isEmpty(this.typeAliases)) { for (Class<?> typeAlias : this.typeAliases) { configuration.getTypeAliasRegistry().registerAlias(typeAlias); } } if (!isEmpty(this.plugins)) { for (Interceptor plugin : this.plugins) { configuration.addInterceptor(plugin); } } if (hasLength(this.typeHandlersPackage)) { String[] typeHandlersPackageArray = tokenizeToStringArray(this.typeHandlersPackage, ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS); for (String packageToScan : typeHandlersPackageArray) { configuration.getTypeHandlerRegistry().register(packageToScan); } } if (!isEmpty(this.typeHandlers)) { for (TypeHandler<?> typeHandler : this.typeHandlers) { configuration.getTypeHandlerRegistry().register(typeHandler); } } if (this.databaseIdProvider != null) {//fix #64 set databaseId before parse mapper xmls try { configuration.setDatabaseId(this.databaseIdProvider.getDatabaseId(this.dataSource)); } catch (SQLException e) { throw new NestedIOException("Failed getting a databaseId", e); } } if (this.cache != null) { configuration.addCache(this.cache); } if (xmlConfigBuilder != null) { try { xmlConfigBuilder.parse(); } catch (Exception ex) { throw new NestedIOException("Failed to parse config resource: " + this.configLocation, ex); } finally { ErrorContext.instance().reset(); } } // 创建 SpringManagedTransactionFactory if (this.transactionFactory == null) { this.transactionFactory = new SpringManagedTransactionFactory(); } // 封装成 Environment configuration.setEnvironment(new Environment(this.environment, this.transactionFactory, this.dataSource)); if (!isEmpty(this.mapperLocations)) { for (Resource mapperLocation : this.mapperLocations) { if (mapperLocation == null) { continue; } try { XMLMapperBuilder xmlMapperBuilder = new XMLMapperBuilder(mapperLocation.getInputStream(), configuration, mapperLocation.toString(), configuration.getSqlFragments()); xmlMapperBuilder.parse(); } catch (Exception e) { throw new NestedIOException("Failed to parse mapping resource: '" + mapperLocation + "'", e); } finally { ErrorContext.instance().reset(); } } } else { } return this.sqlSessionFactoryBuilder.build(configuration); } public class SqlSessionFactoryBuilder { public SqlSessionFactory build(Configuration config) { return new DefaultSqlSessionFactory(config); } }
重点是在构建 MyBatis Configuration 对象时,把 transactionFactory 配置成 SpringManagedTransactionFactory,再封装成 Environment 对象。
Mapper 的代理对象持有的是 SqlSessionTemplate
,其实现了 SqlSession
的方法并不直接调用具体的 SqlSession
的方法,而是委托给一个动态代理,通过代理 SqlSessionInterceptor
负责获取真实的与数据库关联的 SqlSession
public class SqlSessionTemplate implements SqlSession, DisposableBean { private final SqlSessionFactory sqlSessionFactory; private final ExecutorType executorType; private final SqlSession sqlSessionProxy; private final PersistenceExceptionTranslator exceptionTranslator; public SqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) { notNull(sqlSessionFactory, "Property 'sqlSessionFactory' is required"); notNull(executorType, "Property 'executorType' is required"); this.sqlSessionFactory = sqlSessionFactory; this.executorType = executorType; this.exceptionTranslator = exceptionTranslator; // 因为 SqlSession 接口声明的方法也不少, // 在每个方法里添加事务相关的拦截比较麻烦, // 不如创建一个内部的代理对象进行统一处理。 this.sqlSessionProxy = (SqlSession) newProxyInstance( SqlSessionFactory.class.getClassLoader(), new Class[] { SqlSession.class }, new SqlSessionInterceptor()); } public int update(String statement) { // 在代理对象上执行方法调用 return this.sqlSessionProxy.update(statement); } // 对方法调用进行拦截,加入事务控制逻辑 private class SqlSessionInterceptor implements InvocationHandler { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 获取与数据库关联的会话 SqlSession sqlSession = SqlSessionUtils.getSqlSession( SqlSessionTemplate.this.sqlSessionFactory, SqlSessionTemplate.this.executorType, SqlSessionTemplate.this.exceptionTranslator); try { // 执行 SQL 操作 Object result = method.invoke(sqlSession, args); if (!SqlSessionUtils.isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) { // 如果 sqlSession 不是 Spring 管理的,则要自行提交事务 sqlSession.commit(true); } return result; } catch (Throwable t) { Throwable unwrapped = unwrapThrowable(t); if (SqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) { SqlSessionUtils.closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory); sqlSession = null; Throwable translated = SqlSessionTemplate.this.exceptionTranslator.translateExceptionIfPossible((PersistenceException) unwrapped); if (translated != null) { unwrapped = translated; } } throw unwrapped; } finally { if (sqlSession != null) { SqlSessionUtils.closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory); } } } } }
封装了对 Spring 事务管理机制的访问。
// SqlSessionUtils public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) { // 从 Spring 的事务管理机制那里获取当前事务关联的会话 SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory); SqlSession session = sessionHolder(executorType, holder); if (session != null) { // 已经有一个会话则复用 return session; } // 创建新的 会话 session = sessionFactory.openSession(executorType); // 注册到 Spring 的事务管理机制里 registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session); return session; } private static void registerSessionHolder(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator, SqlSession session) { SqlSessionHolder holder; if (TransactionSynchronizationManager.isSynchronizationActive()) { Environment environment = sessionFactory.getConfiguration().getEnvironment(); if (environment.getTransactionFactory() instanceof SpringManagedTransactionFactory) { holder = new SqlSessionHolder(session, executorType, exceptionTranslator); TransactionSynchronizationManager.bindResource(sessionFactory, holder); // 重点:注册会话管理的回调钩子,真正的关闭动作是在回调里完成的。 TransactionSynchronizationManager.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory)); holder.setSynchronizedWithTransaction(true); // 维护会话的引用计数 holder.requested(); } else { if (TransactionSynchronizationManager.getResource(environment.getDataSource()) == null) { } else { throw new TransientDataAccessResourceException( "SqlSessionFactory must be using a SpringManagedTransactionFactory in order to use Spring transaction synchronization"); } } } else { } } public static void closeSqlSession(SqlSession session, SqlSessionFactory sessionFactory) { // 从线程本地变量里获取 Spring 管理的会话 SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory); if ((holder != null) && (holder.getSqlSession() == session)) { // Spring 管理的不直接关闭,由回调钩子来关闭 holder.released(); } else { // 非 Spring 管理的直接关闭 session.close(); } }
是 SqlSessionUtils
的内部私有类,用于作为回调钩子与 Spring 的事务管理机制协调工作, TransactionSynchronizationManager
private static final class SqlSessionSynchronization extends TransactionSynchronizationAdapter { private final SqlSessionHolder holder; private final SqlSessionFactory sessionFactory; private boolean holderActive = true; public SqlSessionSynchronization(SqlSessionHolder holder, SqlSessionFactory sessionFactory) { this.holder = holder; this.sessionFactory = sessionFactory; } public int getOrder() { return DataSourceUtils.CONNECTION_SYNCHRONIZATION_ORDER - 1; } public void suspend() { if (this.holderActive) { TransactionSynchronizationManager.unbindResource(this.sessionFactory); } } public void resume() { if (this.holderActive) { TransactionSynchronizationManager.bindResource(this.sessionFactory, this.holder); } } public void beforeCommit(boolean readOnly) { if (TransactionSynchronizationManager.isActualTransactionActive()) { try { this.holder.getSqlSession().commit(); } catch (PersistenceException p) { if (this.holder.getPersistenceExceptionTranslator() != null) { DataAccessException translated = this.holder .getPersistenceExceptionTranslator() .translateExceptionIfPossible(p); if (translated != null) { throw translated; } } throw p; } } } public void beforeCompletion() { if (!this.holder.isOpen()) { TransactionSynchronizationManager.unbindResource(sessionFactory); this.holderActive = false; // 真正关闭数据库会话 this.holder.getSqlSession().close(); } } public void afterCompletion(int status) { if (this.holderActive) { TransactionSynchronizationManager.unbindResourceIfPossible(sessionFactory); this.holderActive = false; // 真正关闭数据库会话 this.holder.getSqlSession().close(); } this.holder.reset(); } }
// DefaultSqlSessionFactory private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) { Transaction tx = null; try { final Environment environment = configuration.getEnvironment(); // 获取事务工厂实现 final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment); tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit); final Executor executor = configuration.newExecutor(tx, execType); return new DefaultSqlSession(configuration, executor, autoCommit); } catch (Exception e) { closeTransaction(tx); // may have fetched a connection so lets call close() throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e); } finally { ErrorContext.instance().reset(); } } private TransactionFactory getTransactionFactoryFromEnvironment(Environment environment) { if (environment == null || environment.getTransactionFactory() == null) { return new ManagedTransactionFactory(); } return environment.getTransactionFactory(); }
/1. 步骤 1 是 Spring AOP 添加的切面的执行,事务是其中一个切面。
/2. 步骤 2 是 Spring 事务管理机制的部分。
/3. 步骤 3 是业务代码,调用 Mapper 代理上的方法。
/4. 步骤 4 是代理上的方法调用被 MapperProxy.invoke 拦截。
/5. 步骤 5、6 是因为 MapperProxy 持有的 sqlSession 是 SqlSessionTemplate,调用到 template 上的方法,又被转发给内部类 SqlSessionInterceptor ,该类获得 Spring 事务管理持有的数据库连接,用以创建 Executor h和 DefaultSqlSession。
/6. 步骤 7 用 DefaultSqlSession 发起数据库调用。
原文 http://www.cnblogs.com/eternal-heathens/p/13366799.html