转载

Treap:一种高效的高级数据结构

写在最前

Yedis 是一款高性能的nosql数据库,旨在能在某些方面替代Redis。它由不著名码农、秦汉史历史学家、本站站长Yebangyu同学在业余时间独立开发完成。

Github请访问 这里 ,Python客户端请点击 这里

Treap

Yedis也同样支持rank功能,但是所使用的数据结构是Treap(Redis使用的是Skip List)。这两种都是概率性的高级数据结构,具体性能依赖于随机数的选择。

那么,什么是Treap呢?

Treap:一种高效的高级数据结构

(图片来源于WikiMedia)

简单说来,Treap是这样的树:Treap中的每个节点至少包含key和优先级两个字段,其中key满足搜索树性质,优先级满足堆序性。如上图所示,数字是优先级字段,字母是key字段。其中优先级构成了一个大根堆。

也就是说,在Treap中,key组成了一个二叉搜索树,优先级组成了一个堆。所谓Treap = Tree + Heap。

插入一个节点时,随机生成一个优先级。这时候可能堆序性被破坏,而这可以通过旋转来恢复。由于只可能有左右单旋转两种情形,因此它的代码编写比AVL树、Red-Black Tree要简单的多,并且可以证明旋转的期望次数小于2。

在Treap上的删除、插入、查找的期望时间复杂度都是O($lgn$)。

Treap PK SkipList

Treap实现

/*  * treap.h */ #ifndef YEDIS_TREAP_H_ #define YEDIS_TREAP_H_ #include <stdlib.h> #include <time.h> #define YEDIS_INT32_MAX 2147483647 #define nullptr 0  struct YedisTreapNode {   int key;//binary search tree   int value;   YedisTreapNode *left;   YedisTreapNode *right;   int priority;//max heap };  class YedisTreap { public:   int init();   YedisTreapNode *find(const int key);   int insert(const int key, const int value); private:   int insert_(const int key, const int value,YedisTreapNode *&treap);   YedisTreapNode *left_rotation_(YedisTreapNode *k1, YedisTreapNode *k2);   YedisTreapNode *right_rotation_(YedisTreapNode *k1, YedisTreapNode *k2); private:   YedisTreapNode *root_;   YedisTreapNode *sentinel_; };  #define INSERT(subtree1, substree2) /    ret = insert_(key, value, treap->subtree1); /    if (treap->subtree1->priority > treap->priority) { /      treap = substree2##_rotation_(treap, treap->subtree1); /    }   int YedisTreap::init()  {    int ret = 0;    sentinel_ = root_ = nullptr;    YedisTreapNode *tmp = nullptr;    if (0 == (tmp = static_cast<YedisTreapNode*>(malloc(sizeof(YedisTreapNode))))) {      ret = -1;    } else {      tmp->key = 0;      tmp->value = 0;      tmp->left = tmp->right = nullptr;      tmp->priority = YEDIS_INT32_MAX;      sentinel_ = tmp;    }    return 0;  }  int YedisTreap::insert(const int key, const int value)  {    int ret = 0;    if (nullptr == sentinel_) {      ret = -2;    } else if (nullptr == root_) {      YedisTreapNode *tmp = static_cast<YedisTreapNode*>(malloc(sizeof(YedisTreapNode)));      if (nullptr == tmp) {        ret = -1;      } else {        tmp->key = key;        tmp->value = value;        tmp->left = tmp->right = nullptr;        tmp->priority = key;        sentinel_->left = tmp;        root_ = tmp;      }    } else {      ret = insert_(key, value, root_);    }    return ret;  }  int YedisTreap::insert_(const int key, const int value, YedisTreapNode *&treap)  {    int ret = 0;    if (nullptr == treap) {      if (nullptr == (treap = static_cast<YedisTreapNode*>(malloc(sizeof(YedisTreapNode))))) {        ret = -1;      } else {        treap->key = key;        treap->value = value;        treap->left = treap->right = nullptr;        srand((int)time(NULL));        treap->priority = (int)rand();      }    } else if(treap->key > key) {      INSERT(left, right);    } else if(treap->key < key) {      INSERT(right, left);    } else if (treap->priority & 1) {      INSERT(right, left);//duplicate elements    } else {      INSERT(left, right);//duplicate elements    }    return ret;  }  YedisTreapNode *YedisTreap::left_rotation_(YedisTreapNode *k1, YedisTreapNode *k2)  {    k1->right = k2->left;    k2->left = k1;    return k2;  }  YedisTreapNode *YedisTreap::right_rotation_(YedisTreapNode *k1, YedisTreapNode *k2)  {    k1->left = k2->right;    k2->right = k1;    return k2;  }  YedisTreapNode *YedisTreap::find(const int key)  {    YedisTreapNode *tmp = root_;    while(tmp) {      if (tmp->key > key) {        tmp = tmp->left;      } else if (tmp->key < key) {        tmp = tmp->right;      } else {        break;      }    }    return tmp;  }  #endif /* YEDIS_TREAP_H_ */ 

说明:

1,93-95行处理重复key的情形,为了不让某个子树过分倾斜,我们选择根据priority的奇偶性来决定把重复key插入到哪个子树。

2,通过设置一个具有最大优先级的sentinel节点,来简化处理和判断。

3,显然,递归地实现Treap比较方便和容易。

测试程序

本次PK的对象Skip List的代码,来自我的朋友我的上铺叫路遥之前写的一个 实现

测试程序则是在他之前写的 文件 的基础上稍作修改。(主要是修改为更准确的计时方式)。

测试数据则是随机打乱的10,000,000个元素。

对于Treap的测试程序如下:

/*  * test_treap.cpp */ #include "treap.h" #include <stdlib.h> #include <time.h> #include <iostream> #include <vector> #include <algorithm> using namespace std; #define N 10000000 int main() {   int i;   int *key = (int*) malloc(N * sizeof(int));   vector<int> tmp;   cout<<"generating data for test..."<<endl;   for (i = 0; i < N; i++) {     tmp.push_back(i);   }   //通过random_shuffle打乱数据   std::random_shuffle(tmp.begin(),tmp.end());   for (i = 0; i < N; i++) {     key[i] = tmp[i];   }   cout<<"generating data for test successfully..."<<endl;   YedisTreap *p = new YedisTreap();   p->init();   timespec t1,t2;   cout<<"starting to insert data to treap"<<endl;   clock_gettime(CLOCK_MONOTONIC, &t1);   for (i = 0; i < N; i++) {     p->insert(key[i], key[i]);   }   clock_gettime(CLOCK_MONOTONIC, &t2);   long long cost1 = (t2.tv_sec - t1.tv_sec) * 1000000000 + t2.tv_nsec - t1.tv_nsec;   cout<<"Insertion takes "<<cost1<<"ns"<<endl;   cout<<"starting to search data in treap"<<endl;   clock_gettime(CLOCK_MONOTONIC, &t1);   for (i = 0; i < N; i++) {     YedisTreapNode *node = p->find(key[i]);     if (!node) cout<<"so bad"<<endl;   }   clock_gettime(CLOCK_MONOTONIC, &t2);   long long cost2 = (t2.tv_sec - t1.tv_sec) * 1000000000 + t2.tv_nsec - t1.tv_nsec;   cout<<"Searching takes "<<cost2<<"ns"<<endl;   return 0; } 

Skip List的测试程序如下:

/*  * test_skiplist.cpp */ #include<stdio.h> #include<stdlib.h> #include <stdlib.h> #include <time.h> #include <iostream> #include <vector> #include <algorithm> #include "skiplist.h" using namespace std; #define N 10000000 int main() {   int i;   int *key = (int*) malloc(N * sizeof(int));   vector<int> tmp;   cout<<"generating data for test..."<<endl;   for (i = 0; i < N; i++) {     tmp.push_back(i);   }   std::random_shuffle(tmp.begin(),tmp.end());   for (i = 0; i < N; i++) {     key[i] = tmp[i];   }   cout<<"generating data for test successfully..."<<endl;   struct skiplist *list = skiplist_new();   cout<<"starting to insert data to skip list"<<endl;   timespec t1,t2;   clock_gettime(CLOCK_MONOTONIC, &t1);   for (i = 0; i < N; i++) {     skiplist_insert(list, key[i], key[i]);   }   clock_gettime(CLOCK_MONOTONIC, &t2);   long long cost1 = (t2.tv_sec - t1.tv_sec) * 1000000000 + t2.tv_nsec - t1.tv_nsec;   cout<<"Insertion takes "<<cost1<<"ns"<<endl;   cout<<"starting to search data in skip list"<<endl;   clock_gettime(CLOCK_MONOTONIC, &t1);   for (i = 0; i < N; i++) {     struct skipnode *node = skiplist_search(list, key[i]);     if (!node) cout<<"so bad"<<endl;   }   clock_gettime(CLOCK_MONOTONIC, &t2);   long long cost2 = (t2.tv_sec - t1.tv_sec) * 1000000000 + t2.tv_nsec - t1.tv_nsec;   cout<<"Searching takes "<<cost2<<"ns"<<endl;   skiplist_delete(list);   return 0; } 

编译链接这两个测试程序时,请记得加 -lrt 选项。

测试结果

测试环境是Ubuntu 14.04 64位系统 + 16GB内存 + gcc4.8

未开启优化:

SkipList:插入,27783017437ns = 27s,查找,30369272721ns = 30s

Treap:插入,36651505605ns = 36s,查找,15831823811ns = 15s

开启O3优化后:

SkipList:插入,25877643715ns = 25s,查找,28741237934ns = 28s

Treap:插入,35317025584ns = 35s,查找,14883111063ns = 14s

可以看出在查找上,Treap的性能确实是可圈可点的。

Further Thinking

1,为什么Treap的查找会这么高效?

针对本次测试数据,对find函数进行统计分析,发现查找的平均深度是29,而lg(10000000) = 23.2。也就是说本次测试时平均深度是$1.26lgn$。BTW,根据分析,AVL树的最大深度不会超过$1.44lgn$,对于Red-Black Tree,这个值大概在$2lg(n+1)$。

2,如何优化Treap的插入性能?

可能可以实施的点包括优化随机数生成程序、将递归改为非递归实现、优化分支预测等。具体怎么做?无从下手?建议您阅读我的这篇博客。

参考文献

1,Mark Allen Weiss的《Data Structures & Algorithm Analysis in C++》中介绍了Treap,这也是我第一次接触和知道Treap的地方。

2,《Introduction to Algorithms》中在某个章节里,以习题的形式介绍了Treap。

3,之前陈利人童鞋在微博上推荐了某大学的某学生写的Treap资料,他们都说好。我没看,也没兴趣。

原文  http://www.yebangyu.org/blog/2016/05/07/treappkskiplist/
正文到此结束
Loading...