在Dubbo微服务体系中,注册中心是其核心组件之一。Dubbo通过注册中心实现了分布式环境中各服务之间的注册和发现,是各分布式节点之间的纽带。其主要作用如下:
目录结构:
+ /dubbo // 根目录,默认dubbo +-- service // 服务目录,如:com.example.dubbo.demo.spi.EchoService +-- providers // 服务提供者目录,下面包含的接口有多个服务者URL元数据信息(IP、端口、权重和应用名等信息) +-- consumers // 服务消费者目录,下面包含的接口有多个消费者URL元数据信息(IP、端口、权重和应用名等信息) +-- routers // 路由配置目录,下面包含多个用于消费者路由策略URL元数据信息 +-- configurators // 动态配置目录,下面包含多个用于服务者动态配置URL元数据信息 复制代码
目录包含信息:
目录名称 | 储存值样例 |
---|---|
/dubbo/service/providers | dubbo://192.168.0.1.20880/com.alibaba.demo.Service?category=providers&key=value&... |
/dubbo/service/consumers | dubbo://192.168.0.1.5002/com.alibaba.demo.Service?category=consumers&key=value&... |
/dubbo/service/routers | dubbo://0.0.0.0/com.alibaba.demo.Service?category=routers&key=value&... |
/dubbo/service/configurators | dubbo://0.0.0.0/com.alibaba.demo.Service?category=configurators&key=value&... |
Redis也沿用了Dubbo抽象的Root、Service、Type、URL四层结构。采用Hash结构存储。
key | field | timeout |
---|---|---|
/dubbo/com.alibaba.demo.Service | URL | 10000 |
服务提供者和消费者都需要把自己注册到注册中心。服务提供者的注册是为了让消费者感知服务的存在,从而发起远程调用;也让服务治理中心感知有新的服务提供者上线。消费者的发布是为了让服务治理中心可以发现自己。
// ZookeeperRegistry public class ZookeeperRegistry extends FailbackRegistry { // 注册即调用ZooKeeper客户端在注册中心创建了一个目录 @Override public void doRegister(URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } // 取消发布即调用ZooKeeper客户端在注册中心删除对应目录 @Override public void doUnregister(URL url) { try { zkClient.delete(toUrlPath(url)); } catch (Throwable e) { throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } } 复制代码
事务操作 客户端任何新增、修改、删除、会话创建和失效操作,都会被认为是事务操作,会由ZooKeeper集群中的leader执行,即使客户端连接的是非leader节点,请求也会被转发给leader执行,以此来保证所有事务操作的全局时序性。由于每个节点都有一个版本号,因此可以通过CAS操作比较版本号来保证该节点数据操作的原子性。
@Override public void doSubscribe(final URL url, final NotifyListener listener) { try { // 判断是否为全量订阅 if (ANY_VALUE.equals(url.getServiceInterface())) { // 获取根路径 String root = toRootPath(); // listeners 为空说明缓存没有命中,这里把listeners 放入缓存 ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); // zkListener 为空则证明是第一次,新建一个listener ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> { // 遍历所有子节点 for (String child : currentChilds) { child = URL.decode(child); // 如果存在子节点还未被订阅,说明是新的节点,则订阅 if (!anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), k); } } }); // 创建持久节点,开始订阅持久节点下的直接子节点 zkClient.create(root, false); List<String> services = zkClient.addChildListener(root, zkListener); if (CollectionUtils.isNotEmpty(services)) { // 遍历所有子节点进行订阅 for (String service : services) { service = URL.decode(service); anyServices.add(service); // 增加当前节点的订阅,并返回该节点下所有子节点列表 subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } else { // 非全量订阅(普通消费者订阅场景) List<URL> urls = new ArrayList<>(); // 根据url获取订阅路径 for (String path : toCategoriesPath(url)) { // listeners 为空说明缓存没有命中,这里把listeners 放入缓存 ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); // zkListener 为空则证明是第一次,新建一个listener ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds))); zkClient.create(path, false); // 订阅,返回该节点下的子路径并缓存 List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } // 回调NotifyListener,更新本地缓存 notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } // 根据URL获取订阅类型 private String[] toCategoriesPath(URL url) { String[] categories; if (ANY_VALUE.equals(url.getParameter(CATEGORY_KEY))) { categories = new String[]{PROVIDERS_CATEGORY, CONSUMERS_CATEGORY, ROUTERS_CATEGORY, CONFIGURATORS_CATEGORY}; } else { // 非全量订阅指定类别provides,DEFAULT_CATEGORY = PROVIDERS_CATEGORY categories = url.getParameter(CATEGORY_KEY, new String[]{DEFAULT_CATEGORY}); } String[] paths = new String[categories.length]; for (int i = 0; i < categories.length; i++) { paths[i] = toServicePath(url) + PATH_SEPARATOR + categories[i]; } return paths; } 复制代码
Redis订阅发布使用的是过期机制和publish/subscribe通道。服务提供者发布服务,首先会在Redis中创建一个key,然后在通道中发布一条register时间消息。但服务的key写入到Redis后,发布者需要周期性地刷新key的过期时间,在RedisRegistry构造方法中会启动一个expireExecutor定时调度线程池,不断调用deferExpired()方法延续key的超时时间。如果服务提供者服务宕机,没有续期,则key会因为超时而被Redis删除,服务也就被认定为下线。
// 构造方法启动定时调度线程池以(过期时间 / 2)的频率续签自己 this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> { try { deferExpired(); // Extend the expiration time } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t); } // 过期时间 / 2 }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS); // 续期 private void deferExpired() { for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) { Pool<Jedis> jedisPool = entry.getValue(); try { try (Jedis jedis = jedisPool.getResource()) { for (URL url : new HashSet<>(getRegistered())) { if (url.getParameter(DYNAMIC_KEY, true)) { String key = toCategoryPath(url); // 不断续签自己 if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) { jedis.publish(key, REGISTER); } } } // 如果是服务治理中心,则只需clean操作 if (admin) { clean(jedis); } // 非replicate只需要写一个节点 if (!replicate) { break; } } } catch (Throwable t) { logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t); } } } 复制代码
for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) { // 获取连接池 Pool<Jedis> jedisPool = entry.getValue(); try { try (Jedis jedis = jedisPool.getResource()) { // 设置key和过期时间 jedis.hset(key, value, expire); // 发布注册消息 jedis.publish(key, REGISTER); success = true; // 非replicate只需要写一个节点 if (!replicate) { break; } } } catch (Throwable t) { exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t); } } 复制代码
如果是首次订阅,则会创建一个Notifier内部类,这是一个线程类,在启动时会异步进行通道的订阅。在启动Notifier线程的同时,主线程会继续往下执行,全量拉取一次注册中心上所有的服务信息。后续注册中心上的信息变更则通过Notifier线程订阅的通道推送时间来实现。
if (service.endsWith(ANY_VALUE)) { // 服务治理中心,订阅所有服务 if (first) { first = false; Set<String> keys = jedis.keys(service); if (CollectionUtils.isNotEmpty(keys)) { for (String s : keys) { // 首次触发通知设置本地缓存 doNotify(jedis, s); } } resetSkip(); } // 订阅服务 jedis.psubscribe(new NotifySub(jedisPool), service); } else { if (first) { first = false; // 首次触发通知设置本地缓存 doNotify(jedis, service); resetSkip(); } // 订阅服务 jedis.psubscribe(new NotifySub(jedisPool), service + PATH_SEPARATOR + ANY_VALUE); } 复制代码
消费者或者服务治理中心获取注册信息后会做本地缓存。内存中会有一份,保存在Properties对象里,磁盘上也会持久化一份文件,通过file对象引用。
class AbstractRegistry // 本地磁盘缓存,其中特殊键value.registries记录注册表中心列表,其他是已通知服务提供者的列表 private final Properties properties = new Properties(); // 文件缓存异步定时写入 private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true)); // 是否同步保存文件 private boolean syncSaveFile; // 本地缓存变更版本 private final AtomicLong lastCacheChanged = new AtomicLong(); // 内存中的服务缓存对象,与Redis存储方式相似,(key:url field:category value:服务列表) private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>(); // 本地磁盘缓存文件 private File file; 复制代码
在服务初始化的时候,AbstractRegistry构造函数会从本地磁盘文件中把持久化的注册数据督导Properties对象里,并加载到内存缓存中。Properties保存了所有服务提供者的URL,使用URL#serviceKey()作为key,提供者列表、路由规则列表、配置规则列表等作为value。由于value是列表,当存在多个的时候使用空格隔开。还有一个特殊的 key.registies ,保存所有的注册中心的地址, 如果应用在启动过程中,注册中心无法连接或者宕机,则Dubbo框架会自动通过本地缓存加载Invokers 。
// 缓存文件命名规则: String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache"; // 示例 dubbo-registry-demo-provider-127.0.0.1-2181.cache // 文件内容示例 #Dubbo Registry Cache #${time} com.example.dubbo.demo.service.EchoService=empty/://192.168.0.113/:20880/com.example.dubbo.demo.service.EchoService?anyhost/=true&application/=demo-provider&bind.ip/=192.168.0.113&bind.port/=20880&category/=configurators✓/=false&deprecated/=false&dubbo/=2.0.2&dynamic/=true&generic/=false&interface/=com.example.dubbo.demo.service.EchoService&metadata-type/=remote&methods/=hello&pid/=10192&release/=2.7.6&side/=provider×tamp/=${timestamp} 复制代码
缓存的保存分为同步/异步。异步会使用线程池异步保存(registryCacheExecutor)如果线程在执行过程中出现异常,则会再次调用线程池不断重试。 AbstractRegistry#notify中封装了内存缓存和更新文件缓存的逻辑,当客户端第一次订阅获取全量数据,或者后续由于订阅得到新数据时,都会调用该方法进行保存。
// 获取最后变更版本 long version = lastCacheChanged.incrementAndGet(); if (syncSaveFile) { // 同步保存 doSaveProperties(version); } else { // 异步保存 registryCacheExecutor.execute(new SaveProperties(version)); } 复制代码
ZooKeeperRegistry和RedisRegistry均继承FailbackRegistry,FailbackRegistry继承AbstractRegistry。
FailbackRegistry在AbstractRegistry基础上增加了失败重试机制作为抽象能力,子类可以直接使用。 FailbackRegistry抽象类中定义了一个 ScheduledThreadPoolExecutor,每经过固定间隔(默认5s)调用FailbackRegistry#retry()方法,对失败集合进行重试,成功则移出队列。FailbackRegistry实现了subscribe,unsubscribe等通用方法,里面调用了未实现的模板方法,会由子类实现。通用方法会调用这些模板方法,如果捕获到异常,则会把URL添加到对应的重试集合中,以供定时器去重试。
// 发起注册失败的URL集合 private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>(); // 取消注册失败的URL集合 private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>(); // 发起订阅失败的URL集合 private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>(); // 取消订阅失败的URL集合 private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>(); // 通知失败的URL集合 private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>(); 复制代码