转载

Jedis客户端的调用流程-JedisPool技术-Apache-common-pool的连接池

=

=

=

Jedis客户端的调用流程  ( https://www.jianshu.com/p/7913f9984765  )

Redis的各种语言客户端列表,请参见 Redis Client 。其中Java客户端在github上start最高的是Jedis和Redisson。Jedis提供了完整Redis命令,而Redisson有更多分布式的容器实现。

使用Jedis客户端

首先通过maven引入Jedis的依赖:

<dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.8.0</version>
    </dependency>

创建Jedis对象,调用set方法,并通过get方法获取到值并打印:

Jedis jedis = new Jedis("localhost", 6379);
    jedis.set("singleJedis", "hello jedis!");
    System.out.println(jedis.get("singleJedis"));
    jedis.close();

注意Jedis对象并不是线程安全的,在多线程下使用同一个Jedis对象会出现并发问题。为了避免每次使用Jedis对象时都需要重新构建,Jedis提供了 JedisPoolJedisPool 是基于Commons Pool 2实现的一个线程安全的连接池。

JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
    jedisPoolConfig.setMaxTotal(10);
    JedisPool pool = new JedisPool(jedisPoolConfig, "localhost", 6379);

    Jedis jedis = null;
    try{
        jedis = pool.getResource();
        jedis.set("pooledJedis", "hello jedis pool!");
        System.out.println(jedis.get("pooledJedis"));
    }catch(Exception e){
        e.printStackTrace();
    }finally {
        //还回pool中
        if(jedis != null){
            jedis.close();
        }
    }

    pool.close();

使用完后记得主动调一下close方法,将Jedis对象归还到连接池中。Jedis的close方法:

public void close() {
    //如果使用了连接池,检查客户端状态,归还到池中
    if (dataSource != null) {
      if (client.isBroken()) {
        this.dataSource.returnBrokenResource(this);
      } else {
        this.dataSource.returnResource(this);
      }
    } else {
      //未使用连接池,直接关闭
      client.close();
    }
  }

Jedis代码分析

从类结构、启动、执行命令三个方面分析一下Jedis客户端。

Jedis类结构

Jedis类是整个客户端的入口,通过Jedis可以建立与Redis Server的连接并发送命令。Jedis继承自BinaryJedis,同时Jedis和BinaryJedis都实现了很多接口,通过一张类图来描述:

Jedis客户端的调用流程-JedisPool技术-Apache-common-pool的连接池

每一个接口都代表了一类Redis命令,例如JedisCommands中包含了 SET GET 等命令,MultiKeyCommands中包含了针对多个Key的 MSET MGET 等命令。一部分命令有两个版本的接口,如JedisCommands和BinaryJedisCommands。JedisCommands是字符串参数版本命令,会在Jedis内部将参数转换成UTF-8编码的字节数组。BinaryJedisCommands提供的是字节参数版本,允许用户自己决定编码等细节。ClusterCommands和SentinelCommands与集群、高可用等相关的命令只有一个版本。

Jedis的初始化

JedisPool的构造函数:

public JedisPool(/** Commons Pool的参数 **/ final GenericObjectPoolConfig poolConfig,
                   /** Redis Server地址 **/ final URI uri,
                   /** 连接Redis Server超时时间**/ final int connectionTimeout,
                   /** 等待Response超时时间 **/ final int soTimeout) {
    super(poolConfig, new JedisFactory(uri, connectionTimeout, soTimeout, null));
  }

JedisFactory实现了PooledObjectFactory的makeObject方法,当调用 pool.getResource() 方法获取Jedis对象时,会通过makeObject方法创建了Jedis对象:

final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout);

在Jedis的构造函数中,会创建真正与Redis Server通信的Client对象:

client = new Client(host, port);
    client.setConnectionTimeout(timeout);
    client.setSoTimeout(timeout);

初始化Jedis对象并不会与Redis Server建立连接,连接发生在第一次执行命令时。

Jedis执行命令

SET 命令为例,在Jedis中执行 SET 命令:

jedis.set("singleJedis", "hello jedis!");

调用Jedis的set方法,会委托给内部的client对象的set方法,在client的字符串版本set方法中,会进行String到byte[]的转换:

set(SafeEncoder.encode(key), SafeEncoder.encode(value));

SafeEncoder.encode 将字符串转换为UTF-8编码的字节数组,然后调用sendCommand方法:

sendCommand(Command.SET, key, value);

protected Connection sendCommand(final Command cmd, final byte[]... args) {
    try {
      connect();
      Protocol.sendCommand(outputStream, cmd, args);
      ...
    } catch (JedisConnectionException ex) {
      ...
    }
}

省略了一部分异常处理的代码,在sendCommand方法中,建立了与Server的连接,并发送命令。connect方法会按照JedisPool构造函数中的参数,初始化Socket:

socket = new Socket();
    //TIME_WAIT状态下可以复用端口
    socket.setReuseAddress(true);
    //空闲时发送数据包,确认服务端状态
    socket.setKeepAlive(true);
    //关闭Nagle算法,尽快发送
    socket.setTcpNoDelay(true);
    //调用close方法立即关闭socket,丢弃所有未发送的数据包
    socket.setSoLinger(true, 0);
    //连接server
    socket.connect(new InetSocketAddress(host, port), connectionTimeout);
    //设置读取时超时时间
    socket.setSoTimeout(soTimeout);

sendCommand方法,按照Redis协议,发送命令:

private static void sendCommand(final RedisOutputStream os, final byte[] command,
      final byte[]... args) {
    try {
      os.write(ASTERISK_BYTE);
      os.writeIntCrLf(args.length + 1);
      os.write(DOLLAR_BYTE);
      os.writeIntCrLf(command.length);
      os.write(command);
      os.writeCrLf();

      for (final byte[] arg : args) {
        os.write(DOLLAR_BYTE);
        os.writeIntCrLf(arg.length);
        os.write(arg);
        os.writeCrLf();
      }
    } catch (IOException e) {
      throw new JedisConnectionException(e);
    }
  }

作者:yingzong

链接:https://www.jianshu.com/p/7913f9984765

來源:简书

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

JedisPool接受-Apache-common-pool的连接池技术

(https://blog.csdn.net/dlf123321/article/details/53907945)

这篇博客,就不怎么分析源码了,咱们尽量把观察的层次提高一下,主要分析流程。

下图是JedisPool里面用到的一些类(有些类我省略了,例如closeable)

Jedis客户端的调用流程-JedisPool技术-Apache-common-pool的连接池

PooledObjectFactory<T>,是一个泛型接口,里面有makeObjcet,destroyObject,validateObject等方法,就是生成一个对象,销毁一个对象,判定是否合法等的

PooledObject 是对基本对象的一个包装,包含基本对象的创建时间,状态等等

GenericObjectPool 里面包含了borrowObject与returnObject等方法

我们一般的从pool中取对象调用的是JedisPool的getResource方法

时序图如下:

Jedis客户端的调用流程-JedisPool技术-Apache-common-pool的连接池

在GenericObjectPool中有下面这个成员变量:

private final LinkedBlockingDeque<PooledObject<T>> idleObjects;

在上面时序图里第三步create前,会先在idleObjects里面找(idleObjects.pollFirst()),如果没有对象(这是一个队列),才去create,否则就直接返回idleobjects里面的。

释放连接,我们一般直接调用jedis的close方法。

如下图

Jedis客户端的调用流程-JedisPool技术-Apache-common-pool的连接池

最后那个LinkedBlockingDeque就是idleObjects。

当然,我上面的时序图,是我省略了很多很多的,但是主体就是上面的。

另外,pool的源码我自己看的也比较粗,如果文中有什么错误,还请各位大神一定指出。

有一个问题,如果我给pool设置的最少连接是20

那么这20个连接时连接池一开始就循环产生的,还是后面借一个,才生出一个?

换句话说就是勤生产还是懒生产。

答案是勤生产(准确的说是,自pool加载后timeBetweenEvictionRunsMillis毫秒开始生产这个连接对象,timeBetweenEvictionRunsMillis模式是30000毫秒)

timeBetweenEvictionRunsMillis毫秒秒检查一次连接池中空闲的连接,把空闲时间超过minEvictableIdleTimeMillis毫秒的连接断开,直到连接池中的连接数到minIdle为止

Jedis客户端的调用流程-JedisPool技术-Apache-common-pool的连接池

//GenericObjectPool.java  
   void ensureMinIdle() throws Exception {  
       ensureIdle(getMinIdle(), true);  
   }  
  
  
   private void ensureIdle(int idleCount, boolean always) throws Exception {  
       if (idleCount < 1 || isClosed() || (!always && !idleObjects.hasTakeWaiters())) {  
           return;  
       }  
  
  
       while (idleObjects.size() < idleCount) {  
           PooledObject<T> p = create();  
           if (p == null) {  
               // Can't create objects, no reason to think another call to  
               // create will work. Give up.  
               break;  
           }  
           if (getLifo()) {  
               idleObjects.addFirst(p);  
           } else {  
               idleObjects.addLast(p);  
           }  
       }  
       if (isClosed()) {  
           // Pool closed while object was being added to idle objects.  
           // Make sure the returned object is destroyed rather than left  
           // in the idle object pool (which would effectively be a leak)  
           clear();  
       }  
   }  

=

=

=

原文  http://fantaxy025025.iteye.com/blog/2423894
正文到此结束
Loading...