转载

java并发编程 - 利用对象等待和通知机制实现一个等待超时的连接池

众所周知,Java的Object对象提供的,wait()和notify()/notifyAll()等接口是并发编程的重要组成部分。它们对多线程之间的协作起了非常重要的作用,实际开发中也有很多场景可以采用。废话少说,今天我们就用此机制来模拟实现一个jdbc支持等待超时模式的连接池。

一、模拟实现一个数据库连接接口

//类说明:空实现一个Connection接口(因为重点不在这里,所以以下接口中的方法只做简单处理)
public class SqlConnectImpl implements Connection{
   
   /*拿一个数据库连接*/
    public static final Connection fetchConnection(){
        return new SqlConnectImpl();
    }

   @Override
   public boolean isWrapperFor(Class<?> arg0) throws SQLException {
      // TODO Auto-generated method stub
      return false;
   }
   //因为重点不在这里,所以这里省略其它接口...
 }

二、实现数据库等待超时连接池的核心方法

//类说明:连接池的实现
DBPool {
    //模拟:数据库连接池
    LinkedList<Connection> pool = LinkedList<Connection>()(initialSize) {
        if(initialSize > ) {
            for(int i = 0;i < initialSize; i++) {
                pool.addLast(SqlConnectImpl.fetchConnection());
            }
        }
    }

    //连接池:释放连接,通知其他线程
    public void releaseConnection(Connection connection) {
        if (connection != null) {
            synchronized (pool){
                pool.addLast(connection);
                pool.notifyAll();
            }
        }
    }

    //连接池:获取连接使用
    public Connection fetchConnection(long mills) throws InterruptedException {
        synchronized (pool){
            //未设置超时,直接获取
            if(mills <0){
                while (pool.isEmpty()){
                    pool.wait();
                }
                return pool.removeFirst();
            }
            //设置超时
            long future = System.currentTimeMillis()+mills;/*超时时刻*/
            long remaining = mills;
            while (pool.isEmpty() && remaining > 0){
                pool.wait(remaining);
                //唤醒一次:重新计算等待时长
                remaining = future - System.currentTimeMillis();
            }
            Connection connection = null;
            if(!pool.isEmpty()){
                connection = pool.removeFirst();
            }
            return connection;
        }
    }
}

三、多线程并发模式下对连接池的访问

//类说明:数据库连接池测试类
public class DBPoolTest {
    static DBPool pool  = new DBPool(10);
    // 控制器:控制main线程将会等待所有Woker结束后才能继续执行
    static CountDownLatch end;

    public static void main(String[] args) throws Exception {
       // 线程数量
        int threadCount = 50;
        end = new CountDownLatch(threadCount);
        int count = 20;//每个线程的操作次数
        AtomicInteger got = new AtomicInteger();//计数器:统计可以拿到连接的线程
        AtomicInteger notGot = new AtomicInteger();//计数器:统计没有拿到连接的线程
        for (int i = 0; i < threadCount; i++) {
            Thread thread = new Thread(new Worker(count, got, notGot), 
                  "worker_"+i);
            thread.start();
        }
        end.await();// main线程在此处等待
        System.out.println("总共尝试了: " + (threadCount * count));
        System.out.println("拿到连接的次数:  " + got);
        System.out.println("没能连接的次数: " + notGot);
    }

    static class Worker implements Runnable {
        int           count;
        AtomicInteger got;
        AtomicInteger notGot;

        public Worker(int count, AtomicInteger got,
                               AtomicInteger notGot) {
            this.count = count;
            this.got = got;
            this.notGot = notGot;
        }

        public void run() {
            while (count > 0) {
                try {
                    // 从线程池中获取连接,如果1000ms内无法获取到,将会返回null
                    // 分别统计连接获取的数量got和未获取到的数量notGot
                    Connection connection = pool.fetchConnection(1000);
                    if (connection != null) {
                        try {
                            connection.createStatement();
                            connection.commit();
                        } finally {
                            pool.releaseConnection(connection);
                            got.incrementAndGet();
                        }
                    } else {
                        notGot.incrementAndGet();
                        System.out.println(Thread.currentThread().getName()
                              +"等待超时!");
                    }
                } catch (Exception ex) {
                } finally {
                    count--;
                }
            }
            end.countDown();
        }
    }
}

四、测试结果报告

java并发编程 - 利用对象等待和通知机制实现一个等待超时的连接池

五、结束语

总结:1) 对wait()、notify()、notifyAll()等方法使用时须用,synchronized关键包裹(对象、方法或块)均可, 不包裹运行中必报错;

2) 线程在执行wait()方法在会自动释放持有的锁;

3) 线程在执行notify()或notifyAll()后,不会立即释放该线程持有的锁资源,只有在synchronized包裹的语句块或方法执行完毕后才会释放;

4) 采用notify()方法唤醒时只会随机唤醒一个线程,在多唤醒条件下不适用此方法。推荐使用notifyAll()唤醒所有与锁对象相关的所有线程;

原文  https://blog.51cto.com/14815984/2495815
正文到此结束
Loading...