订阅和发布是注册中心的核心功能之一。当一个已有服务的提供者下线,或者新的提供者加入。订阅对应接口的消费者和服务治理中心能及时收到注册中心的通知,并更新本地配置信息。整个过程自动完成,不需要人工参与。
提供者和消费者都需要把自己注册到注册中心。
提供者的注册为了让消费者感知服务的存在,从而发起远程调用。也让服务治理中心感知有新的服务加入。
消费者的发布,为了让服务治理中心发现。
发布代码:
zkClinet.create(toUrlPath(url)) url.getParamter(Contants.DYNAMIC_KEY,true) 复制代码
zkClinet.delete(toUrlPath(url)) 复制代码
订阅通常有pull和push两种方式,一种是客户端定时轮询注册中心拉取配置,另一种是注册中心主动推送给客户端。各有利弊,Dubbo采用的是第一次启动拉取,后续接收事件重新拉取数据。
在服务暴露时,服务端会订阅configurators用于监听动态配置。在消费端启动时,消费端会订阅providers,routers和configurators这三个目录,分别对应服务提供者、路由和动态配置变更通知。
dubbo定义了两种两种连接zookeeper客户端:Apache Curator 和zkClient。默认是Curator.
Zookeeper采取的是 时间通知和客户端拉取方式。
protected void doSubscribe(final URL url, final NotifyListener listener) { try { //全量订阅 if ("*".equals(url.getServiceInterface())) { String root = this.toRootPath(); ConcurrentMap<NotifyListener, ChildListener> listeners = (ConcurrentMap)this.zkListeners.get(url); if (listeners == null) { this.zkListeners.putIfAbsent(url, new ConcurrentHashMap()); listeners = (ConcurrentMap)this.zkListeners.get(url); } ChildListener zkListener = (ChildListener)listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { public void childChanged(String parentPath, List<String> currentChilds) { Iterator var3 = currentChilds.iterator(); while(var3.hasNext()) { String child = (String)var3.next(); child = URL.decode(child); if (!ZookeeperRegistry.this.anyServices.contains(child)) { ZookeeperRegistry.this.anyServices.add(child); ZookeeperRegistry.this.subscribe(url.setPath(child).addParameters(new String[]{"interface", child, "check", String.valueOf(false)}), listener); } } } }); zkListener = (ChildListener)listeners.get(listener); } this.zkClient.create(root, false); List<String> services = this.zkClient.addChildListener(root, zkListener); if (services != null && services.size() > 0) { Iterator var7 = services.iterator(); while(var7.hasNext()) { String service = (String)var7.next(); service = URL.decode(service); this.anyServices.add(service); this.subscribe(url.setPath(service).addParameters(new String[]{"interface", service, "check", String.valueOf(false)}), listener); } } } else { //根据url的类别订阅 List<URL> urls = new ArrayList(); String[] var13 = this.toCategoriesPath(url); int var14 = var13.length; for(int var15 = 0; var15 < var14; ++var15) { String path = var13[var15]; ConcurrentMap<NotifyListener, ChildListener> listeners = (ConcurrentMap)this.zkListeners.get(url); if (listeners == null) { this.zkListeners.putIfAbsent(url, new ConcurrentHashMap()); listeners = (ConcurrentMap)this.zkListeners.get(url); } ChildListener zkListener = (ChildListener)listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { public void childChanged(String parentPath, List<String> currentChilds) { ZookeeperRegistry.this.notify(url, listener, ZookeeperRegistry.this.toUrlsWithEmpty(url, parentPath, currentChilds)); } }); zkListener = (ChildListener)listeners.get(listener); } this.zkClient.create(path, false); List<String> children = this.zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(this.toUrlsWithEmpty(url, path, children)); } } this.notify(url, listener, urls); } } catch (Throwable var11) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + this.getUrl() + ", cause: " + var11.getMessage(), var11); } } 复制代码