数据存储库通常是超高要求系统的瓶颈。在这些系统中,正在执行的查询数量非常大。DelayedBatchExecutor是一个用于减少所需查询数量的组件,通过在Java多线程应用程序中对所需查询进行批处理。
假设有一个对关系数据库执行查询的Java应用程序,以便在给定其唯一标识符(id)的情况下检索Product实体(row)。
查询如下所示:
SELECT * FROM PRODUCT WHERE ID =
现在,检索n个Products,有如下两种方法:
SELECT * FROM PRODUCT WHERE ID = SELECT * FROM PRODUCT WHERE ID = ... SELECT * FROM PRODUCT WHERE ID =
-- Example using IN OPERATOR SELECT * FROM PRODUCT WHERE ID IN (, , ..., )
后者在网络流量和数据库服务器资源(CPU和磁盘)方面更为有效,因为:
这不仅适用于SELECT操作,而且适用于其他操作,例如 INSERTs,UPDATEs和DELETEs。实际上,JDBC API包括上述操作的批量处理操作。
同样的情况也适用于NoSQL存储库,其中大多都明确提供BULK操作。
需要从数据库中检索数据的Java应用程序,如REST微服务或异步消息处理器,通常以多线程应用程序(*1)实现,其中:
在这种场景下,数据库很可能在较短的时间间隔内多次执行相同的查询。
如前所述,如果将1个参数的n个查询替换为具有n个参数的单个等效查询,那么则应用程序将使用较少的数据库服务器和网络资源。
好消息是它可以通过timewindows(时间窗口)的机制来实现,如下所示:
第一个尝试执行查询的线程会打开一个时间窗口,因此其参数被存储在一个列表中,同时该线程被暂停。在时间窗口内执行相同查询的其余线程会将其参数添加到列表中,并且也会被暂停。此时,数据库上未执行任何查询。
时间窗口结束或列表已满(先前已定义最大容量限制)后,将使用列表中存储的所有参数执行单个查询。最后,一旦数据库提供了该查询的结果,每个线程将接收相应的结果,同时所有线程将自动恢复。
笔者构建了一个简单而轻量级的应用机制(DelayedBatchExecutor),很容易在新的或现有的应用程序中使用。它基于Reactor库,并且为参数列表使用超时的Flux缓冲发布器。
假设针对Products的REST微服务公开了一个端点,用于检索数据库中给定的 productId的Product数据。在没有DelayedBatchExecutor的情况下,如果每秒对端点命中200次,则数据库每秒执行200个查询。如果端点使用的DelayedBatchExecutor 配置了50毫秒的时间窗口且最大容量=10个参数,数据库每秒钟将只执行10个参数的20个查询,代价是每执行一个线程,最多在50毫秒内增加延时(*2)。
换句话说,为了将延时增加50毫秒(* 2),数据库每秒接收的查询减少了10倍,然而保持了系统的整体吞吐量。还不错!!
其他有趣的配置:
深入研究Product微服务示例。假设对于每个传入的HTTP请求,微服务的控制器都要求检索已有id的Product(Java Bean),因此将调用以下方法:
DAO组件(ProductDAO)的public Product getProductById(IntegerproductId) .
以下分别是有和没有 DelayedBatchExecutor的DAO执行。
没有 DelayedBatchExecutor
public classProductDAO { public Product getProductById(Integer id) { Product product= ...// execute the query SELECT * FROM PRODUCT WHERE ID= // using your favourite API: JDBC, JPA, Hibernate... return product; } ... }
有DelayedBatchExecutor
// Singleton publicclass ProductDAO { DelayedBatchExecutor2 delayedBatchExecutorProductById = DelayedBatchExecutor.define(Duration.ofMillis(50), 10, this::retrieveProductsByIds); public Product getProductById(Integer id) { Product product = delayedBatchExecutorProductById.execute(id); return product; } private List retrieveProductsByIds(List idList) { List productList = ...// execute query:SELECT * FROM PRODUCT WHERE ID IN (idList.get(0), ..., idList.get(n)); // using your favourite API: JDBC, JPA, Hibernate... // The positions of the elements of the list to return must match the ones in the parameters list. // For instance, the first Product of the list to be returned must be the one with // the Id in the first position of productIdsList and so on... // NOTE: null could be used as value, meaning that no Product exist for the given productId return productList; } ... }
首先,必须在DAO中创建一个DelayedBatchExecutor实例,在本例中为 delayedBatchExecutorProductById。需要以下三个参数:
其次,已经重构了DAO方法 publicProduct getProductById(Integer productId),以简单调用delayedBatchExecutorProductById 实例的execute 方法。所有的“magic”都是由 DelayedBatchExecutor完成的。
之所以delayedBatchExecutorProductById是DelayedBatchExecutor2
如果execute方法需要接收两个参数(例如,一个 Integer和一个String)并返回Product实例,则定义为 DelayedBatchExecutor3
最终,retrieveProductsByIds方法必须返回List
如果使用的是 DelayedBatchExecutor3
就是这样。
一旦运行,执行控制器逻辑的并发线程会在某时刻调用方法 getProductById(Integerid) ,并且此方法将返回对应的Product。并发线程不知自己已经被 DelayedBatchExecutor暂停并恢复了。
尽管本文与数据存储库有关,但 DelayedBatchExecutor也可以用在其他地方,例如:对REST进行微服务请求。再说,用1个参数启动n个GET请求要比使用n个参数启动1个GET昂贵得多。
DelayedBatchExecutor的优化
笔者创建了 DelayedBatchExecutor并使用了一段时间,有效地解决了个人项目中并发线程启动的多个查询的执行问题。因此相信它对其他人可能也有用处,所以决定将其公开。
话虽如此,DelayedBatchExecutor改进和功能扩展的空间还很大。最有趣的是能够根据执行的特定条件动态更改DelayedBatchExecutor参数(窗口时间和最大容量)的功能,以便在利用带有n个参数的查询时很大程度地减少延时。