转载

手写一个 Jedis 以及 JedisPool

欢迎将公众号设置为星标,技术文章第一时间看到。我们将一如既往精选技术好文,提供有价值的阅读。如果文章对你有帮助,欢迎点个在看鼓励作者。

技术经验交流: 点击入群

Redis、Jedis的用途就不介绍了,不了解的可以先去官网:

https://www.redis.net.cn/tutorial/3501.html

本文章着重讲解如何手动实现一个类似jedis的工具。

1. 源码探索

经过源码的研究,可以发现Jedis的实现是基于Socket,可以从Jedis的set(key,value)方法开始追溯:

public void set(String key, String value) {
this.set(SafeEncoder.encode(key), SafeEncoder.encode(value));
}
public void set(byte[] key, byte[] value) {
this.sendCommand(Command.SET, new byte[][]{key, value});
}
public static void sendCommand(RedisOutputStream os, Protocol.Command command, byte[]... args) {
sendCommand(os, command.raw, args);
}

private static void sendCommand(RedisOutputStream os, byte[] command, byte[]... args) {
try {
os.write((byte)42);
os.writeIntCrLf(args.length + 1);
os.write((byte)36);
os.writeIntCrLf(command.length);
os.write(command);
os.writeCrLf();
byte[][] var3 = args;
int var4 = args.length;

for(int var5 = 0; var5 < var4; ++var5) {
byte[] arg = var3[var5];
os.write((byte)36);
os.writeIntCrLf(arg.length);
os.write(arg);
os.writeCrLf();
}

} catch (IOException var7) {
throw new JedisConnectionException(var7);
}
}
System.out.println((char)42);
System.out.println(((char)36));

控制台:

*

$

可以大体的看出他得实现过程。把sendCommand方法翻译一下就是:

os.write("*".getBytes());
os.write(//数组长度/r/n);
os.write("$".getBytes());
os.write(//命令长度/r/n)
os.write(命令/r/n);
然后进入循环写入:
$+参数长度/r/n
参数/r/n

整理一下:就是Redis的通信协议,官方文档:https://redis.io/topics/protocol 。

总结一下就是以下几点:

*3   // 数据一共有三个数组
//数组1
$6 //下行为6个长度的字符串
APPEND
//数组2
$5 // 下行为5个长度的字符串
fantj
//数组3
$3 // 下行为3个长度的字符串
666

手写Jedis

通过源码可以看到,它在Connection类中进行sendCommend,那我们也一样:

Connection.java

负责与Redis的Server端建立连接并获取反馈信息。

package com.fantj.jedis.connect;

import com.fantj.jedis.protocol.Protocol;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

/**
* 连接类
* 在这里进行创建连接并处理IO请求,用inputStream进行数据回显,
* 提供OutputStream给协议层,以便让其给服务端发送命令
*/

public class Connection {
private String host = "localhost";
private int port = 6379;
private Socket socket;
private OutputStream outputStream;
private InputStream inputStream;

public Connection() {
}

public Connection(String host) {
this.host = host;
}

public Connection(String host, int port) {
this.host = host;
this.port = port;
}
public Connection sendCommand(Protocol.Command cmd, byte[]... args) {
connect();
Protocol.sendCommand(outputStream, cmd, args);
return this;
}

private void connect() {

try {
if (socket == null) { //IO复用
socket = new Socket(host, port);
inputStream = socket.getInputStream();
outputStream = socket.getOutputStream();
}
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* 操作状态的返回
* 比如:SET 操作成功返回 +OK
*/

public String getStatus() {
byte[] bytes = new byte[1024];
try {
socket.getInputStream().read(bytes);
} catch (IOException e) {
e.printStackTrace();
}
return new String(bytes);
}
}

而真正的sendCommend实现是在Protocol类中实现(Jedis源码也是这样):

Protocol.java

负责提供RESP协议支持和拼接。

package com.fantj.jedis.protocol;

import java.io.IOException;
import java.io.OutputStream;

/**
* RESP协议
* 详见; https://redis.io/topics/protocol
*/

public class Protocol {
// jedis后来将这些常量优化为byte,在os进行写出的时候对其进行char转型
private static final String DOLLAR_BYTE = "$";
private static final String ASTERISK_BYTE = "*";
public static final byte PLUS_BYTE = 43;
public static final byte MINUS_BYTE = 45;
public static final byte COLON_BYTE = 58;
private static final String BLANK_BYTE = "/r/n";

/**
* 拼接RESP 并 发送write
*/

public static void sendCommand(OutputStream os,Protocol.Command cmd, byte[] ... args){
// 1. 生成协议 *3 $3 SET $3 key $5 value
StringBuffer stringBuffer = new StringBuffer();
// 1.1 数组长度 *3
stringBuffer.append(ASTERISK_BYTE).append(args.length+1).append(BLANK_BYTE);
// 1.2 命令长度 $3
stringBuffer.append(DOLLAR_BYTE).append(cmd.name().length()).append(BLANK_BYTE);
// 1.3 命令 SET / GET
stringBuffer.append(cmd).append(BLANK_BYTE);
for (byte[] arg: args){
// 1.4 key/value 长度
stringBuffer.append(DOLLAR_BYTE).append(arg.length).append(BLANK_BYTE);
// 1.5 key/value
stringBuffer.append(new String(arg)).append(BLANK_BYTE);
}
// 写出到服务端
try {
os.write(stringBuffer.toString().getBytes());
} catch (IOException e) {
e.printStackTrace();
}

}


/**
* 定义一个枚举类 存放命令
*/

public static enum Command{
SET , GET , KEYS, APPEND
}}

Client.java

对外提供API。

package com.fantj.jedis.client;

import com.fantj.jedis.connect.Connection;
import com.fantj.jedis.protocol.Protocol;

/**
* 客户端
* 给开发人员使用提供API
*/
public class Client {
private Connection connection;
public Client(String host, int port){
connection = new Connection(host,port);
}

public String set(String key, String value) {
connection.sendCommand(Protocol.Command.SET, key.getBytes(), value.getBytes());
return connection.getStatus();
}

public String get(String key) {
connection.sendCommand(Protocol.Command.GET, key.getBytes());
return connection.getStatus();
}


public void set(String key, String value, String nx, String ex, int i) {
connection.sendCommand(Protocol.Command.SET, key.getBytes(), value.getBytes(), nx.getBytes(), ex.getBytes(), String.valueOf(i).getBytes());
}
public void append(String key, String value){
connection.sendCommand(Protocol.Command.APPEND, key.getBytes(),value.getBytes());
}
}

Jedis.java

对Client类的进一层封装,留给开发人员使用。

package com.fantj.jedis.client;

public class Jedis extends Client{

public Jedis(String host, int port) {
super(host, port);
}

@Override
public String set(String key, String value) {
return super.set(key, value);
}

@Override
public String get(String key) {
return super.get(key);
}

@Override
public void set(String key, String value, String nx, String ex, int i) {
super.set(key, value, nx, ex, i);
}

@Override
public void append(String key, String value) {
super.append(key, value);
}
}

测试

/**
* 测试我们自己写的客户端
*/

public class Main {
private Jedis client = new Jedis("www.xxx.top",6380);
@Test
public void set()
{
client.set("fantj","fantj");
String result = client.get("fantj");
System.out.println(result);
}
@Test
public void setNx()
{
client.set("fantj","fantj","NX","EX",10000);
String result = client.get("fantj");
System.out.println(result);
}
@Test
public void append()
{
// client.append("fantj","-2019");
String fantj = client.get("fantj");
System.out.println(fantj);
}
@Test
public void testChar()
{
System.out.println((char)42);
System.out.println(((char)36));
}
}

测试都可以通过。

总结一下原理

假设我执行append("fantj","666")这个命令,那客户端的操作过程:

1、RESP协议对命令的分析

*3   // 数据一共有三个数组
//数组1
$6 //下行为6个长度的字符串
APPEND
//数组2
$5 // 下行为5个长度的字符串
fantj
//数组3
$3 // 下行为3个长度的字符串
666

2、然后我们进行拼接

StringBuffer sb = new StringBuffer();
// 注意每个类型表示完后都进行换行
sb.append("*").append("3").append("/r/n");
sb.append("$").append("6").append("/r/n");
sb.append("APPEND").append("/r/n");
sb.append("$").append("5").append("/r/n");
sb.append("fantj").append("/r/n");
sb.append("$").append("3").append("/r/n");
sb.append("666").append("/r/n");

3、然后用os进行写出

os.write(sb.toString().getBytes());

连接池的实现

Pool<T> .java

/**
* 连接池契约
*/

public interface Pool<T> {
/**
* 初始化连接池
* @param maxTotal 最大连接数
* @param maxWaitMillis 最大等待时间
*/

public void init(int maxTotal, long maxWaitMillis);

/**
* 获取连接
* @return 返回jedis对象
*/

public Jedis getResource() throws Exception;

/**
* 释放连接
*/

public void release(T t);
}

JedisPool.java

实现方式与ExcutorThreadPool工作流程类似,注释写的挺全的就不做详细解读了。

package com.fantj.jedis.pool;

import com.fantj.jedis.client.Jedis;

import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class JedisPool implements Pool<Jedis>{

public JedisPool(String url, int port) {
this.url = url;
this.port = port;
init(maxTotal,maxWaitMillis);
}

public JedisPool(String url, int port, int maxTotal, long maxWaitMillis) {
this.url = url;
this.port = port;
this.maxTotal = maxTotal;
this.maxWaitMillis = maxWaitMillis;
}

private String url;
private int port;
private int maxTotal = 20;
private long maxWaitMillis = 1000;
// 空闲的连接queue
private LinkedBlockingQueue<Jedis> idleWorkQueue = null;
private Queue<Jedis> activeWorkQueue = null;
// 当前连接数量
private AtomicInteger count = new AtomicInteger(0);
@Override
public void init(int maxTotal, long maxWaitMillis) {
maxTotal = maxTotal;
maxWaitMillis = maxWaitMillis;
idleWorkQueue = new LinkedBlockingQueue<>(maxTotal);
activeWorkQueue = new LinkedBlockingQueue<>(maxTotal);
}

@Override
public Jedis getResource() throws Exception {
Jedis jedis = null;
// 1. 记录开始时间,检测超时
long startTime = System.currentTimeMillis();
while (true){
// 2. 从空闲队列中获取连接,如果拿到,一式两份存放到活动队列
jedis = idleWorkQueue.poll();
if (jedis != null){
activeWorkQueue.offer(jedis);
return jedis;
}
// 3. 如果失败,判断池是否满,没满则创建
if (count.get() < maxTotal){
if (count.incrementAndGet() <= maxTotal){
jedis = new Jedis(url,port);
activeWorkQueue.offer(jedis);
System.out.printf("创建了一个新的连接: %s /r/n", jedis.toString());
return jedis;
}else {
count.decrementAndGet();
}
}
// 4. 如果连接池满了,则在超时时间内进行等待
try {
jedis = idleWorkQueue.poll(maxWaitMillis-(System.currentTimeMillis()-startTime), TimeUnit.MILLISECONDS);
if (jedis != null){
activeWorkQueue.offer(jedis);
return jedis;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
// 5. poll可能被中断,所以在这里再进行超时判断
if (maxWaitMillis < (System.currentTimeMillis()-startTime)){
throw new RuntimeException("JedisPool: jedis connect timeout");
}
}
}

@Override
public void release(Jedis jedis) {
if (activeWorkQueue.remove(jedis)){
idleWorkQueue.offer(jedis);
}
}
}

GitHub地址: https://github.com/fantj2016/easy-jedis

(点击文字可跳转)

1.   深究Spring中Bean的生命周期

2.  深入SpringBoot核心注解原理

3. 线上环境部署概览

4. Springboot Vue shiro 实现前后端分离、权限控制

手写一个 Jedis 以及 JedisPool

原文  https://mp.weixin.qq.com/s/4RjyyYVU95I7R7v9MKutEA
正文到此结束
Loading...