=
=
=
Jedis客户端的调用流程 ( https://www.jianshu.com/p/7913f9984765 )
Redis的各种语言客户端列表,请参见 Redis Client 。其中Java客户端在github上start最高的是Jedis和Redisson。Jedis提供了完整Redis命令,而Redisson有更多分布式的容器实现。
首先通过maven引入Jedis的依赖:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.0</version>
</dependency>
创建Jedis对象,调用set方法,并通过get方法获取到值并打印:
Jedis jedis = new Jedis("localhost", 6379);
jedis.set("singleJedis", "hello jedis!");
System.out.println(jedis.get("singleJedis"));
jedis.close();
注意Jedis对象并不是线程安全的,在多线程下使用同一个Jedis对象会出现并发问题。为了避免每次使用Jedis对象时都需要重新构建,Jedis提供了 JedisPool 。 JedisPool 是基于Commons Pool 2实现的一个线程安全的连接池。
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(10);
JedisPool pool = new JedisPool(jedisPoolConfig, "localhost", 6379);
Jedis jedis = null;
try{
jedis = pool.getResource();
jedis.set("pooledJedis", "hello jedis pool!");
System.out.println(jedis.get("pooledJedis"));
}catch(Exception e){
e.printStackTrace();
}finally {
//还回pool中
if(jedis != null){
jedis.close();
}
}
pool.close();
使用完后记得主动调一下close方法,将Jedis对象归还到连接池中。Jedis的close方法:
public void close() {
//如果使用了连接池,检查客户端状态,归还到池中
if (dataSource != null) {
if (client.isBroken()) {
this.dataSource.returnBrokenResource(this);
} else {
this.dataSource.returnResource(this);
}
} else {
//未使用连接池,直接关闭
client.close();
}
}
从类结构、启动、执行命令三个方面分析一下Jedis客户端。
Jedis类是整个客户端的入口,通过Jedis可以建立与Redis Server的连接并发送命令。Jedis继承自BinaryJedis,同时Jedis和BinaryJedis都实现了很多接口,通过一张类图来描述:
每一个接口都代表了一类Redis命令,例如JedisCommands中包含了 SET GET 等命令,MultiKeyCommands中包含了针对多个Key的 MSET MGET 等命令。一部分命令有两个版本的接口,如JedisCommands和BinaryJedisCommands。JedisCommands是字符串参数版本命令,会在Jedis内部将参数转换成UTF-8编码的字节数组。BinaryJedisCommands提供的是字节参数版本,允许用户自己决定编码等细节。ClusterCommands和SentinelCommands与集群、高可用等相关的命令只有一个版本。
JedisPool的构造函数:
public JedisPool(/** Commons Pool的参数 **/ final GenericObjectPoolConfig poolConfig,
/** Redis Server地址 **/ final URI uri,
/** 连接Redis Server超时时间**/ final int connectionTimeout,
/** 等待Response超时时间 **/ final int soTimeout) {
super(poolConfig, new JedisFactory(uri, connectionTimeout, soTimeout, null));
}
JedisFactory实现了PooledObjectFactory的makeObject方法,当调用 pool.getResource() 方法获取Jedis对象时,会通过makeObject方法创建了Jedis对象:
final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout);
在Jedis的构造函数中,会创建真正与Redis Server通信的Client对象:
client = new Client(host, port);
client.setConnectionTimeout(timeout);
client.setSoTimeout(timeout);
初始化Jedis对象并不会与Redis Server建立连接,连接发生在第一次执行命令时。
以 SET 命令为例,在Jedis中执行 SET 命令:
jedis.set("singleJedis", "hello jedis!");
调用Jedis的set方法,会委托给内部的client对象的set方法,在client的字符串版本set方法中,会进行String到byte[]的转换:
set(SafeEncoder.encode(key), SafeEncoder.encode(value));
SafeEncoder.encode 将字符串转换为UTF-8编码的字节数组,然后调用sendCommand方法:
sendCommand(Command.SET, key, value);
protected Connection sendCommand(final Command cmd, final byte[]... args) {
try {
connect();
Protocol.sendCommand(outputStream, cmd, args);
...
} catch (JedisConnectionException ex) {
...
}
}
省略了一部分异常处理的代码,在sendCommand方法中,建立了与Server的连接,并发送命令。connect方法会按照JedisPool构造函数中的参数,初始化Socket:
socket = new Socket();
//TIME_WAIT状态下可以复用端口
socket.setReuseAddress(true);
//空闲时发送数据包,确认服务端状态
socket.setKeepAlive(true);
//关闭Nagle算法,尽快发送
socket.setTcpNoDelay(true);
//调用close方法立即关闭socket,丢弃所有未发送的数据包
socket.setSoLinger(true, 0);
//连接server
socket.connect(new InetSocketAddress(host, port), connectionTimeout);
//设置读取时超时时间
socket.setSoTimeout(soTimeout);
sendCommand方法,按照Redis协议,发送命令:
private static void sendCommand(final RedisOutputStream os, final byte[] command,
final byte[]... args) {
try {
os.write(ASTERISK_BYTE);
os.writeIntCrLf(args.length + 1);
os.write(DOLLAR_BYTE);
os.writeIntCrLf(command.length);
os.write(command);
os.writeCrLf();
for (final byte[] arg : args) {
os.write(DOLLAR_BYTE);
os.writeIntCrLf(arg.length);
os.write(arg);
os.writeCrLf();
}
} catch (IOException e) {
throw new JedisConnectionException(e);
}
}
作者:yingzong
链接:https://www.jianshu.com/p/7913f9984765
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
JedisPool接受-Apache-common-pool的连接池技术
(https://blog.csdn.net/dlf123321/article/details/53907945)
这篇博客,就不怎么分析源码了,咱们尽量把观察的层次提高一下,主要分析流程。
下图是JedisPool里面用到的一些类(有些类我省略了,例如closeable)
PooledObjectFactory<T>,是一个泛型接口,里面有makeObjcet,destroyObject,validateObject等方法,就是生成一个对象,销毁一个对象,判定是否合法等的
PooledObject 是对基本对象的一个包装,包含基本对象的创建时间,状态等等
GenericObjectPool 里面包含了borrowObject与returnObject等方法
我们一般的从pool中取对象调用的是JedisPool的getResource方法
时序图如下:
在GenericObjectPool中有下面这个成员变量:
private final LinkedBlockingDeque<PooledObject<T>> idleObjects;
在上面时序图里第三步create前,会先在idleObjects里面找(idleObjects.pollFirst()),如果没有对象(这是一个队列),才去create,否则就直接返回idleobjects里面的。
释放连接,我们一般直接调用jedis的close方法。
如下图
最后那个LinkedBlockingDeque就是idleObjects。
当然,我上面的时序图,是我省略了很多很多的,但是主体就是上面的。
另外,pool的源码我自己看的也比较粗,如果文中有什么错误,还请各位大神一定指出。
有一个问题,如果我给pool设置的最少连接是20
那么这20个连接时连接池一开始就循环产生的,还是后面借一个,才生出一个?
换句话说就是勤生产还是懒生产。
答案是勤生产(准确的说是,自pool加载后timeBetweenEvictionRunsMillis毫秒开始生产这个连接对象,timeBetweenEvictionRunsMillis模式是30000毫秒)
timeBetweenEvictionRunsMillis毫秒秒检查一次连接池中空闲的连接,把空闲时间超过minEvictableIdleTimeMillis毫秒的连接断开,直到连接池中的连接数到minIdle为止
//GenericObjectPool.java
void ensureMinIdle() throws Exception {
ensureIdle(getMinIdle(), true);
}
private void ensureIdle(int idleCount, boolean always) throws Exception {
if (idleCount < 1 || isClosed() || (!always && !idleObjects.hasTakeWaiters())) {
return;
}
while (idleObjects.size() < idleCount) {
PooledObject<T> p = create();
if (p == null) {
// Can't create objects, no reason to think another call to
// create will work. Give up.
break;
}
if (getLifo()) {
idleObjects.addFirst(p);
} else {
idleObjects.addLast(p);
}
}
if (isClosed()) {
// Pool closed while object was being added to idle objects.
// Make sure the returned object is destroyed rather than left
// in the idle object pool (which would effectively be a leak)
clear();
}
}
=
=
=