转载

三、Apache Dubbo学习整理---注册中心(2)

订阅和发布是注册中心的核心功能之一。当一个已有服务的提供者下线,或者新的提供者加入。订阅对应接口的消费者和服务治理中心能及时收到注册中心的通知,并更新本地配置信息。整个过程自动完成,不需要人工参与。

(一)Zookeeper的实现

1、发布的实现

  • 提供者和消费者都需要把自己注册到注册中心。

  • 提供者的注册为了让消费者感知服务的存在,从而发起远程调用。也让服务治理中心感知有新的服务加入。

  • 消费者的发布,为了让服务治理中心发现。

  • 发布代码:

zkClinet.create(toUrlPath(url))
url.getParamter(Contants.DYNAMIC_KEY,true)
复制代码
  • 取消发布代码
zkClinet.delete(toUrlPath(url))
复制代码

2、订阅的实现

  • 订阅通常有pull和push两种方式,一种是客户端定时轮询注册中心拉取配置,另一种是注册中心主动推送给客户端。各有利弊,Dubbo采用的是第一次启动拉取,后续接收事件重新拉取数据。

  • 在服务暴露时,服务端会订阅configurators用于监听动态配置。在消费端启动时,消费端会订阅providers,routers和configurators这三个目录,分别对应服务提供者、路由和动态配置变更通知。

  • dubbo定义了两种两种连接zookeeper客户端:Apache Curator 和zkClient。默认是Curator.

  • Zookeeper采取的是 时间通知和客户端拉取方式。

  • ①客户端在第一次连接上注册中心时,会获取对应目录下的全量数据。
  • ②并在订阅的节点注册一个watcher,客户端与注册中心保持TCP长连接,
  • ③后续节点有变化,注册中心会根据watcher的回调主动通知客户端。
  • ④客户端接到通知后,会把对应节点下的全量数据都拉取过来。 当服务节点过多时,会对网络造成很大的压力。
  • zookeeper维护一个节点的版本号,只要有变化,版本号会响应变化,记录变更次数。

Zookeeper的全量订阅

三、Apache Dubbo学习整理---注册中心(2)
  • 1、客户端在第一次连接时,使用全量订阅。之后通过监听事件进行更新。服务治理中心会订阅service层,而且还会订阅这个节点下的所有节点。
  • 2、核心代码来自ZookeeperRegistry。 -3、 全量订阅主要支持dubbo服务治理平台,在平台启动时会订阅全量接口,它会感知每个服务的状态。
protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        //全量订阅
        if ("*".equals(url.getServiceInterface())) {
            String root = this.toRootPath();
            ConcurrentMap<NotifyListener, ChildListener> listeners = (ConcurrentMap)this.zkListeners.get(url);
            if (listeners == null) {
                this.zkListeners.putIfAbsent(url, new ConcurrentHashMap());
                listeners = (ConcurrentMap)this.zkListeners.get(url);
            }

            ChildListener zkListener = (ChildListener)listeners.get(listener);
            if (zkListener == null) {
                listeners.putIfAbsent(listener, new ChildListener() {
                    public void childChanged(String parentPath, List<String> currentChilds) {
                        Iterator var3 = currentChilds.iterator();

                        while(var3.hasNext()) {
                            String child = (String)var3.next();
                            child = URL.decode(child);
                            if (!ZookeeperRegistry.this.anyServices.contains(child)) {
                                ZookeeperRegistry.this.anyServices.add(child);
                                ZookeeperRegistry.this.subscribe(url.setPath(child).addParameters(new String[]{"interface", child, "check", String.valueOf(false)}), listener);
                            }
                        }

                    }
                });
                zkListener = (ChildListener)listeners.get(listener);
            }

            this.zkClient.create(root, false);
            List<String> services = this.zkClient.addChildListener(root, zkListener);
            if (services != null && services.size() > 0) {
                Iterator var7 = services.iterator();

                while(var7.hasNext()) {
                    String service = (String)var7.next();
                    service = URL.decode(service);
                    this.anyServices.add(service);
                    this.subscribe(url.setPath(service).addParameters(new String[]{"interface", service, "check", String.valueOf(false)}), listener);
                }
            }
        } else {
            //根据url的类别订阅
            List<URL> urls = new ArrayList();
            String[] var13 = this.toCategoriesPath(url);
            int var14 = var13.length;

            for(int var15 = 0; var15 < var14; ++var15) {
                String path = var13[var15];
                ConcurrentMap<NotifyListener, ChildListener> listeners = (ConcurrentMap)this.zkListeners.get(url);
                if (listeners == null) {
                    this.zkListeners.putIfAbsent(url, new ConcurrentHashMap());
                    listeners = (ConcurrentMap)this.zkListeners.get(url);
                }

                ChildListener zkListener = (ChildListener)listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            ZookeeperRegistry.this.notify(url, listener, ZookeeperRegistry.this.toUrlsWithEmpty(url, parentPath, currentChilds));
                        }
                    });
                    zkListener = (ChildListener)listeners.get(listener);
                }

                this.zkClient.create(path, false);
                List<String> children = this.zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(this.toUrlsWithEmpty(url, path, children));
                }
            }

            this.notify(url, listener, urls);
        }

    } catch (Throwable var11) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + this.getUrl() + ", cause: " + var11.getMessage(), var11);
    }
}
复制代码
原文  https://juejin.im/post/5f0353b2f265da22b360724d
正文到此结束
Loading...