本篇文章主要分享压测的(高并发)时候发现的一些问题。之前的两篇文章已经讲述了在高并发的情况下,消息队列和数据库连接池的一些总结和优化,有兴趣的可以在我的公众号中去翻阅。废话不多说,进入正题。
事务,想必各位 CRUD
之王对其并不陌生,基本上有多个写请求的都需要使用事务,而Spring对于事务的使用又特别的简单,只需要一个 @Transactional
注解即可,如下面的例子:
@Transactional public int createOrder(Order order){ orderDbStorage.save(order); orderItemDbStorage.save(order.getItems()); return order.getId(); }
在我们创建订单的时候, 通常需要将订单和订单项放在同一个事务里面保证其满足ACID,这里我们只需要在我们创建订单的方法上面写上事务注解即可。
对于上面的创建订单的代码,如果现在需要新增一个需求,再创建订单之后发送一个消息到消息队列或者调用一个RPC,你会怎么做呢?很多同学首先会想到,直接在事务方法里面进行调用:
@Transactional public int createOrder(Order order){ orderDbStorage.save(order); orderItemDbStorage.save(order.getItems()); sendRpc(); sendMessage(); return order.getId(); }
这种代码在很多人写的业务中都会出现,事务中嵌套rpc,嵌套一些非DB的操作,一般情况下这么写的确也没什么问题,一旦非DB写操作出现比较慢,或者流量比较大,就会出现大事务的问题。由于事务的一直不提交,就会导致数据库连接被占用。这个时候你可能会问,我扩大点数据库连接不就行了吗,100个不行就上1000个,再上篇文章已经讲过数据库连接池大小依然会影响我们数据库的性能,所以,数据库连接并不是想扩多少扩多少。
那我们应该怎么对其进行优化呢?在这里可以仔细想想,我们的非db操作,其实是不满足我们事务的ACID的,那么干嘛要写在事务里面,所以这里我们可以将其提取出来。
public int createOrder(Order order){ createOrderService.createOrder(order); sendRpc(); sendMessage(); }
在这个方法里面先去调用事务的创建订单,然后再去调用其他非DB操作。如果我们现在想要更复杂一点的逻辑,比如创建订单成功就发送成功的RPC请求,失败就发送失败的RPC请求,由上面的代码我们可以做如下转化:
public int createOrder(Order order){ try { createOrderService.createOrder(order); sendSuccessedRpc(); }catch (Exception e){ sendFailedRpc(); throw e; } }
通常我们会捕获异常,或者根据返回值来进行一些特殊处理,这里的实现需要显示的捕获异常,并且再次抛出,这种方式不是很优雅,那么怎么才能更好的写这种话逻辑呢?
在Spring的事务中刚好提供了一些工具方法,来帮助我们完成这种需求。在 TransactionSynchronizationManager
中提供了让我们对事务注册callBack的方法:
public static void registerSynchronization(TransactionSynchronization synchronization) throws IllegalStateException { Assert.notNull(synchronization, "TransactionSynchronization must not be null"); if (!isSynchronizationActive()) { throw new IllegalStateException("Transaction synchronization is not active"); } synchronizations.get().add(synchronization); }
TransactionSynchronization也就是我们事务的callBack,提供了一些扩展点给我们:
public interface TransactionSynchronization extends Flushable { int STATUS_COMMITTED = 0; int STATUS_ROLLED_BACK = 1; int STATUS_UNKNOWN = 2; /** * 挂起时触发 */ void suspend(); /** * 挂起事务抛出异常的时候 会触发 */ void resume(); @Override void flush(); /** * 在事务提交之前触发 */ void beforeCommit(boolean readOnly); /** * 在事务完成之前触发 */ void beforeCompletion(); /** * 在事务提交之后触发 */ void afterCommit(); /** * 在事务完成之后触发 */ void afterCompletion(int status); }
我们可以利用afterComplettion方法实现我们上面的业务逻辑:
@Transactional public int createOrder(Order order){ orderDbStorage.save(order); orderItemDbStorage.save(order.getItems()); TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { @Override public void afterCompletion(int status) { if (status == STATUS_COMMITTED){ sendSuccessedRpc(); }else { sendFailedRpc(); } } }); return order.getId(); }
这里我们直接实现了afterCompletion,通过事务的status进行判断,我们应该具体发送哪个RPC。当然我们可以进一步封装 TransactionSynchronizationManager.registerSynchronization
将其封装成一个事务的Util,可以使我们的代码更加简洁。
通过这种方式我们不必把所有非DB操作都写在方法之外,这样代码更具有逻辑连贯性,更加易读,并且优雅。
这个注册事务的回调代码在我们在我们的业务逻辑中经常会出现,比如某个事务做完之后的刷新缓存,发送消息队列,发送通知消息等等,在日常的使用中,大家用这个基本也没出什么问题,但是在打压的过程中,发现了这一块出现了瓶颈,耗时特别久,通过一系列的监测,发现是从数据库连接池获取连接等待的时间较长,最终我们定位到了afterCompeltion这个动作,居然没有归还数据库连接。
在Spring的AbstractPlatformTransactionManager中,对commit处理的代码如下:
private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true; boolean globalRollbackOnly = false; if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { globalRollbackOnly = status.isGlobalRollbackOnly(); } if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } doCommit(status); } // Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn't get a corresponding exception from commit. if (globalRollbackOnly) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } // Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { cleanupAfterCompletion(status); } }
这里我们只需要关注 倒数几行代码即可,可以发现我们的triggerAfterCompletion,是倒数第二个执行逻辑,当执行完所有的代码之后就会执行我们的cleanupAfterCompletion,而我们的归还数据库连接也在这段代码之中,这样就导致了我们获取数据库连接变慢。
对于上面的问题如何优化呢?这里有三种方案可以进行优化:
将非DB操作提到事务之外,这种方法也就是我们上面最原始的方法,对于一些简单的逻辑可以提取,但是对于一些复杂的逻辑,比如事务的嵌套,嵌套里面调用了afterCompletion,这样做会增大很多工作量,并且很容易出现问题。
通过多线程异步去做,提升数据库连接池归还速度,这种适合于注册afterCompletion时写在事务最后的时候,直接将需要做的放在其它线程去做。但是如果注册afterCompletion的时候出现在我们事务之间,比如嵌套事务,就会导致我们要做的后续业务逻辑和事务并行。
模仿Spring事务回调注册,实现新的注解。上面两种方法都有各自的弊端,所以最后我们采用了这种方法,实现了一个自定义注解 @MethodCallBack
,再使用事务的上面都打上这个注解,然后通过类似的注册代码进行。
@Transactional @MethodCallBack public int createOrder(Order order){ orderDbStorage.save(order); orderItemDbStorage.save(order.getItems()); MethodCallbackHelper.registerOnSuccess(() -> sendSuccessedRpc()); MethodCallbackHelper.registerOnThrowable(throwable -> sendFailedRpc()); return order.getId(); }
通过第三种方法基本只需要把我们注册事务回调的地方都进行替换就可以正常使用了。
说了这么久大事务,到底什么才是大事务呢?简单点就是事务时间运行得长,那么就是大事务。一般来说导致事务时间运行时间长的因素不外乎下面几种:
数据操作得很多,比如在一个事务里面插入了很多数据,那么这个事务执行时间自然就会变得很长。
锁的竞争大,当所有的连接都同时对同一个数据进行操作,那么就会出现排队等待,事务时间自然就会变长。
事务中有其他非DB操作,比如一些RPC请求,有些人说我的RPC很快的,不会增加事务的运行时间,但是RPC请求本身就是一个不稳定的因素,受很多因素影响,网络波动,下游服务响应缓慢,如果这些因素一旦出现,就会有大量的事务时间很长,有可能导致Mysql挂掉,从而引起雪崩。
上面的三种情况,前面两种可能来说不是特别常见,但是第三种事务中有很多非DB操作,这个是我们非常常见,通常出现这个情况的原因很多时候是我们自己习惯规范,初学者或者一些经验不丰富的人写代码,往往会先写一个大方法,直接在这个方法加上事务注解,然后再往里面补充,哪管他是什么逻辑,一把梭,就像下面这张图一样:
当然还有些人是想搞什么分布式事务,可惜用错了方法,对于分布式事务可以关注Seata,同样可以用一个注解就能帮助你做到分布式事务。
其实最后想想,为什么会出现这种问题呢?一般大家的理解都是会认为都是在完成之后做的了,数据库连接肯定早都释放了,但是事实并非如此。所以,我们使用很多API的时候不能望文生义,如果其没有详细的doc,那么你应该更加深入了解其实现细节。
当然最后希望大家写代码之前尽量还是不要一把梭,认真对待每一句代码。
如果大家觉得这篇文章对你有帮助,你的关注和转发是对我最大的支持,O(∩_∩)O: