转载

HikariPool源码(二)设计思想借鉴

HikariPool源码(二)设计思想借鉴
Java极客  |  作者  /  铿然一叶
这是 Java极客 的第 52 篇原创文章

1.利用Java并发工具而非synchronized来保证线程安全

synchronized是重量级的锁,在HikariPool中没有一处使用,都是通过Java并发工具类来解决线程安全问题。我们来看一些例子:

1.1、通过volatile关键字保证可见性

volatile关键字定义的变量并不能保证线程安全,但他能保证一个线程的修改对另外一个线程立即可见。例如在PoolEntry和ConcurrentBag中都使用了volatile关键字。

1.2、使用JUC包下的Atomic类

例如:

1.ConcurrentBag中用AtomicInteger来记录等待获取连接的线程数量。

2.HikariDataSource中用AtomicBoolean记录数据源是否已经关闭。

3.在PoolEntry中用AtomicIntegerFieldUpdater来更新PoolEntry的状态。

stateUpdater = AtomicIntegerFieldUpdater.newUpdater(PoolEntry.class, "state");
复制代码

这样使得PoolEntry类的state属性的更新可以保证原子性。

4.在ConcurrentBag中使用CopyOnWriteArrayList来记录数据库连接

CopyOnWriteArrayList适用于读多写少的场景,读取时不加锁,写时才加锁,但这样怎么保证线程安全?

通常我们设计一个资源池,会将未使用资源放入一个可用资源池中,如果池中还有资源就从池中取出,否则就等待或者超时报错,直到有新的资源回收到资源池中。

HikariPool源码(二)设计思想借鉴

获取资源和释放资源的代码如下:

Resource resource = resourcePool.remove(); // 从池中获取资源,池中资源数量减少
reourcePool.add(resource);  // 将资源释放会池中,池中资源数量增加
复制代码

为了保证线程安全,这两个方法均要用synchronized关键字修饰。

而在HikariPool中对于可用资源不是直接通过资源池的资源数量来决定,而是通过资源的状态来决定,资源定义了如下几个状态:

// 池化资源的状态定义
      int STATE_NOT_IN_USE = 0;
      int STATE_IN_USE = 1;
      int STATE_REMOVED = -1;
      int STATE_RESERVED = -2;
复制代码

在获取资源时通过遍历资源池并判断资源状态得到可用资源:

//ConcurrentBag.java
      try {
         // 遍历所有资源
         for (T bagEntry : sharedList) {  // 这里非线程安全
            // 获得未使用资源并更新状态为可用
            if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { // 这里是线程安全的
               // If we may have stolen another waiter's connection, request another bag add.
               if (waiting > 1) {
                  listener.addBagItem(waiting - 1);
               }
               return bagEntry;
            }
         }
复制代码

因此,虽然CopyOnWriteArrayList的读操作非线程安全,但可通过AtomicIntegerFieldUpdater来保证对池中的资源PoolEntry在状态更新时的线程安全,因此整个操作是线程安全的。

这样就避免了对池资源的出池和入池加锁,性能得到提升。

2、对性能的追求

我们通过如何获取连接来看下HikariPool对性能的追求。

在上一节我们已经提及了如何获取资源,但实际的获取过程还不仅如此,HikariPool获取资源的过程如下:

2.1、先从ThrodLocal变量中获取

//ConcurrentBag.java
      // Try the thread-local list first
      final List<Object> list = threadList.get();
      for (int i = list.size() - 1; i >= 0; i--) {
         final Object entry = list.remove(i);
         @SuppressWarnings("unchecked")
         final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
         if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
         }
      }
复制代码

我们知道ThrodLocal变量的特点是该变量在同一个线程中可见,这样可不需要通过方法参数传递变量,并且是线程安全得,而在一次业务操作中有可能多次获取数据库连接(注意:多个连接意味着事务问题需要解决),这时HikariPool会将释放的连接放入ThrodLocal变量中,当前线程如果要再次使用连接就可以直接从ThrodLocal变量中获取。

//ConcurrentBag.java
      final List<Object> threadLocalList = threadList.get();
      if (threadLocalList.size() < 50) {
         threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
      }
复制代码

2.2、从资源池中获取

这一步前面已介绍,从资源池中遍历资源,通过判断资源状态是否可用来获取资源。

2.3、资源不足时获取资源的方式

一般的,当资源不足时,如果没有超过最大资源数限制,就会新建一个新资源并返回,而HikariPool不是,它的获取过程如下:

HikariPool源码(二)设计思想借鉴

1.获取资源的线程获取资源

2.发现资源不足,则会异步调用创建资源的线程去创建资源

3.然后就开始等待资源返回

//ConcurrentBag.java

         // 异步调用创建资源线程创建资源,其中waiting是等待获取资源的线程数
         listener.addBagItem(waiting);

         timeout = timeUnit.toNanos(timeout);
         do {
            final long start = currentTime();
            // 等待获取资源
            final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
            if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               return bagEntry;
            }

            timeout -= elapsedNanos(start);
         } while (timeout > 10_000);
复制代码

4.创建资源线程异步创建资源,创建资源时会判断是否有需要获取资源的线程在等待资源,如果有才创建,否则就不创建

//HikariPool.java
      // connectionBag.getWaitingThreadCount() > 0 判断有等待资源的线程才会继续创建资源
      private synchronized boolean shouldCreateAnotherConnection() {
         return getTotalConnections() < config.getMaximumPoolSize() &&
            (connectionBag.getWaitingThreadCount() > 0 || getIdleConnections() < config.getMinimumIdle());
      }
复制代码

5.其他使用资源的线程使用完资源后,会释放资源,这时资源池中有了可用资源,会分给等待线程使用

//ConcurrentBag.java
   // 使用资源的线程释放资源
   public void requite(final T bagEntry)
   {
      bagEntry.setState(STATE_NOT_IN_USE);

      for (int i = 0; waiters.get() > 0; i++) {
         if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
            return;
         }
         else if ((i & 0xff) == 0xff) { // 0xff 是255, 每隔256进去一次
            parkNanos(MICROSECONDS.toNanos(10));
         }
         else {
            yield();
         }
      }

      final List<Object> threadLocalList = threadList.get();
      if (threadLocalList.size() < 50) {
         threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
      }
   }
复制代码

HikariPool这么做的好处是:

  1. 图中a和b操作谁先执行完就用谁的资源,大并发情况下,也可能b比a快,这样性能有提升。
  2. 如果b先执行完,等待线程获取到资源后,如果没有新的等待线程,a就不会创建新资源,这样就节省了一个资源,少了占用连接,也节省了内存。

以上这个巧妙的处理方式借助了SynchronousQueue来实现,我们可以模拟下以上处理方式:

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueTest {
    // 入参为true,公平锁,保证FIFO
    private SynchronousQueue<PoolEntry> queue = new SynchronousQueue(true);

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueueTest queueTest = new SynchronousQueueTest();
        queueTest.execute();
    }

    public void execute() {
        // 模拟生产者创建资源
        new Producer("Producer-generate-poolentry", queue, 2000).start();
        // 模拟其他消费者释放资源
        new Producer("OtherConsumer-release-poolentry", queue, 5000).start();
        // 等待上面两个线程启动
        sleep();
        // 模拟消费者
        new Consumer(queue).start();
    }

    private void sleep() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException E) {
            Thread.currentThread().interrupt();
        }
    }
}

// 用来模拟生产者和释放资源的消费者
class Producer extends Thread {
    private SynchronousQueue<PoolEntry> queue;

    // 模拟执行耗时
    private long executeCostTimeMillis;

    public Producer(String name, SynchronousQueue queue, long executeCostTimeMillis) {
        this.queue = queue;
        setName(name);
        this.executeCostTimeMillis = executeCostTimeMillis;
    }


    @Override
    public void run() {
        try {
            while(true) {
                int random = (int) (Math.random()*10);
                PoolEntry poolEntry = new PoolEntry(random);
                System.out.println(Thread.currentThread().getName() + ", " + poolEntry.toString());
                // 资源入队
                while(!queue.offer(poolEntry)) {
                    yield();
                }
                Thread.sleep(executeCostTimeMillis);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class Consumer extends Thread {
    private SynchronousQueue<PoolEntry> queue;

    public Consumer(SynchronousQueue queue) {
        this.queue = queue;
        setName("Consumer");
    }

    @Override
    public void run() {
        try {
            while(true) {
                long timeout = 200;
                // 资源出队
                PoolEntry poolEntry = queue.poll(timeout, TimeUnit.MILLISECONDS);
                if (poolEntry != null) {
                    System.out.println(Thread.currentThread().getName() + ", " + poolEntry.toString());
                } else {
//                    System.out.println("queue is null.");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// 资源类
class PoolEntry {
    int num;
    public PoolEntry(int i) {
        this.num = i;
    }

    @Override
    public String toString() {
        return "PoolEntry instance " + num;
    }
}
复制代码

输出:

Producer-generate-poolentry, PoolEntry instance 4
OtherConsumer-release-poolentry, PoolEntry instance 5
Consumer, PoolEntry instance 5
Consumer, PoolEntry instance 4
Producer-generate-poolentry, PoolEntry instance 8
Consumer, PoolEntry instance 8
Producer-generate-poolentry, PoolEntry instance 7
Consumer, PoolEntry instance 7
OtherConsumer-release-poolentry, PoolEntry instance 6
Consumer, PoolEntry instance 6
Producer-generate-poolentry, PoolEntry instance 6
Consumer, PoolEntry instance 6
Producer-generate-poolentry, PoolEntry instance 2
Consumer, PoolEntry instance 2
OtherConsumer-release-poolentry, PoolEntry instance 1
Consumer, PoolEntry instance 1
Producer-generate-poolentry, PoolEntry instance 2
Consumer, PoolEntry instance 2
Producer-generate-poolentry, PoolEntry instance 3
Consumer, PoolEntry instance 3
Producer-generate-poolentry, PoolEntry instance 7
Consumer, PoolEntry instance 7
OtherConsumer-release-poolentry, PoolEntry instance 9
Consumer, PoolEntry instance 9
Producer-generate-poolentry, PoolEntry instance 7
Consumer, PoolEntry instance 7
复制代码

可以看出:

1.生产者和其他消费者谁先把资源入队,消费者就先使用哪个资源

2.没有可用资源,消费者会一致等待

在使用池化资源大并发场景下,又追求极致性能时,这种处理方式值得借鉴。

3、使用弱引用节省内存

弱引用在调用垃圾回收后会被释放,对于通过ThreadLocal变量缓存的资源,为了避免线程生命周期结束后资源不被及时回收,使用了弱引用来存储资源,这样当内存不足,调用GC操作时就会被回收,减少内存占用。

//ConcurrentBag.java
      final List<Object> threadLocalList = threadList.get();
      if (threadLocalList.size() < 50) {
         threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
      }
复制代码

4、使用空方法使得代码处理逻辑统一

实现类使用空方法使得处理逻辑统一,不需要添加if判断来处理。类似编码规范中对于返回一个集合的方法,建议不要返回NULL,而返回一个大小为0的集合,这样外部处理逻辑统一,不需要额外增加为NULL的判断,或者引起空指针异常。

4.1、ProxyLeakTask

4.1.1、空方法实现类

//ProxyLeakTask.java
   static
   {
      // 不需要监控连接泄露的ProxyLeakTask的实现类
      NO_LEAK = new ProxyLeakTask() {
         @Override
         void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold) {}

         @Override
         public void run() {}  // 默认啥都不做

         @Override
         public void cancel() {} // 默认啥都不做
      };
   }
复制代码

4.1.2、实例化

//ProxyLeakTaskFactory.java
   ProxyLeakTask schedule(final PoolEntry poolEntry)
   {
      // 根据配置来创建不同的代理泄露监控类
      return (leakDetectionThreshold == 0) ? ProxyLeakTask.NO_LEAK : scheduleNewTask(poolEntry);
   }
复制代码

4.1.3、调用点

//ProxyLeakTaskFactory.java
   private ProxyLeakTask scheduleNewTask(PoolEntry poolEntry) {
      ProxyLeakTask task = new ProxyLeakTask(poolEntry);
      // 这里就不用加是否为NULL的判断
      task.schedule(executorService, leakDetectionThreshold);

      return task;
   }
复制代码

5、总结

  1. 充分利用JUC工具解决并发问题和提升性能。
  2. 池化资源可以通过资源状态来获取可用资源,而不需要通过idle池的出队,入队来获取,减少锁的使用,提高性能。
  3. 在对使用内存有严格要求时,例如低端机不能占用过多内存时,使用好弱引用,软引用。
  4. 极致性能要考虑很多细节,如文中获取资源的例子,一般情况下不会想这么细。
  5. 使用空方法实现来统一外部处理逻辑。

end.

原文  https://juejin.im/post/5e881033f265da47c712022c
正文到此结束
Loading...