在上一章节的内容中,我们分析了服务引用的具体流程。在大多数情况下,为避免单点故障,我们的应用会部署在多台服务器上。对于我们的Dubbo而言,就会出现多个服务提供者。而且这些服务也并非是一成不变的,那么就有这样一个问题: 有新的服务提供者加入或者禁用、修改已有的服务提供者,那么服务消费者怎么及时感知它们的变化呢?
或许你还有印象 ,在服务引用的时候,我们曾经有用到它。这个就是服务目录。 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
那么,什么是服务目录呢?
简单来说, 服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。
服务目录在获取注册中心的服务配置信息后,会为每条配置信息生成一个 Invoker 对象,并把这个 Invoker 对象存储起来,这个 Invoker 才是服务目录最终持有的对象。多个服务提供者,就构成了服务目录中的一个Invoker 集合。
假设我们有三个服务提供者,那么服务目录,RegistryDirectory对象中保存的Invoker如下:
我们再看下这个类的继承关系:
我们看到,RegistryDirectory实现了Directory接口和NotifyListener接口。那么,我们重点关注两个实现。
上面我们看到,RegistryDirectory会保存服务提供者Invocation的集合。那么,就得有获取的方法。获取的方法很简单,就是从本地缓存中查询即可。
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener { public List<Invoker<T>> doList(Invocation invocation) { List<Invoker<T>> invokers = null; //方法名和List<Invoker<T>>的映射 Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) { //获取请求的方法名,比如insertInfoUser String methodName = RpcUtils.getMethodName(invocation); //参数列表 Object[] args = RpcUtils.getArguments(invocation); if (args != null && args.length > 0 && args[0] != null && (args[0] instanceof String || args[0].getClass().isEnum())) { invokers = localMethodInvokerMap.get(methodName + "." + args[0]); } if (invokers == null) { // 通过方法名获取 Invoker 列表 invokers = localMethodInvokerMap.get(methodName); } if (invokers == null) { // 通过星号 * 获取 Invoker 列表 invokers = localMethodInvokerMap.get(Constants.ANY_VALUE); } if (invokers == null) { Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator(); if (iterator.hasNext()) { invokers = iterator.next(); } } } return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers; } } 复制代码
服务目录并非是一成不变的,如果有的新的服务提供者加入,或者剔除已有的服务提供者,那么服务目录需要及时更新信息。所以它实现了NotifyListener接口,用于刷新Invoker集合。
既然是通知,首先我们就要弄清楚它是在哪里触发的。以zookeeper为例,在服务引用的时候,它会监听服务提供者数据节点的数据变化。
public void childChanged(String parentPath, List<String> currentChilds) { ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); } 复制代码
如上代码, toUrlsWithEmpty
方法会将当前最新的子节点数据转换成List对象,然后调用 ZookeeperRegistry.this.notify
,此方法最终将调用到服务目录中的刷新方法。
refreshInvoker 方法是保证 RegistryDirectory 随注册中心变化而变化的关键所在。
private void refreshInvoker(List<URL> invokerUrls) { //如果invokerUrls中只要一个元素,且协议头为empty,就销毁所有的服务 if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && "empty".equals(invokerUrls.get(0).getProtocol())) { this.forbidden = true; this.methodInvokerMap = null; //销毁所有服务引用 destroyAllInvokers(); } else { this.forbidden = false; //原有的urlInvokerMap Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { // 添加缓存 url 到 invokerUrls 中 invokerUrls.addAll(this.cachedInvokerUrls); } else { this.cachedInvokerUrls = new HashSet<URL>(); // 缓存 invokerUrls this.cachedInvokerUrls.addAll(invokerUrls); } if (invokerUrls.isEmpty()) { return; } //将invokerUrls转换为Invoker //这里会根据url中的协议名称调用对应的Protocol实现 Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls); //将 newUrlInvokerMap 转成方法名到 Invoker 列表的映射 Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) { logger.error(new IllegalStateException("......"); return; } this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; this.urlInvokerMap = newUrlInvokerMap; try { //销毁无用的服务引用Invoker destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } } 复制代码
如上代码,我们分为这样几个步骤来理解。
invokerUrls
toInvokers(invokerUrls)
方法。此方法根据URL中的协议名称,调用对应的Protocol实现,来引用服务。比如 DubboProtocol.refer
,它返回已经构建好的Invoker对象集合。 toMethodInvokers(newUrlInvokerMap)
方法,根据最新的InvokerMap,重置方法名和Invoker对象的映射关系。 methodInvokerMap
缓存。 我们刚刚看到,上面获取Invocation集合的时候,就是从 methodInvokerMap
对象中获取数据。那么在这里改变这个对象的属性,就相当于更新了服务目录的Invoker信息。
在更新完服务目录Invoker之后,还要销毁无用的服务引用Invoker。
private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) { //如果新的Invoker为空,则销毁全部 if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) { destroyAllInvokers(); return; } List<String> deleted = null; if (oldUrlInvokerMap != null) { Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values(); //遍历旧的Invoker for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) { //判断新的newInvokers是否包含旧的Invoker对象 if (!newInvokers.contains(entry.getValue())) { if (deleted == null) { deleted = new ArrayList<String>(); } // 若不包含,则将旧的 Invoker 对应的 url 存入 deleted 列表中 deleted.add(entry.getKey()); } } } if (deleted != null) { for (String url : deleted) { if (url != null) { // 从 oldUrlInvokerMap 中移除 url 对应的 Invoker Invoker<T> invoker = oldUrlInvokerMap.remove(url); if (invoker != null) { try { //调用销毁方法 invoker.destroy(); if (logger.isDebugEnabled()) { logger.debug("destory invoker[" + invoker.getUrl() + "] success. "); } } } } } } } 复制代码
以上代码看起来比较长,其实也很简单。就是通过对比两个InvokerMap,如果新的InvokerMap中不包含旧的InvokerMap中的节点,那么这个Invoker就是要被销毁的。
我们在上一章节分析服务引用的时候,我们提到了可以对服务引用进行监听。那么这里,就是触发服务销毁监听方法的地方。
public class ListenerInvokerWrapper<T> implements Invoker<T> { public void destroy() { try { invoker.destroy(); } finally { //自定义监听器 if (listeners != null && !listeners.isEmpty()) { for (InvokerListener listener : listeners) { if (listener != null) { try { //调用监听器方法 listener.destroyed(invoker); } catch (Throwable t) { logger.error(t.getMessage(), t); } } } } } } } 复制代码
服务目录是集群容错和负载均衡机制的基础部分,为什么这样说呢?
当有多个服务提供者的时候:
我们怎么选取其中的一个服务去调用,这是负载均衡机制 当调用服务失败后,我们怎么处理当前的请求?抛出异常亦或是重试?这是集群容错机制
有了服务目录,我们才能获取所有的服务提供者列表,并感知注册中心的数据变化,及时更新目录中的Invoker对象信息。