Yedis 是一款高性能的nosql数据库,旨在能在某些方面替代Redis。它由不著名码农、秦汉史历史学家、本站站长Yebangyu同学在业余时间独立开发完成。
Github请访问 这里 ,Python客户端请点击 这里
Yedis也同样支持rank功能,但是所使用的数据结构是Treap(Redis使用的是Skip List)。这两种都是概率性的高级数据结构,具体性能依赖于随机数的选择。
那么,什么是Treap呢?
(图片来源于WikiMedia)
简单说来,Treap是这样的树:Treap中的每个节点至少包含key和优先级两个字段,其中key满足搜索树性质,优先级满足堆序性。如上图所示,数字是优先级字段,字母是key字段。其中优先级构成了一个大根堆。
也就是说,在Treap中,key组成了一个二叉搜索树,优先级组成了一个堆。所谓Treap = Tree + Heap。
插入一个节点时,随机生成一个优先级。这时候可能堆序性被破坏,而这可以通过旋转来恢复。由于只可能有左右单旋转两种情形,因此它的代码编写比AVL树、Red-Black Tree要简单的多,并且可以证明旋转的期望次数小于2。
在Treap上的删除、插入、查找的期望时间复杂度都是O($lgn$)。
/* * treap.h */ #ifndef YEDIS_TREAP_H_ #define YEDIS_TREAP_H_ #include <stdlib.h> #include <time.h> #define YEDIS_INT32_MAX 2147483647 #define nullptr 0 struct YedisTreapNode { int key;//binary search tree int value; YedisTreapNode *left; YedisTreapNode *right; int priority;//max heap }; class YedisTreap { public: int init(); YedisTreapNode *find(const int key); int insert(const int key, const int value); private: int insert_(const int key, const int value,YedisTreapNode *&treap); YedisTreapNode *left_rotation_(YedisTreapNode *k1, YedisTreapNode *k2); YedisTreapNode *right_rotation_(YedisTreapNode *k1, YedisTreapNode *k2); private: YedisTreapNode *root_; YedisTreapNode *sentinel_; }; #define INSERT(subtree1, substree2) / ret = insert_(key, value, treap->subtree1); / if (treap->subtree1->priority > treap->priority) { / treap = substree2##_rotation_(treap, treap->subtree1); / } int YedisTreap::init() { int ret = 0; sentinel_ = root_ = nullptr; YedisTreapNode *tmp = nullptr; if (0 == (tmp = static_cast<YedisTreapNode*>(malloc(sizeof(YedisTreapNode))))) { ret = -1; } else { tmp->key = 0; tmp->value = 0; tmp->left = tmp->right = nullptr; tmp->priority = YEDIS_INT32_MAX; sentinel_ = tmp; } return 0; } int YedisTreap::insert(const int key, const int value) { int ret = 0; if (nullptr == sentinel_) { ret = -2; } else if (nullptr == root_) { YedisTreapNode *tmp = static_cast<YedisTreapNode*>(malloc(sizeof(YedisTreapNode))); if (nullptr == tmp) { ret = -1; } else { tmp->key = key; tmp->value = value; tmp->left = tmp->right = nullptr; tmp->priority = key; sentinel_->left = tmp; root_ = tmp; } } else { ret = insert_(key, value, root_); } return ret; } int YedisTreap::insert_(const int key, const int value, YedisTreapNode *&treap) { int ret = 0; if (nullptr == treap) { if (nullptr == (treap = static_cast<YedisTreapNode*>(malloc(sizeof(YedisTreapNode))))) { ret = -1; } else { treap->key = key; treap->value = value; treap->left = treap->right = nullptr; srand((int)time(NULL)); treap->priority = (int)rand(); } } else if(treap->key > key) { INSERT(left, right); } else if(treap->key < key) { INSERT(right, left); } else if (treap->priority & 1) { INSERT(right, left);//duplicate elements } else { INSERT(left, right);//duplicate elements } return ret; } YedisTreapNode *YedisTreap::left_rotation_(YedisTreapNode *k1, YedisTreapNode *k2) { k1->right = k2->left; k2->left = k1; return k2; } YedisTreapNode *YedisTreap::right_rotation_(YedisTreapNode *k1, YedisTreapNode *k2) { k1->left = k2->right; k2->right = k1; return k2; } YedisTreapNode *YedisTreap::find(const int key) { YedisTreapNode *tmp = root_; while(tmp) { if (tmp->key > key) { tmp = tmp->left; } else if (tmp->key < key) { tmp = tmp->right; } else { break; } } return tmp; } #endif /* YEDIS_TREAP_H_ */
说明:
1,93-95行处理重复key的情形,为了不让某个子树过分倾斜,我们选择根据priority的奇偶性来决定把重复key插入到哪个子树。
2,通过设置一个具有最大优先级的sentinel节点,来简化处理和判断。
3,显然,递归地实现Treap比较方便和容易。
本次PK的对象Skip List的代码,来自我的朋友我的上铺叫路遥之前写的一个 实现
测试程序则是在他之前写的 文件 的基础上稍作修改。(主要是修改为更准确的计时方式)。
测试数据则是随机打乱的10,000,000个元素。
对于Treap的测试程序如下:
/* * test_treap.cpp */ #include "treap.h" #include <stdlib.h> #include <time.h> #include <iostream> #include <vector> #include <algorithm> using namespace std; #define N 10000000 int main() { int i; int *key = (int*) malloc(N * sizeof(int)); vector<int> tmp; cout<<"generating data for test..."<<endl; for (i = 0; i < N; i++) { tmp.push_back(i); } //通过random_shuffle打乱数据 std::random_shuffle(tmp.begin(),tmp.end()); for (i = 0; i < N; i++) { key[i] = tmp[i]; } cout<<"generating data for test successfully..."<<endl; YedisTreap *p = new YedisTreap(); p->init(); timespec t1,t2; cout<<"starting to insert data to treap"<<endl; clock_gettime(CLOCK_MONOTONIC, &t1); for (i = 0; i < N; i++) { p->insert(key[i], key[i]); } clock_gettime(CLOCK_MONOTONIC, &t2); long long cost1 = (t2.tv_sec - t1.tv_sec) * 1000000000 + t2.tv_nsec - t1.tv_nsec; cout<<"Insertion takes "<<cost1<<"ns"<<endl; cout<<"starting to search data in treap"<<endl; clock_gettime(CLOCK_MONOTONIC, &t1); for (i = 0; i < N; i++) { YedisTreapNode *node = p->find(key[i]); if (!node) cout<<"so bad"<<endl; } clock_gettime(CLOCK_MONOTONIC, &t2); long long cost2 = (t2.tv_sec - t1.tv_sec) * 1000000000 + t2.tv_nsec - t1.tv_nsec; cout<<"Searching takes "<<cost2<<"ns"<<endl; return 0; }
Skip List的测试程序如下:
/* * test_skiplist.cpp */ #include<stdio.h> #include<stdlib.h> #include <stdlib.h> #include <time.h> #include <iostream> #include <vector> #include <algorithm> #include "skiplist.h" using namespace std; #define N 10000000 int main() { int i; int *key = (int*) malloc(N * sizeof(int)); vector<int> tmp; cout<<"generating data for test..."<<endl; for (i = 0; i < N; i++) { tmp.push_back(i); } std::random_shuffle(tmp.begin(),tmp.end()); for (i = 0; i < N; i++) { key[i] = tmp[i]; } cout<<"generating data for test successfully..."<<endl; struct skiplist *list = skiplist_new(); cout<<"starting to insert data to skip list"<<endl; timespec t1,t2; clock_gettime(CLOCK_MONOTONIC, &t1); for (i = 0; i < N; i++) { skiplist_insert(list, key[i], key[i]); } clock_gettime(CLOCK_MONOTONIC, &t2); long long cost1 = (t2.tv_sec - t1.tv_sec) * 1000000000 + t2.tv_nsec - t1.tv_nsec; cout<<"Insertion takes "<<cost1<<"ns"<<endl; cout<<"starting to search data in skip list"<<endl; clock_gettime(CLOCK_MONOTONIC, &t1); for (i = 0; i < N; i++) { struct skipnode *node = skiplist_search(list, key[i]); if (!node) cout<<"so bad"<<endl; } clock_gettime(CLOCK_MONOTONIC, &t2); long long cost2 = (t2.tv_sec - t1.tv_sec) * 1000000000 + t2.tv_nsec - t1.tv_nsec; cout<<"Searching takes "<<cost2<<"ns"<<endl; skiplist_delete(list); return 0; }
编译链接这两个测试程序时,请记得加 -lrt
选项。
测试环境是Ubuntu 14.04 64位系统 + 16GB内存 + gcc4.8
未开启优化:
SkipList:插入,27783017437ns = 27s,查找,30369272721ns = 30s
Treap:插入,36651505605ns = 36s,查找,15831823811ns = 15s
开启O3优化后:
SkipList:插入,25877643715ns = 25s,查找,28741237934ns = 28s
Treap:插入,35317025584ns = 35s,查找,14883111063ns = 14s
可以看出在查找上,Treap的性能确实是可圈可点的。
1,为什么Treap的查找会这么高效?
针对本次测试数据,对find函数进行统计分析,发现查找的平均深度是29,而lg(10000000) = 23.2。也就是说本次测试时平均深度是$1.26lgn$。BTW,根据分析,AVL树的最大深度不会超过$1.44lgn$,对于Red-Black Tree,这个值大概在$2lg(n+1)$。
2,如何优化Treap的插入性能?
可能可以实施的点包括优化随机数生成程序、将递归改为非递归实现、优化分支预测等。具体怎么做?无从下手?建议您阅读我的这篇博客。
1,Mark Allen Weiss的《Data Structures & Algorithm Analysis in C++》中介绍了Treap,这也是我第一次接触和知道Treap的地方。
2,《Introduction to Algorithms》中在某个章节里,以习题的形式介绍了Treap。
3,之前陈利人童鞋在微博上推荐了某大学的某学生写的Treap资料,他们都说好。我没看,也没兴趣。