欢迎将公众号设置为星标,技术文章第一时间看到。我们将一如既往精选技术好文,提供有价值的阅读。如果文章对你有帮助,欢迎点个在看鼓励作者。
技术经验交流: 点击入群
Redis、Jedis的用途就不介绍了,不了解的可以先去官网:
https://www.redis.net.cn/tutorial/3501.html
本文章着重讲解如何手动实现一个类似jedis的工具。
经过源码的研究,可以发现Jedis的实现是基于Socket,可以从Jedis的set(key,value)方法开始追溯:
public void set(String key, String value) {
this.set(SafeEncoder.encode(key), SafeEncoder.encode(value));
}
public void set(byte[] key, byte[] value) {
this.sendCommand(Command.SET, new byte[][]{key, value});
}
public static void sendCommand(RedisOutputStream os, Protocol.Command command, byte[]... args) {
sendCommand(os, command.raw, args);
}
private static void sendCommand(RedisOutputStream os, byte[] command, byte[]... args) {
try {
os.write((byte)42);
os.writeIntCrLf(args.length + 1);
os.write((byte)36);
os.writeIntCrLf(command.length);
os.write(command);
os.writeCrLf();
byte[][] var3 = args;
int var4 = args.length;
for(int var5 = 0; var5 < var4; ++var5) {
byte[] arg = var3[var5];
os.write((byte)36);
os.writeIntCrLf(arg.length);
os.write(arg);
os.writeCrLf();
}
} catch (IOException var7) {
throw new JedisConnectionException(var7);
}
}
System.out.println((char)42);
System.out.println(((char)36));
控制台:
*
$
可以大体的看出他得实现过程。把sendCommand方法翻译一下就是:
os.write("*".getBytes());
os.write(//数组长度/r/n);
os.write("$".getBytes());
os.write(//命令长度/r/n)
os.write(命令/r/n);
然后进入循环写入:
$+参数长度/r/n
参数/r/n
整理一下:就是Redis的通信协议,官方文档:https://redis.io/topics/protocol 。
*3 // 数据一共有三个数组
//数组1
$6 //下行为6个长度的字符串
APPEND
//数组2
$5 // 下行为5个长度的字符串
fantj
//数组3
$3 // 下行为3个长度的字符串
666
通过源码可以看到,它在Connection类中进行sendCommend,那我们也一样:
负责与Redis的Server端建立连接并获取反馈信息。
package com.fantj.jedis.connect;
import com.fantj.jedis.protocol.Protocol;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
/**
* 连接类
* 在这里进行创建连接并处理IO请求,用inputStream进行数据回显,
* 提供OutputStream给协议层,以便让其给服务端发送命令
*/
public class Connection {
private String host = "localhost";
private int port = 6379;
private Socket socket;
private OutputStream outputStream;
private InputStream inputStream;
public Connection() {
}
public Connection(String host) {
this.host = host;
}
public Connection(String host, int port) {
this.host = host;
this.port = port;
}
public Connection sendCommand(Protocol.Command cmd, byte[]... args) {
connect();
Protocol.sendCommand(outputStream, cmd, args);
return this;
}
private void connect() {
try {
if (socket == null) { //IO复用
socket = new Socket(host, port);
inputStream = socket.getInputStream();
outputStream = socket.getOutputStream();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 操作状态的返回
* 比如:SET 操作成功返回 +OK
*/
public String getStatus() {
byte[] bytes = new byte[1024];
try {
socket.getInputStream().read(bytes);
} catch (IOException e) {
e.printStackTrace();
}
return new String(bytes);
}
}
而真正的sendCommend实现是在Protocol类中实现(Jedis源码也是这样):
负责提供RESP协议支持和拼接。
package com.fantj.jedis.protocol;
import java.io.IOException;
import java.io.OutputStream;
/**
* RESP协议
* 详见; https://redis.io/topics/protocol
*/
public class Protocol {
// jedis后来将这些常量优化为byte,在os进行写出的时候对其进行char转型
private static final String DOLLAR_BYTE = "$";
private static final String ASTERISK_BYTE = "*";
public static final byte PLUS_BYTE = 43;
public static final byte MINUS_BYTE = 45;
public static final byte COLON_BYTE = 58;
private static final String BLANK_BYTE = "/r/n";
/**
* 拼接RESP 并 发送write
*/
public static void sendCommand(OutputStream os,Protocol.Command cmd, byte[] ... args){
// 1. 生成协议 *3 $3 SET $3 key $5 value
StringBuffer stringBuffer = new StringBuffer();
// 1.1 数组长度 *3
stringBuffer.append(ASTERISK_BYTE).append(args.length+1).append(BLANK_BYTE);
// 1.2 命令长度 $3
stringBuffer.append(DOLLAR_BYTE).append(cmd.name().length()).append(BLANK_BYTE);
// 1.3 命令 SET / GET
stringBuffer.append(cmd).append(BLANK_BYTE);
for (byte[] arg: args){
// 1.4 key/value 长度
stringBuffer.append(DOLLAR_BYTE).append(arg.length).append(BLANK_BYTE);
// 1.5 key/value
stringBuffer.append(new String(arg)).append(BLANK_BYTE);
}
// 写出到服务端
try {
os.write(stringBuffer.toString().getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 定义一个枚举类 存放命令
*/
public static enum Command{
SET , GET , KEYS, APPEND
}}
对外提供API。
package com.fantj.jedis.client;
import com.fantj.jedis.connect.Connection;
import com.fantj.jedis.protocol.Protocol;
/**
* 客户端
* 给开发人员使用提供API
*/
public class Client {
private Connection connection;
public Client(String host, int port){
connection = new Connection(host,port);
}
public String set(String key, String value) {
connection.sendCommand(Protocol.Command.SET, key.getBytes(), value.getBytes());
return connection.getStatus();
}
public String get(String key) {
connection.sendCommand(Protocol.Command.GET, key.getBytes());
return connection.getStatus();
}
public void set(String key, String value, String nx, String ex, int i) {
connection.sendCommand(Protocol.Command.SET, key.getBytes(), value.getBytes(), nx.getBytes(), ex.getBytes(), String.valueOf(i).getBytes());
}
public void append(String key, String value){
connection.sendCommand(Protocol.Command.APPEND, key.getBytes(),value.getBytes());
}
}
对Client类的进一层封装,留给开发人员使用。
package com.fantj.jedis.client;
public class Jedis extends Client{
public Jedis(String host, int port) {
super(host, port);
}
@Override
public String set(String key, String value) {
return super.set(key, value);
}
@Override
public String get(String key) {
return super.get(key);
}
@Override
public void set(String key, String value, String nx, String ex, int i) {
super.set(key, value, nx, ex, i);
}
@Override
public void append(String key, String value) {
super.append(key, value);
}
}
/**
* 测试我们自己写的客户端
*/
public class Main {
private Jedis client = new Jedis("www.xxx.top",6380);
@Test
public void set(){
client.set("fantj","fantj");
String result = client.get("fantj");
System.out.println(result);
}
@Test
public void setNx(){
client.set("fantj","fantj","NX","EX",10000);
String result = client.get("fantj");
System.out.println(result);
}
@Test
public void append(){
// client.append("fantj","-2019");
String fantj = client.get("fantj");
System.out.println(fantj);
}
@Test
public void testChar(){
System.out.println((char)42);
System.out.println(((char)36));
}
}
测试都可以通过。
假设我执行append("fantj","666")这个命令,那客户端的操作过程:
*3 // 数据一共有三个数组
//数组1
$6 //下行为6个长度的字符串
APPEND
//数组2
$5 // 下行为5个长度的字符串
fantj
//数组3
$3 // 下行为3个长度的字符串
666
StringBuffer sb = new StringBuffer();
// 注意每个类型表示完后都进行换行
sb.append("*").append("3").append("/r/n");
sb.append("$").append("6").append("/r/n");
sb.append("APPEND").append("/r/n");
sb.append("$").append("5").append("/r/n");
sb.append("fantj").append("/r/n");
sb.append("$").append("3").append("/r/n");
sb.append("666").append("/r/n");
os.write(sb.toString().getBytes());
/**
* 连接池契约
*/
public interface Pool<T> {
/**
* 初始化连接池
* @param maxTotal 最大连接数
* @param maxWaitMillis 最大等待时间
*/
public void init(int maxTotal, long maxWaitMillis);
/**
* 获取连接
* @return 返回jedis对象
*/
public Jedis getResource() throws Exception;
/**
* 释放连接
*/
public void release(T t);
}
实现方式与ExcutorThreadPool工作流程类似,注释写的挺全的就不做详细解读了。
package com.fantj.jedis.pool;
import com.fantj.jedis.client.Jedis;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class JedisPool implements Pool<Jedis>{
public JedisPool(String url, int port) {
this.url = url;
this.port = port;
init(maxTotal,maxWaitMillis);
}
public JedisPool(String url, int port, int maxTotal, long maxWaitMillis) {
this.url = url;
this.port = port;
this.maxTotal = maxTotal;
this.maxWaitMillis = maxWaitMillis;
}
private String url;
private int port;
private int maxTotal = 20;
private long maxWaitMillis = 1000;
// 空闲的连接queue
private LinkedBlockingQueue<Jedis> idleWorkQueue = null;
private Queue<Jedis> activeWorkQueue = null;
// 当前连接数量
private AtomicInteger count = new AtomicInteger(0);
@Override
public void init(int maxTotal, long maxWaitMillis) {
maxTotal = maxTotal;
maxWaitMillis = maxWaitMillis;
idleWorkQueue = new LinkedBlockingQueue<>(maxTotal);
activeWorkQueue = new LinkedBlockingQueue<>(maxTotal);
}
@Override
public Jedis getResource() throws Exception {
Jedis jedis = null;
// 1. 记录开始时间,检测超时
long startTime = System.currentTimeMillis();
while (true){
// 2. 从空闲队列中获取连接,如果拿到,一式两份存放到活动队列
jedis = idleWorkQueue.poll();
if (jedis != null){
activeWorkQueue.offer(jedis);
return jedis;
}
// 3. 如果失败,判断池是否满,没满则创建
if (count.get() < maxTotal){
if (count.incrementAndGet() <= maxTotal){
jedis = new Jedis(url,port);
activeWorkQueue.offer(jedis);
System.out.printf("创建了一个新的连接: %s /r/n", jedis.toString());
return jedis;
}else {
count.decrementAndGet();
}
}
// 4. 如果连接池满了,则在超时时间内进行等待
try {
jedis = idleWorkQueue.poll(maxWaitMillis-(System.currentTimeMillis()-startTime), TimeUnit.MILLISECONDS);
if (jedis != null){
activeWorkQueue.offer(jedis);
return jedis;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
// 5. poll可能被中断,所以在这里再进行超时判断
if (maxWaitMillis < (System.currentTimeMillis()-startTime)){
throw new RuntimeException("JedisPool: jedis connect timeout");
}
}
}
@Override
public void release(Jedis jedis) {
if (activeWorkQueue.remove(jedis)){
idleWorkQueue.offer(jedis);
}
}
}
GitHub地址: https://github.com/fantj2016/easy-jedis
推 荐 阅 读 (点击文字可跳转)
1. 深究Spring中Bean的生命周期
2. 深入SpringBoot核心注解原理
3. 线上环境部署概览
4. Springboot Vue shiro 实现前后端分离、权限控制