这一年以来,写了太多的业务代码。是时候要总结一下自己的积累了。本文是redis深度历险的读书笔记,做个记录以及分享给大家。
docker
redis
字符串是一个字符数组
常见用途就是信息JSON序列化成为字符串之后,存入redis,取信息会经过一次反序列化
字符串是动态字符串,可以进行扩容;当字符串长度小于1m的时候,扩容是加倍。当大于1m,每次扩容就增加1m。字符串最大的长度为512m
使用字符串计数的时候,单个值的最大值为signed long,超过就会报错
常用命令
set name a get name exists a del a mset a 1 b 2 mget a b expire a 5 setex a 5 1 setnx a 1 incr a incrby a 10
相当于java中的LinkedList,一个双向链表。插入和删除操作快,但是索引定位很慢。
节点删除操作 p.pre.next = p.next p.next.pre = p.pre p = null
列表可以作为队列,又进左出即可
rpush books java python golang kotlin 4 llen books 4 lpop books java
列表可以作为栈,右进右出即可。就是,调整了队列使用的方向
rpush books java python golang kotlin 4 llen books 4 rpop books kotlin
使用队列会存在的慢操作有get(index)方法,需要遍历整个链表,性能为O(n)
rpush books java python golang kotlin 4 lindex 1 # 时间为O(n) lindex books 1 "python" lrange books 0 -1 "java" "python" "golang" "kotlin" ltrim books 1 -1 # 设置长度为3的数组,区间外的全部被清除 OK lrange books 0 -1 # 获取所有元素 "python" "golang" "kotlin" llen books 3 ltrim books 1 0 # 删除所有元素 OK
快速列表:在列表元素较少的时候,会使用一块连续的内存,叫ziplist(压缩列表)。当数据量较多的时候,就要变成quicklist(快速列表),多个ziplist使用双向指针串起来使用。
// 快速列表 struct quicklist { quicklistNode* head; quicklistNode* tail; long count; // 元素总数 int nodes; // ziplist 节点的个数 int compressDepth; // LZF 算法压缩深度 ... } // 快速列表节点 struct quicklistNode { quicklistNode* prev; quicklistNode* next; ziplist* zl; // 指向压缩列表 int32 size; // ziplist 的字节总数 int16 count; // ziplist 中的元素数量 int2 encoding; // 存储形式 2bit,原生字节数组还是 LZF 压缩存储 } struct ziplist_compressed { int32 size; byte[] compressed_data; } struct ziplist { ... }
内部实现结构就是数组+链表的方式进行实现。相同hash值的元素使用链表串联起来。java的链表数量超过6个会变成红黑树,不知道redis的会不会有这个查询优化。
redis里面的值就只能是字符串,便于比较。不像java的hashmap可以存储对象,equals方法的编写可谓五花八门
java的hashmap在进行rehash的时候,需要一次性进行rehash,是个耗时操作,多线程还会出现环。redis的是渐进式rehash。rehash的时候,会保留两个hash结构,查询时候会查询两个hash结构,在后续的定时任务和hash指令操作的时候,渐进地将旧的内容迁移到新的hash结构中。当内容迁移完毕,删除旧的hash结构
hset books java "think in java" 1 hset books golang "concurrency in go" 1 hset books python "python cookbook" 1 hgetall books # 返回所有entryies,key和value间隔返回 "java" "think in java" "golang" "concurrency in go" "python" "python cookbook" hlen books 3 hget books java "think in java" hset books java "my java book" 0 hmset books java "" golang "" ok hset user-lixinkun age 25 1 hincrby user-lixinkun 1 26
相当于java的hashset,确保唯一性
sadd books java 1 sadd books java 0 sadd books python golang smembers books java golang python #顺序可能被打乱
*内部排序通过skiplist实现。zset要支持睡觉的插入和删除,不能使用数组来表示。普通的链表数据结构,有新元素插入的时候,需要进行定位,这样才能保证是有序的。通常会通过二分查找来确定位置,但是只有数组才支持快速查找。这时候就需要一个多层级的列表结构,最下面一层的元素都是串联起来的。某个元素存在于所有的层。定位插入点的时候,先在顶层进行定位,然后下潜到下一层,然后一直到最底下一层。redia采用随机策略来决定元素可以兼职到第几层。L0层的概率是100%,L1的是50%。这样去类推
数据结构,具体介绍链接 redis跳跃列表介绍
/* ZSETs use a specialized version of Skiplists */ typedef struct zskiplistNode { //成员对象 robj *obj; //分值 double score; //后退指针 struct zskiplistNode *backward; //层 struct zskiplistLevel { struct zskiplistNode *forward;//前进指针 unsigned int span;//跨度 } level[]; } zskiplistNode; typedef struct zskiplist { //表头节点和表尾节点 struct zskiplistNode *header, *tail; //表中节点的的数量 unsigned long length; //表中层数最大的节点层数 int level; } zskiplist;
create if not exists.如果容器不存在,则先创建容器。
drop if no elements.如果容器没有元素了,就立即删除该容器,释放内存。
容器的过期时间是针对整个对象的,hash的过期是整个进行过期,而不是单个元素
对于不是原子性的操作,就需要通过分布式锁来保证只有一个线程可以获取到锁。具体使用的是setnx和del命令。锁处理完了以后,就要被释放。中间需要解决的问题就是,如果业务逻辑执行之间,出现了异常,可能导致del无法执行,整个业务瘫痪。
或许,可以加一个过期时间,5s以后自动释放。但是逻辑还有问题的,如果expire的时候,命令丢失了,也会死锁。出现这种问题的原因就是setnx和expire两个命令不是原子性的。
在redis2.8的版本中,作者加入了set指令的扩展餐厨,setnx和expire指令可以一起执行了,两个指令是原子性的。
set lock:book true ex 5 nx del lock:book
超时问题是redis分布式锁不能解决的问题,如果加锁和释放锁之间的逻辑太长,超出了锁的超时时限,那么第二个线程可能获得redis分布式锁。所以,redis分布式锁不要用于较长时间的任务
可重入性。java里面有一个ReentrantLock。Redis分布式锁如果要支持重入,需要对客户端的set方法进行封装,使用线程的ThreadLocal遍历存储当前持有锁的计数。
这个是redis分布式锁的使用和错误案例介绍 redis分布式锁使用与错误示例
//错误使用就是使用setnx和expire两个指令 /** * 尝试获取分布式锁 * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @param expireTime 超期时间 * @return 是否获取成功 */ public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) { String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; }
public class RedisWithReentrantLock{ //线程锁计数器8 private ThreadLocal<Map<String,Integer>> lockers = new ThreadLocal<>(); private Jedis jedis; //内部使用的lock private boolean _lock(String key) { return jedis.set(key, "", "nx", "ex", 5L) != null; } //内部使用的unlock private void _unlock(String key) { jedis.del(key); } //获取计数器 private Map<String, Integer> currentLockers() { Map<String, Integer> refs = lockers.get(); if (refs != null) { return refs; } lockers.set(new HashMap<>()); return lockers.get(); } //可重入加锁 public boolean lock(String key) { Map<String, Integer> refs = currentLockers(); Integer refCnt = refs.get(key); if (refCnt != null) { refs.put(key, refCnt + 1); return true; } boolean ok = _lock(key); if (!ok) { return false; } refs.put(key, 1); return true; } //可重入释放锁 public boolean unlock(String key) { Map<String, Integer> refs = currentLockers(); Integer refCnt = refs.get(key); if (refCnt == null) { return false; } refCnt = -1; if (refCnt > 0) { refs.put(key, refCnt); } else { refs.remove(key); _unlock(key); } return true; } }
使用简单,只有一个消费者
没有ack保证,如果对消息可靠性有极高的要求,推荐使用rocketmq的事务消息
使用list作为异步消息的发送结构,rpush,rpop
平时进行开发的时候,要对一些bool类型的数据进行存取,如用户一年的签到记录。如果使用key/value方式进行存取,就要记录365个,当用户量上亿的时候,需要的存储结构是非常惊人的。
redis提供了位图数据结构,每个用户每天的签到就只占据一个位。365天大概需要46个字节,大大节约空间
位图就是byte数组,可以使用getbit和setbit进行存储,如果字节是不可以打印字符,redis会展示其16进制。实际来操作一波
零存整取的方式,一个个bit去设置,然后一次性拿出值 通过python获取字母`h`的hash值为0b1101000 setbit s 1 1 setbit s 2 1 setbit s 4 1 get s "h" # 这样就得到了h的值 零存零取的方式,就是对单个bit进行操作,没有设置的位就是默认为0 set bit w 1 1 get bit w 0 0 整存零取 set w h getbit w 1 1 不可打印字符 setbit x 0 1 setbit x 1 1 get x "/xc0"
位图的统计和查找:redis提供了对位图的基本统计算
set w hello ok bitcount w 21 bitcount w 0 0 3 bitcount w 0 1 # 统计1的数量 7 bitpos w 0 # 第一个0 0 bitpos w 1 1 1 # 第二个字符算起,第一个1位 魔术指令bitfield bitfield w get u4 0
页面的uv,不能使用一个key的incr,因为要进行去重
使用zset也可以做到这个,但是消耗空间太大
HyperLogLog的标准误差是0.81%。对于不精确的统计,可以使用这个数据结构
提供pdadd,pfcount两个指令,pf是Philippe Flajolet教授的名字缩写,这个数据结构是他发明的。这个数据结构占用的内存是12k,使用了2^14个桶去进行计算
pfadd book java 1 pfadd book java 0 pfadd book python 1 pfadd book python pfcount book 2
这个数据结构主要实现去重的功能
场景使用:广告推送的时候,如果我们看过了一条广告,下一条推送的时候,要对看过的广告进行去重。如果使用关系型数据库,每次推送都要去数据库查询,这样数据库很难抗住这个压力。况且,浏览记录会随着时间线性增长,空间也跟不上
布隆过滤器会有一定的误判概率。这可以比喻成为一个不精确的set结构,调用contains的时候,有可能误判。如果说这个值不存在,那就肯定不存在。如果说这个值存在,优肯是误判,可能,两者长得相像?这样的话,对于实际业务也没有影响,我们可以保证推送给用户的都是最新的内容,只有极少数用户没看过的内容会被误判。经过测试,误判率大概在1%左右。调整参数可以优化这个误判率
布隆过滤器作为redis的一个插件,需要版本为4.0以后,才能使用。
基本指令为bf.add,bf.exists.bf.madd(批量),安装docker,然后去重新安装redis,带这个插件的版本。 centos7安装docker方法 ,安装好以后再安装插件,使用docker启动redis
用户可以通过bf.reserve命令显式制定错误率,key和初始空间。错误率越低,占用空间越大。初始值越大,空间越大,所以,在使用的时候,需要准确预估一下元素的大小。
布隆过滤器是一个大型的位数组。向布隆过滤器添加key的时候,会使用多个hash函数对key进行hash,算得一个整数索引。然后对位数组的长度取模运算得到一个位置,每个hash函数都得到一个不同的位置,再把这几个位置都设置为1,完成了add操作。巧妙地共享可多个key的存储空间。
为何会出现误判呢?就是由于判断key是否存在的时候,如果某个位是0,那就一定不存在。但是,如果各个位置都是1的话,也不能说明是存在的,有可能是其他key存在导致。如果位数组比较稀疏,出现误判的概率就低了。所以,使用的时候,不要让实际的元素大于初始值。如果大于的时候,要对布隆过滤器进行重建。
set存储的是所有的元素,布隆过滤器是存储元素的指纹(hash)。空间节省还是比较明显的.
docker加载插件的命令,docker run -p 6379:6379 --name redis-redisbloom redislabs/rebloom:latest
docker exec -it redis-redisbloom bash
bf.add book java 1 bf.add book java #再次加入java,就被排出了 0 bf.exists book python 0
public class SimpleRateLimiter{ private Jedis jedis; //用户请求是否允许 //使用乐观锁 //value保证唯一性就好 public boolean isActionAllow(String userId, String actionKey, int period, int maxCount) { String key = String.format("hist:%s:%s", userId, actionKey); long now = System.currentTimeMillis(); Pipeline pipe = jedis.pipelined(); pipe.multi(); pipe.zadd(key, now, "", now); //截取 pipe.zremrangeByScore(key, 0, now - period * 1000); Response<Long> count = pipe.zcard(key); pipe.expire(key, period + 1); pipe.exec(); pipe.close(); return count.get() <= maxCount; } }
//单机版的漏斗限流 public class FunnelRateLimiter{ class Funnel{ //容量 int capacity; //漏水速率 float leakingRate; //剩余空间 int leftQuota; //上次触发时间 long leakingTs; } //....省略构造方法 void makeSpace() { long now = System.currentTimeMillis(); long deltaTs = nowTs - leakingTs; int deltaQuota = (int)(deltaTs * leakingRate); //如果时间间隔太久,可能会导致int溢出 if (deltaQuota < 0) { //重设 this.leftQuota = capacity; this.leakingTs = 0; return; } if (deltaQuota < 1) { return; } this.leftQuota += leftQuota; this.leakingTs = nowTs; if (this.leftQuota > this.capacity) { this.leftQuota = this.capacity; } //是否可以接收请求 boolean watering(int quota) { makeSpace(); if (leftQuota >= quota) { this.leftQuota -= quota; return true; } return false; } } private Map<String, Funnel> funnels = new HashMap<>(); public boolean isActionAllow(String userId, String actionKey, int capacity, fload leakingRate) { String key = String.format("funnels:%s:%s", userId, actionKey); Funnel funnel = funnels.get(key); if (funnel == null) { funnel = new Funnel(capacity, leakingRate); funnels.put(key, funnel); } return funnel.watering(1); } }
wget https://github.com/brandur/redis-cell/releases/download/v0.2.4/redis-cell-v0.2.4-x86_64-unknown-linux-gnu.tar.gz tar -xzvf redis-cell-v0.2.4-x86_64-unknown-linux-gnu.tar.gz 解压后得到libredis_cess.so redis-cli module load /data/libredis_cell.so 我这里是放到了data目录下面,运行redis后加载就行了 //各个参数 key 容量 计算漏斗速率 可选参数 cl.throttle <key> <max_burst> <count per period> <period> [<quantity>] cl.throttle book 15 30 60 1 # 这个表示容量为15,2秒一个漏水速率(30/60) 0 # 0 允许 -1 拒绝 16 # 漏斗容量 15 # 剩余空间 -1 # 当第一个参数为-1师,这里的值表示需要经过多少时间以后才能再试 2 # 多久时间以后,漏斗可以完全空出来