Spymemcached 是 Memcached 的一个流行的Java client库(另一个比较著名的是原淘宝的 伯岩/庄晓丹 开发的 XMemcached , 他也开发另一个Taobao开源的项目 Metamorphosis ),性能表现出色,广泛应用于Java + Memcached 项目中。
Spymemcached 最早由 Dustin Sallings 开发,Dustin 后来和别人一起创办了Couchbase (原NorthScale),职位为首席架构师。2014加入Google。
本身Memcached没有集群的功能,客户端可以根据不同的key值set/get到不同的Memcached的节点上。 一致性Hash算法可以将数据均衡地分配到各个节点,并且在节点加入和退出的时候可以很好地将失效节点上的数据均衡的分配给其它节点。 Spymemcached使用Ketama算法。
但是,当 memcached 集群的一个节点因为某种原因宕机的时候,spymemcached 并没有正确的选择到另外一个live的节点,而是直接失败:
2015-11-2305:56:20.942WARN net.spy.memcached.MemcachedConnection: Couldnotredistributetoanother node, retrying primary nodeforff-108182. 2015-11-2305:56:20.944WARN net.spy.memcached.MemcachedConnection: Couldnotredistributetoanother node, retrying primary nodeforff-108254. 2015-11-2305:56:20.946WARN net.spy.memcached.MemcachedConnection: Couldnotredistributetoanother node, retrying primary nodeforff-108341. 2015-11-2305:56:20.947WARN net.spy.memcached.MemcachedConnection: Couldnotredistributetoanother node, retrying primary nodeforff-108352. 2015-11-2305:56:20.947WARN net.spy.memcached.MemcachedConnection: Couldnotredistributetoanother node, retrying primary nodeforff-108381. 2015-11-2305:56:20.948WARN net.spy.memcached.MemcachedConnection: Couldnotredistributetoanother node, retrying primary nodeforff-108407. 2015-11-2305:56:20.950WARN net.spy.memcached.MemcachedConnection: Couldnotredistributetoanother node, retrying primary nodeforff-108480. 2015-11-2305:56:20.952WARN net.spy.memcached.MemcachedConnection: Couldnotredistributetoanother node, retrying primary nodeforff-108552. 2015-11-2305:56:20.954WARN net.spy.memcached.MemcachedConnection: Couldnotredistributetoanother node, retrying primary nodeforff-108608.
如果使用XMemcached,则没有这种现象。spymemcached已经设置为一致性Hash的模式:
...... ConnectionFactoryBuilder builder = newConnectionFactoryBuilder(); builder.setHashAlg(DefaultHashAlgorithm.KETAMA_HASH); builder.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT); builder.setFailureMode(FailureMode.Redistribute); ......
原因在于当key对应的节点(称之为primary node宕机的时候),spymemcached会有限地选择另外一个节点:
if(primary.isActive() || failureMode == FailureMode.Retry) { placeIn = primary; } elseif(failureMode == FailureMode.Cancel) { o.cancel(); } else{ Iterator<MemcachedNode> i = locator.getSequence(key); while(placeIn ==null&& i.hasNext()) { MemcachedNode n = i.next(); if(n.isActive()) { placeIn = n; } } if(placeIn ==null) { placeIn = primary; this.getLogger().warn("Could not redistribute to another node, "+"retrying primary node for %s.", key); } }
其中 locator.getSequence(key)
最多会提供7个备选虚拟节点。
publicIterator<MemcachedNode>getSequence(String k) { // Seven searches gives us a 1 in 2^7 chance of hitting the // same dead node all of the time. returnnewKetamaIterator(k,7, getKetamaNodes(), hashAlg); }
但是,依照他的注释,在宕机的情况下,大约会有1/128的几率这七个虚拟节点都会指向这个宕机的primary node。实际上,下面的代码百分百会选择不到那个live ("127.0.0.1:11211")的节点:
@Test publicvoidtestMissingNode2() { List<MemcachedNode> nodes = newArrayList<MemcachedNode>(); nodes.add(createMockNode(newInetSocketAddress("127.0.0.1",11211))); nodes.add(createMockNode(newInetSocketAddress("127.0.0.1",11311))); KetamaNodeLocator locator = newKetamaNodeLocator(nodes, DefaultHashAlgorithm.KETAMA_HASH); Iterator<MemcachedNode> i = locator.getSequence("ff-108552"); Set<MemcachedNode> foundNodes = newHashSet<MemcachedNode>(); while(i.hasNext()) { foundNodes.add(i.next()); } // This fails. 127.0.0.1:11211 is never found. for(MemcachedNode node: nodes) { Assert.assertTrue(foundNodes.contains(node)); } } privateMemcachedNodecreateMockNode(InetSocketAddress sock) { MemcachedNode mockNode = EasyMock.createMock(MemcachedNode.class); EasyMock.expect(mockNode.getSocketAddress()).andReturn(sock).anyTimes(); EasyMock.replay(mockNode); returnmockNode; }
事实上,google groups上也有 讨论 , 原spymemcached的bug管理系统上也有相关的 bug ,但是问题并没有解决。
导致的问题是,某些缓存项在某个memcached节点宕机的时候,不能利用缓存系统,只能从其它持久化系统比如数据库中获取值。
问题找到,解决办法也就有了,修改getSequence方法,提供更多的节点共选择:
publicIterator<MemcachedNode>getSequence(String k) { // return new KetamaIterator(k, 7, getKetamaNodes(), hashAlg); intmaxTry = config.getNodeRepetitions() +1; if(maxTry <20) { maxTry = 20; } returnnewKetamaIterator(k, maxTry, getKetamaNodes(), hashAlg); }
另外,如何在运行时动态地增加新的memcached节点? 这篇文章给出了一个 解决方案
你不得不重载MemcachedClient, MemcachedConnection 和 DefaultConnectionFactory。作者未测试,不保证work。不管怎样,倒是一个思路。
ExtMemCachedConnection.java publicclassExtMemCachedConnectionextendsMemcachedConnection{ protectedfinalOperationFactory opFact; /** * Construct a memcached connection. * *@parambufSize the size of the buffer used for reading from the server *@paramf the factory that will provide an operation queue *@parama the addresses of the servers to connect to *@throwsjava.io.IOException if a connection attempt fails early */ publicExtendableMemcachedConnection(intbufSize, ConnectionFactory f, List<InetSocketAddress> a, Collection<ConnectionObserver> obs, FailureMode fm, OperationFactory opfactory) throwsIOException { super(bufSize, f, a, obs, fm, opfactory); this.opFact = opfactory; } publicvoidadd(InetSocketAddress nodeAddress)throwsIOException { finalList<InetSocketAddress> nodeToAdd =newArrayList<InetSocketAddress>(1); nodeToAdd.add(nodeAddress); List<MemcachedNode> newNodesList = createConnections(nodeToAdd); newNodesList.addAll(getLocator().getAll()); getLocator().updateLocator(newNodesList); } //The node should be obtain from locator to ensure currentNode.equals(node) will return true publicvoidremove(MemcachedNode node)throwsIOException { for(MemcachedNode currentNode : getLocator().getAll()) { if(currentNode.equals(node)) { Collection<Operation> notCompletedOperations = currentNode.destroyInputQueue(); if(currentNode.getChannel() !=null) { currentNode.getChannel().close(); currentNode.setSk(null); if(currentNode.getBytesRemainingToWrite() >0) { getLogger().warn("Shut down with %d bytes remaining to write", currentNode.getBytesRemainingToWrite()); } getLogger().debug("Shut down channel %s", currentNode.getChannel()); } //Unfortunatelly, redistributeOperations is private so it cannot be used or override. I put copy/paste the implementation redistributeOperations(notCompletedOperations); } } } protectedvoidredistributeOperations(Collection<Operation> ops) { for(Operation op : ops) { if(op.isCancelled() || op.isTimedOut()) { continue; } if(opinstanceofKeyedOperation) { KeyedOperation ko = (KeyedOperation) op; intadded =0; for(String k : ko.getKeys()) { for(Operation newop : opFact.clone(ko)) { addOperation(k, newop); added++; } } assertadded >0:"Didn't add any new operations when redistributing"; } else{ // Cancel things that don't have definite targets. op.cancel(); } } } }
ExtMemcachedClient.java
publicvoidadd(InetSocketAddress nodeAddress) { if(mconninstanceofExtMemcachedConnection) { ((ExtMemcachedConnection)mconn).add(nodeAddress); } } publicbooleanremove(MemcachedNode node) { if(mconninstanceofExtMemcachedConnection) { ((ExtMemcachedConnection)mconn).remove(nodeAddress); } }
ExtMemcachedConnectionfactory.java
@Override publicMemcachedConnectioncreateConnection(List<InetSocketAddress> addrs)throwsIOException { returnnewExtendableMemcachedConnection(getReadBufSize(),this, addrs, getInitialObservers(), getFailureMode(), getOperationFactory()); }