一致性哈希最早由 MIT的 Karger 提出,在发表于1997年的论文 Consistent Hashing and Random Trees: Distributed Caching Protocols for Relieving Hot Spots on the World Wide Web , Karger et al 和合作者们提出了一致性哈希的概念(consistent hash),用来解决分布式Cache的问题。这篇论文中提出在动态变化的Cache环境中,哈希算法应该满足的4个适应条件::Balance(均衡)、Monotonicity(单调性)、Spread(分散性)、Load(负载)。
在分布式缓存系统中使用一致性哈希算法时,某个节点的添加和移除不会重新分配全部的缓存,而只会影响小部分的缓存系统,如果均衡性做的好的话,当添加一个节点时,会均匀地从其它节点移一部分缓存到新的节点上;当删除一个节点的时候,这个节点上的缓存会均匀地分配到其它活着的节点上。
一致性哈希缓存还被扩展到分布式存储系统上。数据被分成一组 Shard ,每个Shard由一个节点管理,当需要扩容时,我们可以添加新的节点,然后将其它Shard的一部分数据移动到这个节点上。比如我们有10个Shard的分布式存储系统,当前存储了120个数据,每个Shard存储了12个数据。当扩容成12个Shard时,我们从每个Shard上拿走2个数据,存入到新的两个Shard上,这样每个Shard都存储了10个数据,而整个过程中我们只移动了20/120=1/6的数据。
Karger 一致性哈希算法将每个节点(bucket)关联一个圆环上的一些随机点,对于一个键值,将其映射到圆环中的一个点上,然后按照顺时针方向找到第一个关联bucket的点,将值放入到这个bucke中。因此你需要存储一组bucket和它们的关联点,当bucket以及每个bucket的关联点很多的时候,你就需要多一点的内存来记录它。这个你经常在网上看到的介绍一致性哈希的算法(有些文章将节点均匀地分布在环上,比如节点1节点2节点3节点4节点1节点2节点3节点4……, 这是不对的,在这种情况下节点2挂掉后它上面的缓存全部转移给节点3了)。
其它的一致性算法还有 Rendezvous hashing , 计算一个key应该放入到哪个bucket时,它使用哈希函数h(key,bucket)计算每个候选bucket的值,然后返回值最大的bucket。buckets比较多的时候耗时也较长,有人也提出了一些改进的方法,比如将bucket组织成tree的结构,但是在reblance的时候花费时间又长了。
Java程序员熟悉的Memcached的客户端Spymemcached、Xmemcached以及Folsom都提供了Ketama算法。其实Ketama算法最早于2007年用c 实现(libketama),很多其它语言也实现了相同的算法,它是基于Karger 一致性哈希算法实现:
以上两种算法可以处理节点增加和移除的情况。对于分布式存储系统,当一个节点失效时,我们并不期望它被移除,而是使用备份节点替换它,或者将它恢复起来,因为我们不期望丢掉它上面的数据。对于这种情况(节点可以扩容,但是不会移除节点),Google的 John Lamping, Eric Veach提供一个高效的几乎不占用持久内存的算法: Jump Consistent Hash 。
Jump Consistent Hash算法的特点是:
C语言实现的Jump Consistent Hash如下:
1 2 3 4 5 6 7 8 9 10 | int32_t JumpConsistentHash(uint64_t key, int32_t num_buckets) { int64_t b = -1, j = 0; while (j < num_buckets) { b = j; key = key * 2862933555777941757ULL + 1; j = (b + 1) * (double(1LL << 31)/double((key >> 33) + 1)); } return b; } |
可以看出这个算法非常的简单,因此也很容易的用Go来实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | func JumpHash(key uint64, buckets int) int { var b, j int64 if buckets <= 0 { buckets = 1 } for j < int64(buckets) { b = j key = key*2862933555777941757 + 1 j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1))) } return int(b) } |
我们可以写段代码测试它,看看它的分布是否均匀,在新增加一个节点的时候,是否只移动了一部分的数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | package main import "fmt" func main() { buckets := make(map[int]int, 10) count := 10 for i := uint64(0); i < 120000; i++ { b := JumpHash(i, count) buckets[b] = buckets[b] + 1 } fmt.Printf("buckets: %v/n", buckets) //add two buckets count = 12 for i := uint64(0); i < 120000; i++ { oldBucket := JumpHash(i, count-2) newBucket := JumpHash(i, count) //如果对象需要移动到新的bucket中,则首先从原来的bucket删除,再移动 if oldBucket != newBucket { buckets[oldBucket] = buckets[oldBucket] - 1 buckets[newBucket] = buckets[newBucket] + 1 } } fmt.Printf("buckets after add two servers: %v/n", buckets) } |
因为Jump consistent hash算法不使用节点挂掉,如果你真的有这种需求,比如你要做一个缓存系统,你可以考虑使用ketama算法,或者对Jump consistent hash算法改造一下:节点挂掉时我们不移除节点,只是标记这个节点不可用。当选择节点时,如果选择的节点不可用,则再一次Hash,尝试选择另外一个节点,比如下面的算法将key加1再进行选择。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | func JumpHash(key uint64, buckets int, checkAlive func(int) bool) int { var b, j int64 = -1, 0 if buckets <= 0 { buckets = 1 } for j < int64(buckets) { b = j key = key*2862933555777941757 + 1 j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1))) } if checkAlive != nil && !checkAlive(int(b)) { return JumpHash(key+1, buckets, checkAlive) //最好设置深度,避免key+1一直返回当掉的服务器 } return int(b) } |
上面的算法有一点问题,就是没有设定重试的测试,如果所有的节点都挂掉,则会进入死循环,所以最好设置一下重试次数(递归次数),超过n次还没有选择到则返回失败。
转载请注明文章来源: