转载

Redis学习之旅~基础应用篇

这一年以来,写了太多的业务代码。是时候要总结一下自己的积累了。本文是redis深度历险的读书笔记,做个记录以及分享给大家。

docker redis

数据结构

字符串

  • 字符串是一个字符数组

  • 常见用途就是信息JSON序列化成为字符串之后,存入redis,取信息会经过一次反序列化

  • 字符串是动态字符串,可以进行扩容;当字符串长度小于1m的时候,扩容是加倍。当大于1m,每次扩容就增加1m。字符串最大的长度为512m

  • 使用字符串计数的时候,单个值的最大值为signed long,超过就会报错

  • 常用命令

    set name a
    
    get name
    
    exists a
    
    del a
    
    mset a 1 b 2
    
    mget a b
    
    expire a 5
    
    setex a 5 1
    
    setnx a 1
    
    incr a
    
    incrby a 10

列表

  • 相当于java中的LinkedList,一个双向链表。插入和删除操作快,但是索引定位很慢。

    节点删除操作
    
    p.pre.next = p.next
    
    p.next.pre = p.pre
    
    p = null
  • 列表可以作为队列,又进左出即可

    rpush books java python golang kotlin
    
    4
    
    llen books
    
    4
    
    lpop books
    
    java
  • 列表可以作为栈,右进右出即可。就是,调整了队列使用的方向

    rpush books java python golang kotlin
    4
    llen books
    4
    rpop books
    kotlin
  • 使用队列会存在的慢操作有get(index)方法,需要遍历整个链表,性能为O(n)

    rpush books java python golang kotlin
    4
    lindex 1            # 时间为O(n)
    lindex books 1
    "python"
    lrange books 0 -1
    "java"
    "python"
    "golang"
    "kotlin"
    ltrim books 1 -1      # 设置长度为3的数组,区间外的全部被清除
    OK
    lrange books 0 -1    # 获取所有元素
    "python"
    "golang"
    "kotlin"
    llen books
    3
    ltrim books 1 0        # 删除所有元素
    OK
  • 快速列表:在列表元素较少的时候,会使用一块连续的内存,叫ziplist(压缩列表)。当数据量较多的时候,就要变成quicklist(快速列表),多个ziplist使用双向指针串起来使用。

// 快速列表

struct quicklist {

  quicklistNode* head;

  quicklistNode* tail;

  long count; // 元素总数

  int nodes; // ziplist 节点的个数

  int compressDepth; // LZF 算法压缩深度

  ...

}

// 快速列表节点

struct quicklistNode {

  quicklistNode* prev;

  quicklistNode* next;

  ziplist* zl; // 指向压缩列表

  int32 size; // ziplist 的字节总数

  int16 count; // ziplist 中的元素数量

  int2 encoding; // 存储形式 2bit,原生字节数组还是 LZF 压缩存储

}

struct ziplist_compressed {

  int32 size;

  byte[] compressed_data;

}

struct ziplist {

  ...

}

Hash字典

  • 内部实现结构就是数组+链表的方式进行实现。相同hash值的元素使用链表串联起来。java的链表数量超过6个会变成红黑树,不知道redis的会不会有这个查询优化。

  • redis里面的值就只能是字符串,便于比较。不像java的hashmap可以存储对象,equals方法的编写可谓五花八门

  • java的hashmap在进行rehash的时候,需要一次性进行rehash,是个耗时操作,多线程还会出现环。redis的是渐进式rehash。rehash的时候,会保留两个hash结构,查询时候会查询两个hash结构,在后续的定时任务和hash指令操作的时候,渐进地将旧的内容迁移到新的hash结构中。当内容迁移完毕,删除旧的hash结构

    hset books java "think in java"
    1
    hset books golang "concurrency in go"
    1
    hset books python "python cookbook"
    1
    hgetall books  # 返回所有entryies,key和value间隔返回
    "java"
    "think in java"
    "golang"
    "concurrency in go"
    "python"
    "python cookbook"
     hlen books
     3
    hget books java
    "think in java"
    hset books java "my java book"
    0
    hmset books java "" golang ""
    ok
    hset user-lixinkun age 25
    1
    hincrby user-lixinkun 1
    26

set

  • 相当于java的hashset,确保唯一性

    sadd books java
    1
    sadd books java
    0
    sadd books python golang
    smembers books
    java
    golang
    python    #顺序可能被打乱

Zset 有序列表

  • 类似于java的sortedset和hashmap的结合体。具备set的唯一性原理,也可以给每个值赋一个score,代表它的排序权重。它的内部实现是一个跳跃列表。貌似java也有这个数据结构

*内部排序通过skiplist实现。zset要支持睡觉的插入和删除,不能使用数组来表示。普通的链表数据结构,有新元素插入的时候,需要进行定位,这样才能保证是有序的。通常会通过二分查找来确定位置,但是只有数组才支持快速查找。这时候就需要一个多层级的列表结构,最下面一层的元素都是串联起来的。某个元素存在于所有的层。定位插入点的时候,先在顶层进行定位,然后下潜到下一层,然后一直到最底下一层。redia采用随机策略来决定元素可以兼职到第几层。L0层的概率是100%,L1的是50%。这样去类推

  • 数据结构,具体介绍链接 redis跳跃列表介绍

    /* ZSETs use a specialized version of Skiplists */
    typedef struct zskiplistNode {
    //成员对象
    robj *obj;
    //分值
    double score;
    //后退指针
    struct zskiplistNode *backward;
    //层
      struct zskiplistLevel {
          struct zskiplistNode *forward;//前进指针
          unsigned int span;//跨度
      } level[];
    } zskiplistNode;
    typedef struct zskiplist {
    //表头节点和表尾节点
    struct zskiplistNode *header, *tail;
    //表中节点的的数量
    unsigned long length;
    //表中层数最大的节点层数
    int level;
    } zskiplist;

容器的通用规则

  • create if not exists.如果容器不存在,则先创建容器。

  • drop if no elements.如果容器没有元素了,就立即删除该容器,释放内存。

  • 容器的过期时间是针对整个对象的,hash的过期是整个进行过期,而不是单个元素

分布式锁

  • 对于不是原子性的操作,就需要通过分布式锁来保证只有一个线程可以获取到锁。具体使用的是setnx和del命令。锁处理完了以后,就要被释放。中间需要解决的问题就是,如果业务逻辑执行之间,出现了异常,可能导致del无法执行,整个业务瘫痪。

  • 或许,可以加一个过期时间,5s以后自动释放。但是逻辑还有问题的,如果expire的时候,命令丢失了,也会死锁。出现这种问题的原因就是setnx和expire两个命令不是原子性的。

  • 在redis2.8的版本中,作者加入了set指令的扩展餐厨,setnx和expire指令可以一起执行了,两个指令是原子性的。

set lock:book true ex 5 nx
del lock:book
  • 超时问题是redis分布式锁不能解决的问题,如果加锁和释放锁之间的逻辑太长,超出了锁的超时时限,那么第二个线程可能获得redis分布式锁。所以,redis分布式锁不要用于较长时间的任务

  • 可重入性。java里面有一个ReentrantLock。Redis分布式锁如果要支持重入,需要对客户端的set方法进行封装,使用线程的ThreadLocal遍历存储当前持有锁的计数。

  • 这个是redis分布式锁的使用和错误案例介绍 redis分布式锁使用与错误示例

//错误使用就是使用setnx和expire两个指令

/**

* 尝试获取分布式锁

* @param jedis Redis客户端

* @param lockKey 锁

* @param requestId 请求标识

* @param expireTime 超期时间

* @return 是否获取成功

*/

public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

    String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

    if (LOCK_SUCCESS.equals(result)) {

        return true;

    }

    return false;

}
  • java版本的可重入锁,核心就是用threadlocal去记录每个线程,某个key的锁计数
public class RedisWithReentrantLock{
  //线程锁计数器8
  private ThreadLocal<Map<String,Integer>> lockers = new ThreadLocal<>();

  private Jedis jedis;

  //内部使用的lock
  private boolean _lock(String key) {
    return jedis.set(key, "", "nx", "ex", 5L) != null;
  }

  //内部使用的unlock
  private void _unlock(String key) {
    jedis.del(key);
  }

  //获取计数器
  private Map<String, Integer> currentLockers() {
    Map<String, Integer> refs = lockers.get();
    if (refs != null) {
      return refs;
    }
    lockers.set(new HashMap<>());
    return lockers.get();
  }
  //可重入加锁
  public boolean lock(String key) {
    Map<String, Integer> refs = currentLockers();
    Integer refCnt = refs.get(key);
    if (refCnt != null) {
      refs.put(key, refCnt + 1);
      return true;
    }
    boolean ok = _lock(key);
    if (!ok) {
      return false;
    }
    refs.put(key, 1);
    return true;
  }
  //可重入释放锁
  public boolean unlock(String key) {
    Map<String, Integer> refs = currentLockers();
    Integer refCnt = refs.get(key);
    if (refCnt == null) {
      return false;
    }
    refCnt = -1;
    if (refCnt > 0) {
      refs.put(key, refCnt);
    } else {
      refs.remove(key);
      _unlock(key);
    }
    return true;
  }
}

redis的消息队列,只做特性介绍,实际使用我们目前用rocketmq

  • 使用简单,只有一个消费者

  • 没有ack保证,如果对消息可靠性有极高的要求,推荐使用rocketmq的事务消息

  • 使用list作为异步消息的发送结构,rpush,rpop

延时队列的实现

  • 延时队列可以使用zset实现,将消息序列化成为一个字符串,消息的到期时间为score。然后使用多个线程去轮询这个队列。使用多线程是为了保障可用性。一个线程挂了,还有其他线程可以进行处理

位图,节约存储空间

  • 平时进行开发的时候,要对一些bool类型的数据进行存取,如用户一年的签到记录。如果使用key/value方式进行存取,就要记录365个,当用户量上亿的时候,需要的存储结构是非常惊人的。

  • redis提供了位图数据结构,每个用户每天的签到就只占据一个位。365天大概需要46个字节,大大节约空间

  • 位图就是byte数组,可以使用getbit和setbit进行存储,如果字节是不可以打印字符,redis会展示其16进制。实际来操作一波

零存整取的方式,一个个bit去设置,然后一次性拿出值
通过python获取字母`h`的hash值为0b1101000
setbit s 1 1
setbit s 2 1
setbit s 4 1
get s
"h"      # 这样就得到了h的值

零存零取的方式,就是对单个bit进行操作,没有设置的位就是默认为0
set bit w 1 1
get bit w 0
0

整存零取
set w h
getbit w 1
1

不可打印字符
setbit x 0 1
setbit x 1 1
get x
"/xc0"
  • 位图的统计和查找:redis提供了对位图的基本统计算

    set w hello
    ok
    bitcount w
    21
    bitcount w 0 0
    3
    bitcount w 0 1  # 统计1的数量
    7
    bitpos w 0  # 第一个0
    0
    
    bitpos w 1 1 1 # 第二个字符算起,第一个1位
    魔术指令bitfield
    bitfield w get u4 0

高级数据结构

HyberLogLog,提供不精确的统计数据

  • 页面的uv,不能使用一个key的incr,因为要进行去重

  • 使用zset也可以做到这个,但是消耗空间太大

  • HyperLogLog的标准误差是0.81%。对于不精确的统计,可以使用这个数据结构

  • 提供pdadd,pfcount两个指令,pf是Philippe Flajolet教授的名字缩写,这个数据结构是他发明的。这个数据结构占用的内存是12k,使用了2^14个桶去进行计算

pfadd book java
1
pfadd book java
0
pfadd book python
1
pfadd book python
pfcount book
2

布隆过滤器Bloom Filter

  • 这个数据结构主要实现去重的功能

  • 场景使用:广告推送的时候,如果我们看过了一条广告,下一条推送的时候,要对看过的广告进行去重。如果使用关系型数据库,每次推送都要去数据库查询,这样数据库很难抗住这个压力。况且,浏览记录会随着时间线性增长,空间也跟不上

  • 布隆过滤器会有一定的误判概率。这可以比喻成为一个不精确的set结构,调用contains的时候,有可能误判。如果说这个值不存在,那就肯定不存在。如果说这个值存在,优肯是误判,可能,两者长得相像?这样的话,对于实际业务也没有影响,我们可以保证推送给用户的都是最新的内容,只有极少数用户没看过的内容会被误判。经过测试,误判率大概在1%左右。调整参数可以优化这个误判率

  • 布隆过滤器作为redis的一个插件,需要版本为4.0以后,才能使用。

  • 基本指令为bf.add,bf.exists.bf.madd(批量),安装docker,然后去重新安装redis,带这个插件的版本。 centos7安装docker方法 ,安装好以后再安装插件,使用docker启动redis

  • 用户可以通过bf.reserve命令显式制定错误率,key和初始空间。错误率越低,占用空间越大。初始值越大,空间越大,所以,在使用的时候,需要准确预估一下元素的大小。

  • 布隆过滤器是一个大型的位数组。向布隆过滤器添加key的时候,会使用多个hash函数对key进行hash,算得一个整数索引。然后对位数组的长度取模运算得到一个位置,每个hash函数都得到一个不同的位置,再把这几个位置都设置为1,完成了add操作。巧妙地共享可多个key的存储空间。

  • 为何会出现误判呢?就是由于判断key是否存在的时候,如果某个位是0,那就一定不存在。但是,如果各个位置都是1的话,也不能说明是存在的,有可能是其他key存在导致。如果位数组比较稀疏,出现误判的概率就低了。所以,使用的时候,不要让实际的元素大于初始值。如果大于的时候,要对布隆过滤器进行重建。

  • set存储的是所有的元素,布隆过滤器是存储元素的指纹(hash)。空间节省还是比较明显的.

  • docker加载插件的命令,docker run -p 6379:6379 --name redis-redisbloom redislabs/rebloom:latest

  • docker exec -it redis-redisbloom bash

bf.add book java
1
bf.add book java   #再次加入java,就被排出了
0
bf.exists book python
0

redis限流

  • 后端屏蔽用户的过于频繁的请求,比如1秒内不能请求超过5次。这个限流存在一个滑动时间窗口,zset的score圈出一个时间窗口,我们保留这个时间窗口,对于窗口外面的数据,则可以统统砍掉。zset的value可以为毫秒时间戳。
public class SimpleRateLimiter{
  private Jedis jedis;

  //用户请求是否允许

  //使用乐观锁

  //value保证唯一性就好

  public boolean isActionAllow(String userId, String actionKey, int period, int maxCount) {
    String key = String.format("hist:%s:%s", userId, actionKey);
    long now = System.currentTimeMillis();
    Pipeline pipe = jedis.pipelined();
    pipe.multi();
    pipe.zadd(key, now, "", now);

    //截取
    pipe.zremrangeByScore(key, 0, now - period * 1000);
    Response<Long> count = pipe.zcard(key);
    pipe.expire(key, period + 1);
    pipe.exec();
    pipe.close();
    return count.get() <= maxCount;
  }
}
  • 漏斗限流,灵感来源于漏斗,就是漏斗有一个容量,也有一个消耗速率。当灌水速率小于漏水速率,漏斗超过容量以后。灌水行为就要丢弃或者暂停
//单机版的漏斗限流

public class FunnelRateLimiter{
  class Funnel{
    //容量
    int capacity;
    //漏水速率
    float leakingRate;
    //剩余空间
    int leftQuota;
    //上次触发时间
    long leakingTs;
  }

  //....省略构造方法

  void makeSpace() {
    long now = System.currentTimeMillis();
    long deltaTs = nowTs - leakingTs;
    int deltaQuota = (int)(deltaTs * leakingRate);
    //如果时间间隔太久,可能会导致int溢出
    if (deltaQuota < 0) {
      //重设
      this.leftQuota = capacity;
      this.leakingTs = 0;
      return;
    }

    if (deltaQuota < 1) {
      return;
    }

    this.leftQuota += leftQuota;
    this.leakingTs = nowTs;
    if (this.leftQuota > this.capacity) {
      this.leftQuota = this.capacity;
    }

    //是否可以接收请求

    boolean watering(int quota) {
      makeSpace();
      if (leftQuota >= quota) {
        this.leftQuota -= quota;
        return true;
      }
      return false;
    }
  }

  private Map<String, Funnel> funnels = new HashMap<>();
  public boolean isActionAllow(String userId, String actionKey, int capacity, fload leakingRate) {
    String key = String.format("funnels:%s:%s", userId, actionKey);
    Funnel funnel = funnels.get(key);
    if (funnel == null) {
      funnel = new Funnel(capacity, leakingRate);
      funnels.put(key, funnel);
    }
    return funnel.watering(1);
  }
}
  • 这里是redis提供的分布式限流,redis4.0提供了redis-cell。这是一个差价,需要去github下载redis-cell的源码解压以后。进入redis-cli后,执行redis的 module load命令即可加载
wget https://github.com/brandur/redis-cell/releases/download/v0.2.4/redis-cell-v0.2.4-x86_64-unknown-linux-gnu.tar.gz

tar -xzvf redis-cell-v0.2.4-x86_64-unknown-linux-gnu.tar.gz

解压后得到libredis_cess.so

redis-cli

module load /data/libredis_cell.so

我这里是放到了data目录下面,运行redis后加载就行了

//各个参数    key    容量          计算漏斗速率              可选参数

cl.throttle <key> <max_burst> <count per period> <period> [<quantity>]

cl.throttle book 15 30 60 1  # 这个表示容量为15,2秒一个漏水速率(30/60)

0          # 0 允许 -1 拒绝

16        # 漏斗容量

15        # 剩余空间

-1        # 当第一个参数为-1师,这里的值表示需要经过多少时间以后才能再试

2          # 多久时间以后,漏斗可以完全空出来
原文  https://studygolang.com/articles/21217
正文到此结束
Loading...