转载

SpringCloud源码阅读1-EurekaServer源码的秘密

Spring Cloud Netflix 作为springcloud 我们常用的一个项目,其子项目Eureka,zuul,Rebbion是我熟悉的。但是Spring Cloud Netflix 被宣布进入了维护模式, 意思不再添加新特性了,这对于我们来说很不友好了。 大家纷纷寻找相应的替代工具。(具体可以网上搜索)

但这不影响我们学习一些组件的框架思想。我对注册发现,负载均衡这块比较感兴趣。所以在此记录下自己的阅读心得。

版本说明:Finchley.SR1

1.组件的配置:

1.1 启用Eureka注册中心

当我们在springboot的启动类上加上 @EnableEurekaServer ,一个基本的注册中心就可以生效了。

@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {
public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class, args);
}
}
复制代码

@EnableEurekaServer 仅仅是引入 EurekaServerMarkerConfiguration 类。 Marker的英文意思是标记的意思,spring相关框架中有很多类似 xxxMarkerxxx 这样的注解.其实他们的意思就是一个 开关 。会在其他地方进行开关的判断,有对应 xxxMarkerxxx 类就表示打开,没有表示关闭。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}
复制代码

EurekaServerMarkerConfiguration 开关打开的是哪个类呢??

org.springframework.cloud.netflix.eureka.server 项目spring.factories资源文件中自动注入类 EurekaServerAutoConfiguration ,此类在自动注入的过程中,会判断开关是否打开来决定是否自动注入相关类

org.springframework.boot.autoconfigure.EnableAutoConfiguration=/
  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
复制代码
@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
		InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
	.....
}
复制代码

由此看出 EurekaServerMarkerConfiguration 开关打开的 EurekaServerAutoConfiguration

1.2 组件的配置。

下面我们看看 EurekaServerAutoConfiguration 配置了什么东西。 (1.先看注解上相关配置

@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
		InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
	...
}
复制代码
  • 引入 EurekaServerInitializerConfiguration 类,此类继承了 SmartLifecycle 接口,所以会在spring启动完毕时回调此类的start()方法
  • EurekaDashboardProperties 表示Euerka面板相关配置属性。例如:是否打开面板;面板的访问路径
  • InstanceRegistryProperties 表示实例注册相关配置属性。例如:每分钟最大的续约数量,默认打开的通信数量 等
  • 加载 /eureka/server.properties 的配置属性。

(2.再看类内部相关配置(代码比较长,这里只讲内容,建议打开源码看) 寻找类中的Bean

  • HasFeatures 注册HasFeatures表示Eureka特征,
  • EurekaServerConfigBean配置类,表示EurekaServer的配置信息。通过 @ConfigurationProperties(“eureka.server”) 映射我们的配置文件中的 eureka.server.xxxx 格式的配置信息(此类很重要啊,我们想修改EurekaServer的配置信息,可以配置 eureka.server.xxxx 覆盖此类中的默认配置)
  • EurekaController: 面板的访问配置默认是“/”
  • 注册编码器 (ServerCodecs)CloudServerCodecs
  • PeerAwareInstanceRegistry:对等节点同步器。 多个节点下复制相关。 与注册中心高可用有关的组件。此处注册的是 InstanceRegistry ( 注意PeerAwareInstanceRegistry实现了AbstractInstanceRegistry,这里准确的说是 对等节点+当前节点同步器
  • PeerEurekaNodes: Eureka-Server 集群节点的集合。存储了集群下各个节点信息。也是与高可用有关。
  • EurekaServerContext : 上下文。默认注册的 DefaultEurekaServerContext
  • EurekaServerBootstrap: EurekaServer启动器。EurekaServerBootstrap
  • FilterRegistrationBean: 注册 Jersey filter过滤器。这里有必要讲一下。Eureka也是servlet应用。不过他是通过Jersey 框架来提供接口的。Jersey 框架是一个类Springmvc的web框架。我们项目中大多都是使用springmvc来处理。所以注册 Jersey filter过滤器,把 /eureka 开头的请求都交给Jersey 框架去解析。容器是 com.sun.jersey.spi.container.servlet.ServletContainer
  • ApplicationResource: 暴漏 com.netflix.discovery","com.netflix.eureka" 包路径下的接口。通常我们再springmvc中通过Controller概念来表示接口,Jersey框架下用ApplicationResource的概念来表示接口。暴露的接口其实就是eureka各个应用通信的接口。(下面再说这些接口)

EurekaServerAutoConfiguration 基本上就做了这些工作。我们来归类总结下

针对当前Eureka实例的相关组件:

  • EurekaDashboardProperties:面板属性
  • EurekaController: 面板的访问的处理器。
  • InstanceRegistryProperties:实例注册相关属性
  • (EurekaServerConfig)EurekaServerConfigBean:当前ErekekaServer相关配置
  • EurekaServerContext : 当前Eureka 注册中心上下文
  • 请求相关组件:注册 /eureka 路径的相关接口,注册拦截 /eureka 的拦截器,注册 com.sun.jersey.spi.container.servlet.ServletContainer 容器来处理对应的请求

两个针对集群下相关组件:

  • PeerAwareInstanceRegistry:用于集群下的节点相关复制信息用
  • PeerEurekaNodes:集群下的所有节点信息

两个针对启动相关类:

  • EurekaServerInitializerConfiguration: 对接spring,再spring启动完成后,调用
  • EurekaServerBootstrap:启动器,用于启动当前Eureak实例的上下文

至此:我们也可以大致了解了一个EurekaServer大致长什么样子了。

2.EurekaServerContext初始化:

EurekaServerContext作为上下文,应该是核心所在。上文讲过注册 DefaultEurekaServerContext 。此类中有 @Inject,@PostConstruct, @PreDestroy 注解的方法,重点来看看。

@Inject
public DefaultEurekaServerContext(EurekaServerConfig serverConfig,
                               ServerCodecs serverCodecs,
                               PeerAwareInstanceRegistry registry,
                               PeerEurekaNodes peerEurekaNodes,
                               ApplicationInfoManager applicationInfoManager) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        this.registry = registry;
        this.peerEurekaNodes = peerEurekaNodes;
        this.applicationInfoManager = applicationInfoManager;
}
复制代码

2.1 @Inject注解的构造方法

@Inject 注解的方法,参数由IOC容器注入。 serverConfig ,serverCodecs ,registry ,peerEurekaNodes 我们已经认识了。ApplicationInfoManager 是用来管理应用信息的,也就是实例注册信息,由ApplicationInfoManager统一管理。

2.2 @PostConstruct注解的initialize()方法

@PostConstruct 修饰的方法会在服务器加载Servle的时候运行,并且只会被服务器执行一次,被 @PostConstruct 修饰的方法会在构造函数之后,init()方法之前运行.

@PostConstruct
@Override
public void initialize() {
        logger.info("Initializing ...");
        peerEurekaNodes.start();
        try {
            registry.init(peerEurekaNodes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        logger.info("Initialized");
}
复制代码

这个方法很简明,主要有两个重要的的点:

  • peerEurekaNodes.start();
  • registry.init(peerEurekaNodes);

2.2.1 peerEurekaNodes.start()

PeerEurekaNodes: 用于管理PeerEurekaNode节点集合。 peerEurekaNodes.start();

public void start() {
		//创建一个单线程定时任务线程池:线程的名称叫做Eureka-PeerNodesUpdater
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
        	// 解析Eureka Server URL,并更新PeerEurekaNodes列表
            updatePeerEurekaNodes(resolvePeerUrls());
            //创建任务
            //任务内容为:解析Eureka Server URL,并更新PeerEurekaNodes列表
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }

                }
            };
            //交给线程池执行,执行间隔10min
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  {}", node.getServiceUrl());
        }
}
复制代码

resolvePeerUrls():

解析配置的对等体URL。就是在配置文件中配置的多个Eureka注册中心的URL.

updatePeerEurekaNodes:

protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
        //计算需要移除的url= 原来-新配置。
        Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
        toShutdown.removeAll(newPeerUrls);
        //计算需要增加的url= 新配置-原来的。
        Set<String> toAdd = new HashSet<>(newPeerUrls);
        toAdd.removeAll(peerEurekaNodeUrls);
        //没有变化就不更新
        if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
            return;
        }
       
        List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
         // 删除需要移除url对应的节点。
        if (!toShutdown.isEmpty()) {
            int i = 0;
            while (i < newNodeList.size()) {
                PeerEurekaNode eurekaNode = newNodeList.get(i);
                if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                    newNodeList.remove(i);
                    eurekaNode.shutDown();
                } else {
                    i++;
                }
            }
        }
        // 添加需要增加的url对应的节点
        if (!toAdd.isEmpty()) {
            logger.info("Adding new peer nodes {}", toAdd);
            for (String peerUrl : toAdd) {
                newNodeList.add(createPeerEurekaNode(peerUrl));
            }
        }
        //更新节点列表
        this.peerEurekaNodes = newNodeList;
        //更新节点url列表
        this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
    }
复制代码

总结:start()方法,其实就是完成新配置的eureka集群信息的初始化更新工作。

2.2.2 registry.init(peerEurekaNodes)

对等节点同步器的初始化。

public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
        //统计最近X秒内的来自对等节点复制的续约数量(默认1秒)
        this.numberOfReplicationsLastMin.start();
        this.peerEurekaNodes = peerEurekaNodes;
        //初始化返回结果缓存
        initializedResponseCache();
        //更新续约阀值
        scheduleRenewalThresholdUpdateTask();
        //初始化远程区域注册 相关信息
        initRemoteRegionRegistry();
        ...
}
复制代码

numberOfReplicationsLastMin.start():

启动一个定时任务,任务名称为 Eureka-MeasureRateTimer ,每1秒统计从对等节点复制的续约数,将当前的桶的统计数据放到lastBucket,当前桶置为0

this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
--
this.timer = new Timer("Eureka-MeasureRateTimer", true);
---
timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    try {
                        // Zero out the current bucket.
                        lastBucket.set(currentBucket.getAndSet(0));
                    } catch (Throwable e) {
                        logger.error("Cannot reset the Measured Rate", e);
                    }
                }
}, sampleInterval, sampleInterval);
复制代码

注意:此统计器用于节点之间复制的统计。

initializedResponseCache():

精辟,缓存来实现返回结果的缓存,优秀设计啊。

使用goole cache初始化一个缓存类 ResponseCacheImpl ,缓存( all applications, delta changes and for individual applications )请求的结果, 此类中有两个缓存:

  • readWriteCacheMap : 读写缓存。初始化容量1000,失效时间3分钟。
  • readOnlyCacheMap :只读缓存, shouldUseReadOnlyResponseCache 属性控制是否启用,默认是启用的。此缓存会使用,名为 Eureka-CacheFillTimer 的timer,每30s更新从 readWriteCacheMap 中更新 readOnlyCacheMap 中的缓存值。

取值逻辑: 先从 readOnlyCacheMap 取值,没有去 readWriteCacheMap ,没有去通过CacheLoader加载,而CacheLoader会到维护应用实例注册信息的Map中获取。

这里就产生了一个疑问,为啥有搞个二级缓存来缓存结果呢?不是很理解。

scheduleRenewalThresholdUpdateTask()

使用名为 ReplicaAwareInstanceRegistry - RenewalThresholdUpdater 的timer,每15(900s)分钟执行 updateRenewalThreshold() 任务,更新续约阀值。

private void updateRenewalThreshold() {
        try {
            Applications apps = eurekaClient.getApplications();
            int count = 0;
            //统计所有注册instance个数
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    if (this.isRegisterable(instance)) {
                        ++count;
                    }
                }
            }
            synchronized (lock) {
                当总数》预期值时 或者 关闭了自我保护模式,更新
                if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfRenewsPerMin)
                        || (!this.isSelfPreservationModeEnabled())) {
                    this.expectedNumberOfRenewsPerMin = count * 2;
                    this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
        } catch (Throwable e) {
            logger.error("Cannot update renewal threshold", e);
        }
}
复制代码
  • expectedNumberOfRenewsPerMin 每分钟最大的续约数量 (30s/次,2次/s): =客户端数量count*2
  • numberOfRenewsPerMinThreshold 每分钟续约阈值。 serverConfig.getRenewalPercentThreshold()*expectedNumberOfRenewsPerMin serverConfig.getRenewalPercentThreshold()默认是0.85

当每分钟续约数小于 numberOfRenewsPerMinThreshold 阈值时,并且自我保护没有关闭的情况下,开启自我保护,此期间不剔除任何一个客户端。(下面的 EvictionTask() 驱逐任务会讲到如何利用)

  • InstanceRegistry初始化
  • 客户端cancle主动下线
  • 客户端注册
  • scheduleRenewalThresholdUpdateTask

此四个地方都会更新两个值

initRemoteRegionRegistry()

初始化 远程区域注册 相关信息

2.3 @PreDestroy注解的initialize方法

@PreDestroy 修饰的方法会在服务器加载Servle的时候运行,并且只会被服务器执行一次,被 @PreDestroy 修饰的方法会 Destroy 方法之后执行。.

public void shutdown() {
        registry.shutdown();
        peerEurekaNodes.shutdown();
}
复制代码

registry.shutdown();

停掉init()时启动的定时任务

peerEurekaNodes.shutdown()

清空集群url缓存,集群节点缓存。

2.4 小结

总结: EurekaServerContext 的初始化做了很多事情,很精辟,建议多阅读,多学习

3.EurekaServer启动:

EurekaServerInitializerConfiguration 实现了 SmartLifecycle 接口,在spring启动后,执行start()方法

eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
//发布注册中心可以注册事件
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
//状态为运行状态
EurekaServerInitializerConfiguration.this.running = true;
//发布注册中心启动完成事件
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
复制代码

这里重点看先 EurekaServerBootstrap.contextInitialized EurekaServerBootstrap 的contextInitialized主要干了两件事

initEurekaEnvironment();初始化环境
initEurekaServerContext();初始化上下文
复制代码

3.1 initEurekaEnvironment

主要是数据中心等环境变量的初始化

3.2 initEurekaServerContext

此方法中最重要的是

从相邻eureka节点拷贝注册列表信息
int registryCount = this.registry.syncUp();
允许开始与客户端的数据传输,即开始作为Server服务
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
复制代码

3.1.1 registry.syncUp()

@Override
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;
		//重试次数
        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            //从eurekaClient获取服务列表
            Applications apps = eurekaClient.getApplications();
            //遍历注册
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }
复制代码

3.1.2 registry.openForTraffic

允许开始与客户端的数据传输,即开始作为Server服务

InstanceRegistry.openForTraffic
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
		super.openForTraffic(applicationInfoManager,
				count == 0 ? this.defaultOpenForTrafficCount : count);
}
PeerAwareInstanceRegistryImpl.openForTraffic
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
       //每分钟期待的续约数(默认30s续约,60s就是2次)
        this.expectedNumberOfRenewsPerMin = count * 2;
        // 每分钟续约的阀值:0.85 * expectedNumberOfRenewsPerMin
        this.numberOfRenewsPerMinThreshold =
                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
      	....
        logger.info("Changing status to UP");
        //applicationInfoManager设置状态为UP
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        super.postInit();
}
复制代码

3.1.3 EvictionTask() 驱逐任务

protected void postInit() {
		//又启动了一个续约数统计器,此统计器用于配合驱逐任务
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask());
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());
}
复制代码

创建一个名为 Eureka-EvictionTimer 的定时器来执行EvictionTask()任务。 EvictionTask()任务:

@Override
public void run() {
          	// 获取延迟秒数,就是延迟几秒下线时间。
            long compensationTimeMs = getCompensationTimeMs();
            //驱逐操作
            evict(compensationTimeMs);
}
复制代码

evict()驱逐操作:清理过期租约

public void evict(long additionalLeaseMs) {
        // 判断是否开启自我保护,自我保护期间不剔除任何任务
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        //循环获得 所有过期的租约
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                     // 判断是否过期
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }
		// 计算 最大允许清理租约数量
        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;
		// 计算 清理租约数量
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            Random random = new Random(System.currentTimeMillis());
            // 遍历清理。
            for (int i = 0; i < toEvict; i++) { 
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                internalCancel(appName, id, false);
            }
        }
    }
复制代码

isLeaseExpirationEnabled():判断是否开启自我保护的两个条件

  1. 自我保护配置处于开启状态
  2. 当前单位续约数(renewsLastMin统计器统计的数据)<阈值

Lease.isExpire():是否过期的判断:

public boolean isExpired(long additionalLeaseMs) {
        return (
        //或者明确实例下线时间。
        evictionTimestamp > 0 
        //或者距离最后更新时间已经过去至少3分钟
        || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
    }
复制代码
  • evictionTimestamp : 实例下线时间,当客户端下线时,会更新这个时间
  • duration : 过期间隔,默认为90秒
  • lastUpdateTimestamp : 为最后更新时间
//续约时更新lastUpdateTimestamp,加上了过期间隔?
public void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
复制代码

过期时间判断: System.currentTimeMillis()> lastUpdateTimestamp + duration + additionalLeaseMs 这里加了两次duration, 也就是180秒,加上延迟下线时间。也就是最少需要3分钟才判断下线。

3.3 小结

至此Eureka server的初始化就完成了。 这里通过debug模式来看看初始化过程中的定时任务。

SpringCloud源码阅读1-EurekaServer源码的秘密

4.API接口

Eureka Server 启动后,就是对外提供服务了。等待客户端来注册。

Eureka是一个基于REST(Representational State Transfer)服务,我们从官方文档中可以看到其对外提供的接口: 官方文档

SpringCloud源码阅读1-EurekaServer源码的秘密

可以推测,客户端注册时也是调用了这些接口来进行与服务端的通信的。

上文说过,Eureka 使用jersey框架来做MVC框架,暴露接口。 ApplicationResource 类似springmvc中的Controller。

com.netflix.eureka.resources 包下我们可以看到这些 ApplicationResource

SpringCloud源码阅读1-EurekaServer源码的秘密

4.1注册接口

ApplicationResource.addInstance 对应的就是服务注册接口

@POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
		....
		//使用PeerAwareInstanceRegistryImpl#register() 注册实例信息。
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }
复制代码
InstanceRegistry
@Override
	public void register(final InstanceInfo info, final boolean isReplication) {
		//发布注册事件,
		handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
		super.register(info, isReplication);
}
PeerAwareInstanceRegistryImpl
@Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        //租期90s
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        //注册实例
        super.register(info, leaseDuration, isReplication);
        //复制到其他节点。
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
复制代码

4.1.1注册到当前Eureka

AbstractInstanceRegistry
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
	read.lock()读锁
	1.从缓存中获取实例名称对应的租约信息
	Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
    2.统计数+1
    REGISTER.increment(isReplication); 
    //gmap为null.则创建一个Map。
	
	3.租约的处理分两种情况:
	租约已经存在:
			比较新租约与旧租约的LastDirtyTimestamp,使用LastDirtyTimestamp最晚的租约
	租约不存在,即新注册:        
			synchronized (lock) {
				更新期待每分钟续约数
				更新续约阈值
			}
	将租约放入appname对应的map中。
	
	4.在最近注册队(recentRegisteredQueue)里添加一个当前注册信息
	5.状态的处理:
		将当前实例的OverriddenStatus状态,放到Eureka Server的overriddenInstanceStatusMap;
		根据OverriddenStatus状态,设置状态
	7.实例actionType=ADDED
	registrant.setActionType(ActionType.ADDED);
    8. 维护recentlyChangedQueue,保存最近操作
    recentlyChangedQueue.add(new RecentlyChangedItem(lease));
    9.更新最后更新时间
    registrant.setLastUpdatedTimestamp();
    10.使当前实例的结果缓存ResponseCache失效()
    invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
	
}
复制代码

4.1.2复制到其他节点

此处可以看源码阅读,在此不讲了

4.2查询接口

我们获取的实例信息,其实都是从缓存中获取的 String payLoad = responseCache.get(cacheKey);

@GET
    public Response getApplication(@PathParam("version") String version,
                                   @HeaderParam("Accept") final String acceptHeader,
                                   @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept) {
        if (!registry.shouldAllowAccess(false)) {
            return Response.status(Status.FORBIDDEN).build();
        }

        EurekaMonitors.GET_APPLICATION.increment();

        CurrentRequestVersion.set(Version.toEnum(version));
        KeyType keyType = Key.KeyType.JSON;
        if (acceptHeader == null || !acceptHeader.contains("json")) {
            keyType = Key.KeyType.XML;
        }

        Key cacheKey = new Key(
                Key.EntityType.Application,
                appName,
                keyType,
                CurrentRequestVersion.get(),
                EurekaAccept.fromString(eurekaAccept)
        );

        String payLoad = responseCache.get(cacheKey);

        if (payLoad != null) {
            logger.debug("Found: {}", appName);
            return Response.ok(payLoad).build();
        } else {
            logger.debug("Not Found: {}", appName);
            return Response.status(Status.NOT_FOUND).build();
        }
    }
复制代码
原文  https://juejin.im/post/5dd367ce5188254eed5b279f
正文到此结束
Loading...