转载

Redis内存回收:LRU算法

Redis: https://github.com/zwjlpeng/Redis_Deep_Read

Redis 中采用两种算法进行内存回收,引用计数算法以及 LRU 算法,在操作系统内存管理一节中,我们都学习过 LRU 算法( 最近最久未使用算法 ),那么什么是 LRU 算法呢

LRU 算法作为内存管理的一种有效算法,其含义是在内存有限的情况下,当内存容量不足时,为了保证程序的运行,这时就不得不淘汰内存中的一些对象,释放这些对象占用的空间,那么选择淘汰哪些对象呢? LRU 算法就提供了一种策略,告诉我们选择最近一段时间内,最久未使用的对象将其淘汰,至于为什么要选择最久未使用的,可以想想,最近一段时间内使用的东西,我们是不是可能一会又要用到呢~,而很长一段时间内都没有使用过的东西,也许永远都不会再使用~

在操作系统中 LRU 算法淘汰的不是内存中的对象,而是页,当内存中数据不足时,通过 LRU 算法,选择一页( 一般是4KB )将其交换到虚拟内存区( Swap 区)

LRU 算法演示

Redis内存回收:LRU算法

这张图应该画的还行吧,用的是 www.draw.io ,解释如下,假设前提,只有三块内存空间可以使用,每一块内存空间只能存放一个对象,如A、B、C...

1、最开始时,内存空间是空的,因此依次进入A、B、C是没有问题的

2、当加入D时,就出现了问题,内存空间不够了,因此根据LRU算法,内存空间中A待的时间最为久远,选择A,将其淘汰

3、当再次引用B时,内存空间中的B又处于活跃状态,而C则变成了内存空间中,近段时间最久未使用的

4、当再次向内存空间加入E时,这时内存空间又不足了,选择在内存空间中待的最久的C将其淘汰出内存,这时的内存空间存放的对象就是E->B->D

LRU 算法的整体思路就是这样的

算法实现应该采用怎样的数据结构 

队列?那不就是 FIFO 算法嘛~,LRU算法最为精典的实现,就是 HashMap+Double LinkedList ,时间复杂度为 O(1), 具体可以参考相关代码

REDIS中LRU算法的实际应用,在 Redis 1.0中 并未引入 LRU 算法,只是简单的使用引用计数法,去掉内存中不再引用的对象以及运行一个定时任务 serverCron 去掉内存中已经过期的对象占用的内存空间,以下是Redis 1.0中CT任务的释放内存中的部份代码

//去掉一些过期的KEYS for (j = 0; j < server.dbnum; j++) {  redisDb *db = server.db+j;  int num = dictSize(db->expires);//计算hash表中过期Key的数目  if (num) {   time_t now = time(NULL);   //#define REDIS_EXPIRELOOKUPS_PER_CRON 100   if (num > REDIS_EXPIRELOOKUPS_PER_CRON)    num = REDIS_EXPIRELOOKUPS_PER_CRON;   //循环100次,从过期Hash表中随机挑选出100个Key,判断Key是否过期,如果过期了,执行删除操作   while (num--) {    dictEntry *de;    time_t t;    //随机获取Key值(db->expires里面存储的均是即将过期的Keys)    if ((de = dictGetRandomKey(db->expires)) == NULL) break;    t = (time_t) dictGetEntryVal(de);    if (now > t) {     //不仅要从存放过期keys的Hash表中删除数据,还要从存放实际数据的Hash表中删除数据     deleteKey(db,dictGetEntryKey(de));    }   }  } } 

如果没有看过Redis 1.0源码,理解起来可能有些困难,但看看1.0源码中的这个结构体,估计有点数据结构基础的人,都明白上面这几行代码的意思了(注释部份我也已经写的很清楚了)~

typedef struct redisDb {     dict *dict;//用来存放实际Key->Value数据的位置      dict *expires;//用于记录Key的过期时间      int id;//表示选择的是第几个redis库 } redisDb;

没有查证是从什么版本开始, Redis 增加了 LRU 算法,以下是分析 Redis 2.9.11 代码中的 LRU 算法淘汰策略,在2.9.11版本中与LRU算法相关的代码主要位于 object.c 以及 redis.c 两个源文件中, 再分析这两个文件关于LRU源代码之前,让我 们先看一下, Redis 2.9.11 版本中关于 LRU 算法的配置,配置文件在 redis.conf 文件中,如下所示

# maxmemory <bytes>  # MAXMEMORY POLICY: how Redis will select what to remove when maxmemory # is reached. You can select among five behaviors: #  # volatile-lru -> remove the key with an expire set using an LRU algorithm # allkeys-lru -> remove any key accordingly to the LRU algorithm # volatile-random -> remove a random key with an expire set # allkeys-random -> remove a random key, any key # volatile-ttl -> remove the key with the nearest expire time (minor TTL) # noeviction -> don't expire at all, just return an error on write operations #  # Note: with any of the above policies, Redis will return an error on write #       operations, when there are not suitable keys for eviction. # #       At the date of writing this commands are: set setnx setex append #       incr decr rpush lpush rpushx lpushx linsert lset rpoplpush sadd #       sinter sinterstore sunion sunionstore sdiff sdiffstore zadd zincrby #       zunionstore zinterstore hset hsetnx hmset hincrby incrby decrby #       getset mset msetnx exec sort # # The default is: # # maxmemory-policy noeviction  # LRU and minimal TTL algorithms are not precise algorithms but approximated # algorithms (in order to save memory), so you can tune it for speed or # accuracy. For default Redis will check five keys and pick the one that was # used less recently, you can change the sample size using the following # configuration directive. # # The default of 5 produces good enough results. 10 Approximates very closely # true LRU but costs a bit more CPU. 3 is very fast but not very accurate. # # maxmemory-samples 5

从上面的配置中,可以看出,高版本的Redis中当内存达到极限时,内存淘汰策略主要采用了6种方式进行内存对象的释放操作

1.volatile-lru:从设置了过期时间的数据集中,选择最近最久未使用的数据释放

2.allkeys-lru:从数据集中(包括设置过期时间以及未设置过期时间的数据集中),选择最近最久未使用的数据释放

3.volatile-random:从设置了过期时间的数据集中,随机选择一个数据进行释放

4.allkeys-random:从数据集中(包括了设置过期时间以及未设置过期时间)随机选择一个数据进行入释放

5.volatile-ttl:从设置了过期时间的数据集中,选择马上就要过期的数据进行释放操作

6.noeviction:不删除任意数据(但redis还会根据引用计数器进行释放呦~),这时如果内存不够时,会直接返回错误

默认的内存策略是noeviction,在 RedisLRU 算法是一个近似算法,默认情况下,Redis随机挑选5个键,并且从中选取一个最近最久未使用的key进行淘汰,在配置文件中可以通过 maxmemory-samples 的值来设置 redis 需要检查 key 的个数,但是栓查的越多,耗费的时间也就越久,但是结构越精确(也就是Redis从内存中淘汰的对象未使用的时间也就越久~),设置多少,综合权衡吧~~~

在redis.h中声明的redisObj定义的如下:

#define REDIS_LRU_BITS 24 #define REDIS_LRU_CLOCK_MAX ((1<<REDIS_LRU_BITS)-1) /* Max value of obj->lru */ #define REDIS_LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */ typedef struct redisObject {
//存放的对象类型 unsigned type:4; //内容编码 unsigned encoding:4; //与server.lruclock的时间差值 unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) *// //引用计数算法使用的引用计数器 int refcount; //数据指针 void *ptr; } robj;

从redisObject结构体的定义中可以看出,在Redis中存放的对象不仅会有一个引用计数器,还会存在一个 server.lruclock,这个变量会在定时器中每次刷新时,调用getLRUClock获取当前系统的毫秒数,作为LRU时钟数 , 该计数器总共占用24位,最大可以表示的值为24个1即 ((1<<REDIS_LRU_BITS) - 1)=2^24 - 1 ,单位是毫秒,你可以算一下这么多毫秒,可以表示多少年~~

server.lruclockredis.c 中运行的定时器中进行更新操作,代码如下( redis.c 中的定时器被配置中 100ms 执行一次)

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {     .....     run_with_period(100) trackOperationsPerSecond();      /* We have just REDIS_LRU_BITS bits per object for LRU information.      * So we use an (eventually wrapping) LRU clock.      *      * Note that even if the counter wraps it's not a big problem,      * everything will still work but some object will appear younger      * to Redis. However for this to happen a given object should never be      * touched for all the time needed to the counter to wrap, which is      * not likely.      *      * Note that you can change the resolution altering the      * REDIS_LRU_CLOCK_RESOLUTION define. */     server.lruclock = getLRUClock();     ....     return 1000/server.hz; }

看到这,再看看 Redis 中创建对象时,如何对 redisObj 中的 unsigned lru 进行赋值操作的,代码位于 object.c 中,如下所示

robj *createObject(int type, void *ptr) {  robj *o = zmalloc(sizeof(*o));  o->type = type;  o->encoding = REDIS_ENCODING_RAW;  o->ptr = ptr;  o->refcount = 1;  //很关键的一步,Redis中创建的每一个对象,都记录下该对象的LRU时钟  /* Set the LRU to the current lruclock (minutes resolution). */  o->lru = LRU_CLOCK();  return o; } 

该代码中最为关键的一句就是 o->lru=LRU_CLOCK() ,这是一个定义,看一下这个宏定义的实现,代码如下所示

#define LRU_CLOCK() ((1000/server.hz <= REDIS_LRU_CLOCK_RESOLUTION) ? server.lruclock : getLRUClock())

其中 REDIS_LRU_CLOCK_RESOLUTION 为1000,可以自已在配置文件中进行配置,表示的是 LRU 算法的精度,在这里我们就可以看到 server.lruclock 的用处了,如果定时器执行的频率高于LRU算法的精度时,可以直接将 server.lruclock 直接在对象创建时赋值过去,避免了函数调用的内存开销以及时间开销~

有了上述的基础,下面就是最为关键的部份了, REDISLRU 算法,这里以 volatile-lru 为例(选择有过期时间的数据集进行淘汰),在Redis中命令的处理时,会调用 processCommand 函数,在ProcessCommand函数中,当在配置文件中配置了 maxmemory 时,会调用freeMemoryIfNeeded函数,释放不用的内存空间

以下是freeMemoryIfNeeded函数的关于LRU相关部份的源代码,其他代码类似

//不同的策略,操作的数据集不同 if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||  server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM) {  dict = server.db[j].dict; } else {//操作的是设置了过期时间的key集  dict = server.db[j].expires; } if (dictSize(dict) == 0) continue; /* volatile-random and allkeys-random policy */ //随机选择进行淘汰 if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM ||  server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_RANDOM) {  de = dictGetRandomKey(dict);  bestkey = dictGetKey(de); } /* volatile-lru and allkeys-lru policy */ //具体的LRU算法 else if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||  server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU) {  struct evictionPoolEntry *pool = db->eviction_pool;  while(bestkey == NULL) {   //选择随机样式,并从样本中作用LRU算法选择需要淘汰的数据   evictionPoolPopulate(dict, db->dict, db->eviction_pool);   /* Go backward from best to worst element to evict. */   for (k = REDIS_EVICTION_POOL_SIZE-1; k >= 0; k--) {    if (pool[k].key == NULL) continue;    de = dictFind(dict,pool[k].key);    sdsfree(pool[k].key);    //将pool+k+1之后的元素向前平移一个单位    memmove(pool+k,pool+k+1,     sizeof(pool[0])*(REDIS_EVICTION_POOL_SIZE-k-1));    /* Clear the element on the right which is empty     * since we shifted one position to the left.  */    pool[REDIS_EVICTION_POOL_SIZE-1].key = NULL;    pool[REDIS_EVICTION_POOL_SIZE-1].idle = 0;    //选择了需要淘汰的数据    if (de) {     bestkey = dictGetKey(de);     break;    } else {     /* Ghost... */     continue;    }   }  } } 

看了上面的代码,也许你还在奇怪,说好的,LRU算法去哪去了呢,再看看这个函数 evictionPoolPopulate 的实现吧

#define EVICTION_SAMPLES_ARRAY_SIZE 16 void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {  int j, k, count;  //EVICTION_SAMPLES_ARRAY_SIZE最大样本数,默认16  dictEntry *_samples[EVICTION_SAMPLES_ARRAY_SIZE];  dictEntry **samples;  //如果我们在配置文件中配置的samples小于16,则直接使用EVICTION_SAMPLES_ARRAY_SIZE  if (server.maxmemory_samples <= EVICTION_SAMPLES_ARRAY_SIZE) {   samples = _samples;  } else {   samples = zmalloc(sizeof(samples[0])*server.maxmemory_samples);  } #if 1 /* Use bulk get by default. */  //从样本集中随机获取server.maxmemory_samples个数据,存放在  count = dictGetRandomKeys(sampledict,samples,server.maxmemory_samples); #else  count = server.maxmemory_samples;  for (j = 0; j < count; j++) samples[j] = dictGetRandomKey(sampledict); #endif  for (j = 0; j < count; j++) {   unsigned long long idle;   sds key;   robj *o;   dictEntry *de;   de = samples[j];   key = dictGetKey(de);   if (sampledict != keydict) de = dictFind(keydict, key);   o = dictGetVal(de);   //计算LRU时间   idle = estimateObjectIdleTime(o);   k = 0;   //选择de在pool中的正确位置,按升序进行排序,升序的依据是其idle时间   while (k < REDIS_EVICTION_POOL_SIZE &&       pool[k].key &&       pool[k].idle < idle) k++;   if (k == 0 && pool[REDIS_EVICTION_POOL_SIZE-1].key != NULL) {    /* Can't insert if the element is < the worst element we have     * and there are no empty buckets. */    continue;   } else if (k < REDIS_EVICTION_POOL_SIZE && pool[k].key == NULL) {    /* Inserting into empty position. No setup needed before insert. */   } else {    //移动元素,memmove,还有空间可以插入新元素    if (pool[REDIS_EVICTION_POOL_SIZE-1].key == NULL) {     memmove(pool+k+1,pool+k,      sizeof(pool[0])*(REDIS_EVICTION_POOL_SIZE-k-1));    } else {//已经没有空间插入新元素时,将第一个元素删除     /* No free space on right? Insert at k-1 */     k--;     /* Shift all elements on the left of k (included) to the      * left, so we discard the element with smaller idle time. */     //以下操作突出了第K个位置     sdsfree(pool[0].key);     memmove(pool,pool+1,sizeof(pool[0])*k);    }   }   //在第K个位置插入   pool[k].key = sdsdup(key);   pool[k].idle = idle;  }  //执行到此之后,pool中存放的就是按idle time升序排序  if (samples != _samples) zfree(samples); } 

看了上面的代码, LRU 时钟的计算并没有包括在内,那么在看一下 LRU 算法的时钟计算代码吧, LRU 时钟计算代码在 object.c 中的 estimateObjectIdleTime 这个函数中,代码如下~~

//精略估计LRU时间  unsigned long long estimateObjectIdleTime(robj *o) {  unsigned long long lruclock = LRU_CLOCK();  if (lruclock >= o->lru) {   return (lruclock - o->lru) * REDIS_LRU_CLOCK_RESOLUTION;  } else {//这种情况一般不会发生,发生时证明redis中键的保存时间已经wrap了   return (lruclock + (REDIS_LRU_CLOCK_MAX - o->lru)) *      REDIS_LRU_CLOCK_RESOLUTION;  } } 

好了,先到此吧~~~

正文到此结束
Loading...