转载

Dubbo注册中心(ZooKeeper、Redis)实现原理

在Dubbo微服务体系中,注册中心是其核心组件之一。Dubbo通过注册中心实现了分布式环境中各服务之间的注册和发现,是各分布式节点之间的纽带。其主要作用如下:

  1. 动态加入。一个服务提供者通过注册中心可以动态地把自己暴露给其他消费者,无需消费者逐个去更新配置文件。
  2. 动态发现。一个消费者可以动态的感知新的配置,路由规则和新的服务提供者,无需重启服务使之生效。
  3. 动态调整。注册中心支持参数的动态调整,新参数自动更新到所有相关服务节点。
  4. 统一配置。避免了本地配置导致每个服务的配置不一致问题。

工作流程

  1. 服务提供者启动时,会向注册中心写入自己的元数据信息,并订阅配置元数据信息。
  2. 消费者启动时,会向注册中心写入自己的元数据信息,并订阅服务提供者,路由和配置元数据信息。
  3. 服务治理中心(dubbo-admin)启动时,会同时订阅所有消费者、服务提供者、路由和配置元数据信息。
  4. 当有服务提供者离开或者加入时,注册中心服务提供者目录会发生变化,变化信息会动态通知消费者、服务治理中心。
  5. 当消费者发起服务调用时,会异步将统计信息等上报给监控中心(dubbo-monitor-simple)。

原理概述

ZooKeeper原理概述

目录结构:

+ /dubbo                  // 根目录,默认dubbo
+-- service               // 服务目录,如:com.example.dubbo.demo.spi.EchoService
	+-- providers         // 服务提供者目录,下面包含的接口有多个服务者URL元数据信息(IP、端口、权重和应用名等信息)
	+-- consumers         // 服务消费者目录,下面包含的接口有多个消费者URL元数据信息(IP、端口、权重和应用名等信息)
	+-- routers           // 路由配置目录,下面包含多个用于消费者路由策略URL元数据信息
	+-- configurators     // 动态配置目录,下面包含多个用于服务者动态配置URL元数据信息
复制代码

目录包含信息:

目录名称 储存值样例
/dubbo/service/providers dubbo://192.168.0.1.20880/com.alibaba.demo.Service?category=providers&key=value&...
/dubbo/service/consumers dubbo://192.168.0.1.5002/com.alibaba.demo.Service?category=consumers&key=value&...
/dubbo/service/routers dubbo://0.0.0.0/com.alibaba.demo.Service?category=routers&key=value&...
/dubbo/service/configurators dubbo://0.0.0.0/com.alibaba.demo.Service?category=configurators&key=value&...

Redis原理概述

Redis也沿用了Dubbo抽象的Root、Service、Type、URL四层结构。采用Hash结构存储。

key field timeout
/dubbo/com.alibaba.demo.Service URL 10000

订阅/发布

ZooKeeper

发布的实现

服务提供者和消费者都需要把自己注册到注册中心。服务提供者的注册是为了让消费者感知服务的存在,从而发起远程调用;也让服务治理中心感知有新的服务提供者上线。消费者的发布是为了让服务治理中心可以发现自己。

// ZookeeperRegistry 
public class ZookeeperRegistry extends FailbackRegistry {

	// 注册即调用ZooKeeper客户端在注册中心创建了一个目录
	@Override
	public void doRegister(URL url) {
	    try {
	        zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
	    } catch (Throwable e) {
	        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
	    }
	}
	
	// 取消发布即调用ZooKeeper客户端在注册中心删除对应目录
	@Override
	public void doUnregister(URL url) {
	    try {
	        zkClient.delete(toUrlPath(url));
	    } catch (Throwable e) {
	        throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
	    }
	}
}
复制代码

订阅的实现

  1. 订阅通常有pull和push两种方式,一种是客户端定时轮询注册中心拉取配置,另一种是注册中心主动推送数据给客户端。这两种方式各有利弊,目前Dubbo采用的是第一次启动pull的方式,后续接收事件重新pull数据。
  2. ZooKeeper注册中心采用的是“事件通知” + “客户端拉取”的方式,客户端第一次连接上注册中心的时候会获取对应目录下的全量数据。并在订阅的节点上注册一个watcher,客户端与注册中心保持TCP长连接,后续每个节点有任何数据变化的时候,注册中心会根据watcher的回调主动通知客户端(事件通知),客户端接收到通知后,会把对应节点下的全量数据都拉取下来(客户端拉取)。
  3. ZooKeeper每个节点都有一个版本号,当某个节点数据发生变化(事务操作)的时候,该节点对应的版本号就会发生变化,并触发watcher事件,推送数据给订阅方。版本号强调的是 变更次数 ,即使该节点的值没有变化,只有更新操作,依然会是版本号变化。

事务操作 客户端任何新增、修改、删除、会话创建和失效操作,都会被认为是事务操作,会由ZooKeeper集群中的leader执行,即使客户端连接的是非leader节点,请求也会被转发给leader执行,以此来保证所有事务操作的全局时序性。由于每个节点都有一个版本号,因此可以通过CAS操作比较版本号来保证该节点数据操作的原子性。

  1. 客户端第一次连上注册中心,订阅时会获取全量的数据,后续则通过监听器事件进行更新。服务治理中心会处理所有service层的订阅,service被设置成特殊值*。此外,服务治理中心除了订阅当前节点,还会订阅这个节点下的所有子节点。
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        // 判断是否为全量订阅
        if (ANY_VALUE.equals(url.getServiceInterface())) {
        	// 获取根路径
            String root = toRootPath();
            // listeners 为空说明缓存没有命中,这里把listeners 放入缓存
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
            // zkListener 为空则证明是第一次,新建一个listener
            ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
            	// 遍历所有子节点
                for (String child : currentChilds) {
                    child = URL.decode(child);
                    // 如果存在子节点还未被订阅,说明是新的节点,则订阅
                    if (!anyServices.contains(child)) {
                        anyServices.add(child);
                        subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                Constants.CHECK_KEY, String.valueOf(false)), k);
                    }
                }
            });
            // 创建持久节点,开始订阅持久节点下的直接子节点
            zkClient.create(root, false);
            List<String> services = zkClient.addChildListener(root, zkListener);
            if (CollectionUtils.isNotEmpty(services)) {
            	// 遍历所有子节点进行订阅
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    // 增加当前节点的订阅,并返回该节点下所有子节点列表
                    subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                }
            }
        } else {
        	// 非全量订阅(普通消费者订阅场景)
            List<URL> urls = new ArrayList<>();
            // 根据url获取订阅路径
            for (String path : toCategoriesPath(url)) {
            	// listeners 为空说明缓存没有命中,这里把listeners 放入缓存
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                // zkListener 为空则证明是第一次,新建一个listener
                ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                zkClient.create(path, false);
                // 订阅,返回该节点下的子路径并缓存
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            // 回调NotifyListener,更新本地缓存
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

// 根据URL获取订阅类型
private String[] toCategoriesPath(URL url) {
    String[] categories;
    if (ANY_VALUE.equals(url.getParameter(CATEGORY_KEY))) {
        categories = new String[]{PROVIDERS_CATEGORY, CONSUMERS_CATEGORY, ROUTERS_CATEGORY, CONFIGURATORS_CATEGORY};
    } else {
    	// 非全量订阅指定类别provides,DEFAULT_CATEGORY = PROVIDERS_CATEGORY
        categories = url.getParameter(CATEGORY_KEY, new String[]{DEFAULT_CATEGORY});
    }
    String[] paths = new String[categories.length];
    for (int i = 0; i < categories.length; i++) {
        paths[i] = toServicePath(url) + PATH_SEPARATOR + categories[i];
    }
    return paths;
}
复制代码
  1. URL中的属性值类别
    1. provides:订阅方会更新本地Directory管理的Invoker服务列表;
    2. routers:订阅方会更新本地路由规则列表;
    3. configurators:订阅方会更新或覆盖本地动态参数列表。

Redis

发布/订阅机制

Redis订阅发布使用的是过期机制和publish/subscribe通道。服务提供者发布服务,首先会在Redis中创建一个key,然后在通道中发布一条register时间消息。但服务的key写入到Redis后,发布者需要周期性地刷新key的过期时间,在RedisRegistry构造方法中会启动一个expireExecutor定时调度线程池,不断调用deferExpired()方法延续key的超时时间。如果服务提供者服务宕机,没有续期,则key会因为超时而被Redis删除,服务也就被认定为下线。

主动/被动下线

  1. 服务提供者主动下线:会在通道中广播一条unregister事件消息,订阅方收到后则从注册中心拉取数据,更新本地缓存的服务列表。
  2. 服务提供者被动下线:服务器宕机等原因没有续期,导致key过期,此时是不会有动态消息推送的,在使用Redis为注册中心的时候,会依赖于服务治理中心。如果服务治理中心定时调度,则还会触发清理逻辑:获取Redis上所有的key进行遍历,如果发现key已经超时了,则删除Redis上对应的key。清除完后,还会在通道中发布对应key的unregister事件,其他消费者监听到取消注册事件后会删除本地对应服务器的数据,从而保证数据的最终一致。
// 构造方法启动定时调度线程池以(过期时间 / 2)的频率续签自己
this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
   try {
        deferExpired(); // Extend the expiration time
    } catch (Throwable t) { // Defensive fault tolerance
        logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
    }
    // 过期时间 / 2
}, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);

// 续期
private void deferExpired() {
    for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
        Pool<Jedis> jedisPool = entry.getValue();
        try {
            try (Jedis jedis = jedisPool.getResource()) {
                for (URL url : new HashSet<>(getRegistered())) {
                    if (url.getParameter(DYNAMIC_KEY, true)) {
                        String key = toCategoryPath(url);
                        // 不断续签自己
                        if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
                            jedis.publish(key, REGISTER);
                        }
                    }
                }
                // 如果是服务治理中心,则只需clean操作
                if (admin) {
                    clean(jedis);
                }
                // 非replicate只需要写一个节点
                if (!replicate) {
                    break;
                }
            }
        } catch (Throwable t) {
            logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
        }
    }
}
复制代码

发布的实现

for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
	// 获取连接池
    Pool<Jedis> jedisPool = entry.getValue();
    try {
        try (Jedis jedis = jedisPool.getResource()) {
        	// 设置key和过期时间
            jedis.hset(key, value, expire);
            // 发布注册消息
            jedis.publish(key, REGISTER);
            success = true;
            // 非replicate只需要写一个节点
            if (!replicate) {
                break; 
            }
        }
    } catch (Throwable t) {
        exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
    }
}
复制代码

订阅的实现

如果是首次订阅,则会创建一个Notifier内部类,这是一个线程类,在启动时会异步进行通道的订阅。在启动Notifier线程的同时,主线程会继续往下执行,全量拉取一次注册中心上所有的服务信息。后续注册中心上的信息变更则通过Notifier线程订阅的通道推送时间来实现。

if (service.endsWith(ANY_VALUE)) {
	// 服务治理中心,订阅所有服务
    if (first) {
        first = false;
        Set<String> keys = jedis.keys(service);
        if (CollectionUtils.isNotEmpty(keys)) {
            for (String s : keys) {
            	// 首次触发通知设置本地缓存
                doNotify(jedis, s);
            }
        }
        resetSkip();
    }
    // 订阅服务
    jedis.psubscribe(new NotifySub(jedisPool), service);
} else {
    if (first) {
        first = false;
        // 首次触发通知设置本地缓存
        doNotify(jedis, service);
        resetSkip();
    }
    // 订阅服务
    jedis.psubscribe(new NotifySub(jedisPool), service + PATH_SEPARATOR + ANY_VALUE);
}
复制代码

缓存机制

消费者或者服务治理中心获取注册信息后会做本地缓存。内存中会有一份,保存在Properties对象里,磁盘上也会持久化一份文件,通过file对象引用。

class AbstractRegistry
// 本地磁盘缓存,其中特殊键value.registries记录注册表中心列表,其他是已通知服务提供者的列表
private final Properties properties = new Properties();
// 文件缓存异步定时写入
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
// 是否同步保存文件
private boolean syncSaveFile;
// 本地缓存变更版本
private final AtomicLong lastCacheChanged = new AtomicLong();
// 内存中的服务缓存对象,与Redis存储方式相似,(key:url field:category value:服务列表)
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();
// 本地磁盘缓存文件
private File file;
复制代码

缓存的加载

在服务初始化的时候,AbstractRegistry构造函数会从本地磁盘文件中把持久化的注册数据督导Properties对象里,并加载到内存缓存中。Properties保存了所有服务提供者的URL,使用URL#serviceKey()作为key,提供者列表、路由规则列表、配置规则列表等作为value。由于value是列表,当存在多个的时候使用空格隔开。还有一个特殊的 key.registies ,保存所有的注册中心的地址, 如果应用在启动过程中,注册中心无法连接或者宕机,则Dubbo框架会自动通过本地缓存加载Invokers

// 缓存文件命名规则:
String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache";

// 示例
dubbo-registry-demo-provider-127.0.0.1-2181.cache

// 文件内容示例
#Dubbo Registry Cache
#${time}
com.example.dubbo.demo.service.EchoService=empty/://192.168.0.113/:20880/com.example.dubbo.demo.service.EchoService?anyhost/=true&application/=demo-provider&bind.ip/=192.168.0.113&bind.port/=20880&category/=configurators✓/=false&deprecated/=false&dubbo/=2.0.2&dynamic/=true&generic/=false&interface/=com.example.dubbo.demo.service.EchoService&metadata-type/=remote&methods/=hello&pid/=10192&release/=2.7.6&side/=provider&timestamp/=${timestamp}
复制代码

缓存的保存和更新

缓存的保存分为同步/异步。异步会使用线程池异步保存(registryCacheExecutor)如果线程在执行过程中出现异常,则会再次调用线程池不断重试。 AbstractRegistry#notify中封装了内存缓存和更新文件缓存的逻辑,当客户端第一次订阅获取全量数据,或者后续由于订阅得到新数据时,都会调用该方法进行保存。

// 获取最后变更版本
long version = lastCacheChanged.incrementAndGet();
if (syncSaveFile) {
	// 同步保存
    doSaveProperties(version);
} else {
	// 异步保存
    registryCacheExecutor.execute(new SaveProperties(version));
}
复制代码

重试机制

ZooKeeperRegistry和RedisRegistry均继承FailbackRegistry,FailbackRegistry继承AbstractRegistry。

Dubbo注册中心(ZooKeeper、Redis)实现原理
Dubbo注册中心(ZooKeeper、Redis)实现原理
FailbackRegistry在AbstractRegistry基础上增加了失败重试机制作为抽象能力,子类可以直接使用。 FailbackRegistry抽象类中定义了一个 ScheduledThreadPoolExecutor

,每经过固定间隔(默认5s)调用FailbackRegistry#retry()方法,对失败集合进行重试,成功则移出队列。FailbackRegistry实现了subscribe,unsubscribe等通用方法,里面调用了未实现的模板方法,会由子类实现。通用方法会调用这些模板方法,如果捕获到异常,则会把URL添加到对应的重试集合中,以供定时器去重试。

// 发起注册失败的URL集合
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();

// 取消注册失败的URL集合
private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();

// 发起订阅失败的URL集合
private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();

// 取消订阅失败的URL集合
private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();

// 通知失败的URL集合
private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>();
复制代码
原文  https://juejin.im/post/5f0d6fbf5188252e4839baed
正文到此结束
Loading...