synchronized是重量级的锁,在HikariPool中没有一处使用,都是通过Java并发工具类来解决线程安全问题。我们来看一些例子:
volatile关键字定义的变量并不能保证线程安全,但他能保证一个线程的修改对另外一个线程立即可见。例如在PoolEntry和ConcurrentBag中都使用了volatile关键字。
例如:
1.ConcurrentBag中用AtomicInteger来记录等待获取连接的线程数量。
2.HikariDataSource中用AtomicBoolean记录数据源是否已经关闭。
3.在PoolEntry中用AtomicIntegerFieldUpdater来更新PoolEntry的状态。
stateUpdater = AtomicIntegerFieldUpdater.newUpdater(PoolEntry.class, "state"); 复制代码
这样使得PoolEntry类的state属性的更新可以保证原子性。
4.在ConcurrentBag中使用CopyOnWriteArrayList来记录数据库连接
CopyOnWriteArrayList适用于读多写少的场景,读取时不加锁,写时才加锁,但这样怎么保证线程安全?
通常我们设计一个资源池,会将未使用资源放入一个可用资源池中,如果池中还有资源就从池中取出,否则就等待或者超时报错,直到有新的资源回收到资源池中。
获取资源和释放资源的代码如下:
Resource resource = resourcePool.remove(); // 从池中获取资源,池中资源数量减少 reourcePool.add(resource); // 将资源释放会池中,池中资源数量增加 复制代码
为了保证线程安全,这两个方法均要用synchronized关键字修饰。
而在HikariPool中对于可用资源不是直接通过资源池的资源数量来决定,而是通过资源的状态来决定,资源定义了如下几个状态:
// 池化资源的状态定义 int STATE_NOT_IN_USE = 0; int STATE_IN_USE = 1; int STATE_REMOVED = -1; int STATE_RESERVED = -2; 复制代码
在获取资源时通过遍历资源池并判断资源状态得到可用资源:
//ConcurrentBag.java try { // 遍历所有资源 for (T bagEntry : sharedList) { // 这里非线程安全 // 获得未使用资源并更新状态为可用 if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { // 这里是线程安全的 // If we may have stolen another waiter's connection, request another bag add. if (waiting > 1) { listener.addBagItem(waiting - 1); } return bagEntry; } } 复制代码
因此,虽然CopyOnWriteArrayList的读操作非线程安全,但可通过AtomicIntegerFieldUpdater来保证对池中的资源PoolEntry在状态更新时的线程安全,因此整个操作是线程安全的。
这样就避免了对池资源的出池和入池加锁,性能得到提升。
我们通过如何获取连接来看下HikariPool对性能的追求。
在上一节我们已经提及了如何获取资源,但实际的获取过程还不仅如此,HikariPool获取资源的过程如下:
//ConcurrentBag.java // Try the thread-local list first final List<Object> list = threadList.get(); for (int i = list.size() - 1; i >= 0; i--) { final Object entry = list.remove(i); @SuppressWarnings("unchecked") final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry; if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { return bagEntry; } } 复制代码
我们知道ThrodLocal变量的特点是该变量在同一个线程中可见,这样可不需要通过方法参数传递变量,并且是线程安全得,而在一次业务操作中有可能多次获取数据库连接(注意:多个连接意味着事务问题需要解决),这时HikariPool会将释放的连接放入ThrodLocal变量中,当前线程如果要再次使用连接就可以直接从ThrodLocal变量中获取。
//ConcurrentBag.java final List<Object> threadLocalList = threadList.get(); if (threadLocalList.size() < 50) { threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry); } 复制代码
这一步前面已介绍,从资源池中遍历资源,通过判断资源状态是否可用来获取资源。
一般的,当资源不足时,如果没有超过最大资源数限制,就会新建一个新资源并返回,而HikariPool不是,它的获取过程如下:
1.获取资源的线程获取资源
2.发现资源不足,则会异步调用创建资源的线程去创建资源
3.然后就开始等待资源返回
//ConcurrentBag.java // 异步调用创建资源线程创建资源,其中waiting是等待获取资源的线程数 listener.addBagItem(waiting); timeout = timeUnit.toNanos(timeout); do { final long start = currentTime(); // 等待获取资源 final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS); if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { return bagEntry; } timeout -= elapsedNanos(start); } while (timeout > 10_000); 复制代码
4.创建资源线程异步创建资源,创建资源时会判断是否有需要获取资源的线程在等待资源,如果有才创建,否则就不创建
//HikariPool.java // connectionBag.getWaitingThreadCount() > 0 判断有等待资源的线程才会继续创建资源 private synchronized boolean shouldCreateAnotherConnection() { return getTotalConnections() < config.getMaximumPoolSize() && (connectionBag.getWaitingThreadCount() > 0 || getIdleConnections() < config.getMinimumIdle()); } 复制代码
5.其他使用资源的线程使用完资源后,会释放资源,这时资源池中有了可用资源,会分给等待线程使用
//ConcurrentBag.java // 使用资源的线程释放资源 public void requite(final T bagEntry) { bagEntry.setState(STATE_NOT_IN_USE); for (int i = 0; waiters.get() > 0; i++) { if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) { return; } else if ((i & 0xff) == 0xff) { // 0xff 是255, 每隔256进去一次 parkNanos(MICROSECONDS.toNanos(10)); } else { yield(); } } final List<Object> threadLocalList = threadList.get(); if (threadLocalList.size() < 50) { threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry); } } 复制代码
HikariPool这么做的好处是:
以上这个巧妙的处理方式借助了SynchronousQueue来实现,我们可以模拟下以上处理方式:
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; public class SynchronousQueueTest { // 入参为true,公平锁,保证FIFO private SynchronousQueue<PoolEntry> queue = new SynchronousQueue(true); public static void main(String[] args) throws InterruptedException { SynchronousQueueTest queueTest = new SynchronousQueueTest(); queueTest.execute(); } public void execute() { // 模拟生产者创建资源 new Producer("Producer-generate-poolentry", queue, 2000).start(); // 模拟其他消费者释放资源 new Producer("OtherConsumer-release-poolentry", queue, 5000).start(); // 等待上面两个线程启动 sleep(); // 模拟消费者 new Consumer(queue).start(); } private void sleep() { try { Thread.sleep(2000); } catch (InterruptedException E) { Thread.currentThread().interrupt(); } } } // 用来模拟生产者和释放资源的消费者 class Producer extends Thread { private SynchronousQueue<PoolEntry> queue; // 模拟执行耗时 private long executeCostTimeMillis; public Producer(String name, SynchronousQueue queue, long executeCostTimeMillis) { this.queue = queue; setName(name); this.executeCostTimeMillis = executeCostTimeMillis; } @Override public void run() { try { while(true) { int random = (int) (Math.random()*10); PoolEntry poolEntry = new PoolEntry(random); System.out.println(Thread.currentThread().getName() + ", " + poolEntry.toString()); // 资源入队 while(!queue.offer(poolEntry)) { yield(); } Thread.sleep(executeCostTimeMillis); } } catch (Exception e) { e.printStackTrace(); } } } class Consumer extends Thread { private SynchronousQueue<PoolEntry> queue; public Consumer(SynchronousQueue queue) { this.queue = queue; setName("Consumer"); } @Override public void run() { try { while(true) { long timeout = 200; // 资源出队 PoolEntry poolEntry = queue.poll(timeout, TimeUnit.MILLISECONDS); if (poolEntry != null) { System.out.println(Thread.currentThread().getName() + ", " + poolEntry.toString()); } else { // System.out.println("queue is null."); } } } catch (Exception e) { e.printStackTrace(); } } } // 资源类 class PoolEntry { int num; public PoolEntry(int i) { this.num = i; } @Override public String toString() { return "PoolEntry instance " + num; } } 复制代码
输出:
Producer-generate-poolentry, PoolEntry instance 4 OtherConsumer-release-poolentry, PoolEntry instance 5 Consumer, PoolEntry instance 5 Consumer, PoolEntry instance 4 Producer-generate-poolentry, PoolEntry instance 8 Consumer, PoolEntry instance 8 Producer-generate-poolentry, PoolEntry instance 7 Consumer, PoolEntry instance 7 OtherConsumer-release-poolentry, PoolEntry instance 6 Consumer, PoolEntry instance 6 Producer-generate-poolentry, PoolEntry instance 6 Consumer, PoolEntry instance 6 Producer-generate-poolentry, PoolEntry instance 2 Consumer, PoolEntry instance 2 OtherConsumer-release-poolentry, PoolEntry instance 1 Consumer, PoolEntry instance 1 Producer-generate-poolentry, PoolEntry instance 2 Consumer, PoolEntry instance 2 Producer-generate-poolentry, PoolEntry instance 3 Consumer, PoolEntry instance 3 Producer-generate-poolentry, PoolEntry instance 7 Consumer, PoolEntry instance 7 OtherConsumer-release-poolentry, PoolEntry instance 9 Consumer, PoolEntry instance 9 Producer-generate-poolentry, PoolEntry instance 7 Consumer, PoolEntry instance 7 复制代码
可以看出:
1.生产者和其他消费者谁先把资源入队,消费者就先使用哪个资源
2.没有可用资源,消费者会一致等待
在使用池化资源大并发场景下,又追求极致性能时,这种处理方式值得借鉴。
弱引用在调用垃圾回收后会被释放,对于通过ThreadLocal变量缓存的资源,为了避免线程生命周期结束后资源不被及时回收,使用了弱引用来存储资源,这样当内存不足,调用GC操作时就会被回收,减少内存占用。
//ConcurrentBag.java final List<Object> threadLocalList = threadList.get(); if (threadLocalList.size() < 50) { threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry); } 复制代码
实现类使用空方法使得处理逻辑统一,不需要添加if判断来处理。类似编码规范中对于返回一个集合的方法,建议不要返回NULL,而返回一个大小为0的集合,这样外部处理逻辑统一,不需要额外增加为NULL的判断,或者引起空指针异常。
4.1.1、空方法实现类
//ProxyLeakTask.java static { // 不需要监控连接泄露的ProxyLeakTask的实现类 NO_LEAK = new ProxyLeakTask() { @Override void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold) {} @Override public void run() {} // 默认啥都不做 @Override public void cancel() {} // 默认啥都不做 }; } 复制代码
4.1.2、实例化
//ProxyLeakTaskFactory.java ProxyLeakTask schedule(final PoolEntry poolEntry) { // 根据配置来创建不同的代理泄露监控类 return (leakDetectionThreshold == 0) ? ProxyLeakTask.NO_LEAK : scheduleNewTask(poolEntry); } 复制代码
4.1.3、调用点
//ProxyLeakTaskFactory.java private ProxyLeakTask scheduleNewTask(PoolEntry poolEntry) { ProxyLeakTask task = new ProxyLeakTask(poolEntry); // 这里就不用加是否为NULL的判断 task.schedule(executorService, leakDetectionThreshold); return task; } 复制代码
end.