转载

SpringCloud Eureka 源码解析 —— 应用实例注册发现(一)之注册

摘要: 原创出处 http://www.iocoder.cn/Eureka/instance-registry-register/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Eureka 1.8.X 版本

  • 1. 概述
  • 2. Eureka-Client 发起注册
    • 2.1 应用实例信息复制器
    • 2.2 刷新应用实例信息
    • 2.3 发起注册应用实例
  • 3. Eureka-Server 接收注册
    • 3.1 接收注册请求
    • 3.2 Lease
    • 3.3 注册应用实例信息
  • 666. 彩蛋

SpringCloud Eureka 源码解析 —— 应用实例注册发现(一)之注册

������关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有 源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言 将得到 认真 回复。 甚至不知道如何读源码也可以请教噢
  4. 新的 源码解析文章 实时 收到通知。 每周更新一篇左右
  5. 认真的 源码交流微信群。

1. 概述

本文主要分享 Eureka-Client 向 Eureka-Server 注册应用实例的过程

FROM 《深度剖析服务发现组件Netflix Eureka》 二次编辑

SpringCloud Eureka 源码解析 —— 应用实例注册发现(一)之注册
  • 蓝框 部分,为本文重点。
  • 蓝框 部分,Eureka-Server 集群间复制注册的应用实例信息,不在本文内容范畴。

推荐 Spring Cloud 书籍:

  • 请支持正版。下载盗版, 等于主动编写低级 BUG
  • 程序猿DD —— 《Spring Cloud微服务实战》
  • 周立 —— 《Spring Cloud与Docker微服务架构实战》
  • 两书齐买,京东包邮。

2. Eureka-Client 发起注册

Eureka-Client 向 Eureka-Server 发起注册应用实例需要符合如下条件:

  • 配置 eureka.registration.enabled = true ,Eureka-Client 向 Eureka-Server 发起注册应用实例的 开关
  • InstanceInfo 在 Eureka-Client 和 Eureka-Server 数据不一致。

每次 InstanceInfo 发生 属性变化 时,标记 isInstanceInfoDirty 属性为 true ,表示 InstanceInfo 在 Eureka-Client 和 Eureka-Server 数据不一致,需要注册。另外,InstanceInfo 刚被创建时,在 Eureka-Server 不存在,也会被注册。

当符合条件时,InstanceInfo 不会立即向 Eureka-Server 注册,而是后台线程 定时 注册。

当 InstanceInfo 的状态( status ) 属性发生变化时,并且配置 eureka.shouldOnDemandUpdateStatusChange = true 时,立即向 Eureka-Server 注册。 因为状态属性非常重要,一般情况下建议开启,当然默认情况也是开启的

Let's Go。让我们看看代码的实现。

2.1 应用实例信息复制器

// DiscoveryClient.java
public class DiscoveryClient implements EurekaClient{

 /**
* 应用实例状态变更监听器
*/
 private ApplicationInfoManager.StatusChangeListener statusChangeListener;
 /**
* 应用实例信息复制器
*/
 private InstanceInfoReplicator instanceInfoReplicator;

 private void initScheduledTasks(){
 // ... 省略无关代码
 
 if (clientConfig.shouldRegisterWithEureka()) {
 
 // ... 省略无关代码
 
 // 创建 应用实例信息复制器
 // InstanceInfo replicator
 instanceInfoReplicator = new InstanceInfoReplicator(
 this,
 instanceInfo,
 clientConfig.getInstanceInfoReplicationIntervalSeconds(),
 2); // burstSize

 // 创建 应用实例状态变更监听器
 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
 @Override
 public String getId(){
 return "statusChangeListener";
 }

 @Override
 public void notify(StatusChangeEvent statusChangeEvent){
 if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
 InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
 // log at warn level if DOWN was involved
 logger.warn("Saw local status change event {}", statusChangeEvent);
 } else {
 logger.info("Saw local status change event {}", statusChangeEvent);
 }
 instanceInfoReplicator.onDemandUpdate();
 }
 };

 // 注册 应用实例状态变更监听器
 if (clientConfig.shouldOnDemandUpdateStatusChange()) {
 applicationInfoManager.registerStatusChangeListener(statusChangeListener);
 }

 // 开启 应用实例信息复制器
 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
 }
 
 }

}
  • com.netflix.discovery.InstanceInfoReplicator ,应用实例信息复制器。

    • 调用 InstanceInfoReplicator#start(...) 方法, 开启 应用实例信息复制器。实现代码如下:

      // InstanceInfoReplicator.java
      class InstanceInfoReplicator implements Runnable{
      
       private static final Logger logger = LoggerFactory.getLogger(InstanceInfoReplicator.class);
      
       private final DiscoveryClient discoveryClient;
       /**
      * 应用实例信息
      */
       private final InstanceInfo instanceInfo;
       /**
      * 定时执行频率,单位:秒
      */
       private final int replicationIntervalSeconds;
       /**
      * 定时执行器
      */
       private final ScheduledExecutorService scheduler;
       /**
      * 定时执行任务的 Future
      */
       private final AtomicReference<Future> scheduledPeriodicRef;
       /**
      * 是否开启调度
      */
       private final AtomicBoolean started;
      
       private final RateLimiter rateLimiter; // 限流相关,跳过
       private final int burstSize; // 限流相关,跳过
       private final int allowedRatePerMinute; // 限流相关,跳过
      
       InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
       this.discoveryClient = discoveryClient;
       this.instanceInfo = instanceInfo;
       this.scheduler = Executors.newScheduledThreadPool(1,
       new ThreadFactoryBuilder()
       .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
       .setDaemon(true)
       .build());
      
       this.scheduledPeriodicRef = new AtomicReference<Future>();
      
       this.started = new AtomicBoolean(false);
       this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
       this.replicationIntervalSeconds = replicationIntervalSeconds;
       this.burstSize = burstSize;
      
       this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
       logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
       }
      
       public void start(int initialDelayMs){
       if (started.compareAndSet(false, true)) {
       // 设置 应用实例信息 数据不一致
       instanceInfo.setIsDirty(); // for initial register
       // 提交任务,并设置该任务的 Future
       Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
       scheduledPeriodicRef.set(next);
       }
       }
       
       // ... 省略无关方法
      }
      
      // InstanceInfo.java
      private volatile boolean isInstanceInfoDirty = false;
      private volatile Long lastDirtyTimestamp;
      
      public synchronized void setIsDirty(){
       isInstanceInfoDirty = true;
       lastDirtyTimestamp = System.currentTimeMillis();
      }
      
      • 执行 instanceInfo.setIsDirty() 代码块,因为 InstanceInfo 刚被创建时,在 Eureka-Server 不存在,也会被注册
      • 调用 ScheduledExecutorService#schedule(...) 方法,延迟 initialDelayMs 毫秒执行 一次 任务。为什么此处设置 scheduledPeriodicRef ?在 InstanceInfoReplicator#onDemandUpdate() 方法会看到具体用途。
    • 定时检查 InstanceInfo 的状态( status ) 属性是否发生变化。 若是 ,发起注册。实现代码如下:

      // InstanceInfoReplicator.java
      @Override
      public void run(){
       try {
       // 刷新 应用实例信息
       discoveryClient.refreshInstanceInfo();
       // 判断 应用实例信息 是否数据不一致
       Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
       if (dirtyTimestamp != null) {
       // 发起注册
       discoveryClient.register();
       // 设置 应用实例信息 数据一致
       instanceInfo.unsetIsDirty(dirtyTimestamp);
       }
       } catch (Throwable t) {
       logger.warn("There was a problem with the instance info replicator", t);
       } finally {
       // 提交任务,并设置该任务的 Future
       Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
       scheduledPeriodicRef.set(next);
       }
      }
      
      // InstanceInfo.java
      public synchronized long setIsDirtyWithTime(){
       setIsDirty();
       return lastDirtyTimestamp;
      }
      
      public synchronized void unsetIsDirty(long unsetDirtyTimestamp){
       if (lastDirtyTimestamp <= unsetDirtyTimestamp) {
       isInstanceInfoDirty = false;
       } else {
       }
      }
      
      • 调用 DiscoveryClient#refreshInstanceInfo() 方法,刷新应用实例信息。 此处可能导致应用实例信息数据不一致 ,在「2.2」刷新应用实例信息详细解析。
      • 调用 DiscoveryClient#register() 方法, Eureka-Client 向 Eureka-Server 注册应用实例
      • 调用 ScheduledExecutorService#schedule(...) 方法,再次延迟执行任务,并设置 scheduledPeriodicRef 。通过这样的方式,不断 循环 定时执行任务。
  • com.netflix.appinfo.ApplicationInfoManager.StatusChangeListener 内部类 ,监听应用实例信息状态的变更。

    • 调用 ApplicationInfoManager#registerStatusChangeListener(...) 方法,注册应用实例状态变更监听器。实现代码如下:

      public class ApplicationInfoManager{
      
       /**
      * 状态变更监听器
      */
       protected final Map<String, StatusChangeListener> listeners;
       
       public void registerStatusChangeListener(StatusChangeListener listener){
       listeners.put(listener.getId(), listener);
       }
      }
      
    • 业务里,调用 ApplicationInfoManager#setInstanceStatus(...) 方法,设置应用实例信息的状态,从而 通知 InstanceInfoReplicator#onDemandUpdate() 方法的调用。实现代码如下:

      // ApplicationInfoManager.java
      public synchronized void setInstanceStatus(InstanceStatus status){
       InstanceStatus next = instanceStatusMapper.map(status);
       if (next == null) {
       return;
       }
       InstanceStatus prev = instanceInfo.setStatus(next);
       if (prev != null) {
       for (StatusChangeListener listener : listeners.values()) {
       try {
       listener.notify(new StatusChangeEvent(prev, next));
       } catch (Exception e) {
       logger.warn("failed to notify listener: {}", listener.getId(), e);
       }
       }
       }
      }
      
      // InstanceInfo.java
      public synchronized InstanceStatus setStatus(InstanceStatus status){
       if (this.status != status) {
       InstanceStatus prev = this.status;
       this.status = status;
       // 设置 应用实例信息 数据一致
       setIsDirty();
       return prev;
       }
       return null;
      }
      
    • InstanceInfoReplicator#onDemandUpdate() ,实现代码如下:

      // InstanceInfoReplicator.java
      public boolean onDemandUpdate(){
       if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { // 限流相关,跳过
       scheduler.submit(new Runnable() {
       @Override
       public void run(){
       logger.debug("Executing on-demand update of local InstanceInfo");
       // 取消任务
       Future latestPeriodic = scheduledPeriodicRef.get();
       if (latestPeriodic != null && !latestPeriodic.isDone()) {
       logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
       latestPeriodic.cancel(false);
       }
       // 再次调用
       InstanceInfoReplicator.this.run();
       }
       });
       return true;
       } else {
       logger.warn("Ignoring onDemand update due to rate limiter");
       return false;
       }
      }
      
      • 调用 Future#cancel(false) 方法,取消定时任务, 避免无用的注册
      • 调用 InstanceInfoReplicator#run() 方法,发起注册。

2.2 刷新应用实例信息

调用 DiscoveryClient#refreshInstanceInfo() 方法,刷新应用实例信息。 此处可能导致应用实例信息数据不一致 ,实现代码如下:

void refreshInstanceInfo(){
 // 刷新 数据中心信息
 applicationInfoManager.refreshDataCenterInfoIfRequired();
 // 刷新 租约信息
 applicationInfoManager.refreshLeaseInfoIfRequired();
 // 健康检查
 InstanceStatus status;
 try {
 status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
 } catch (Exception e) {
 logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
 status = InstanceStatus.DOWN;
 }
 if (null != status) {
 applicationInfoManager.setInstanceStatus(status);
 }
}
  • 调用 ApplicationInfoManager#refreshDataCenterInfoIfRequired() 方法,刷新数据中心相关信息,实现代码如下:

    // ApplicationInfoManager.java
    public void refreshDataCenterInfoIfRequired(){
     // hostname
     String existingAddress = instanceInfo.getHostName();
     String newAddress;
     if (config instanceof RefreshableInstanceConfig) {
     // Refresh data center info, and return up to date address
     newAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(true);
     } else {
     newAddress = config.getHostName(true);
     }
     // ip
     String newIp = config.getIpAddress();
     if (newAddress != null && !newAddress.equals(existingAddress)) {
     logger.warn("The address changed from : {} => {}", existingAddress, newAddress);
     // :( in the legacy code here the builder is acting as a mutator.
     // This is hard to fix as this same instanceInfo instance is referenced elsewhere.
     // We will most likely re-write the client at sometime so not fixing for now.
     InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo);
     builder.setHostName(newAddress) // hostname
     .setIPAddr(newIp) // ip
     .setDataCenterInfo(config.getDataCenterInfo()); // dataCenterInfo
     instanceInfo.setIsDirty();
     }
    }
    
    public abstract class AbstractInstanceConfig implements EurekaInstanceConfig{
    
     private static final Pair<String, String> hostInfo = getHostInfo();
     
     @Override
     public String getHostName(boolean refresh){
     return hostInfo.second();
     }
     
     @Override
     public String getIpAddress(){
     return hostInfo.first();
     }
    
     private static Pair<String, String> getHostInfo(){
     Pair<String, String> pair;
     try {
     InetAddress localHost = InetAddress.getLocalHost();
     pair = new Pair<String, String>(localHost.getHostAddress(), localHost.getHostName());
     } catch (UnknownHostException e) {
     logger.error("Cannot get host info", e);
     pair = new Pair<String, String>("", "");
     }
     return pair;
     }
     
    }
    
    • 关注应用实例信息的 hostNameipAddrdataCenterInfo 属性的变化。
    • 一般情况下,我们使用的是非 RefreshableInstanceConfig 实现的配置类( 一般是 MyDataCenterInstanceConfig ),因为 AbstractInstanceConfig.hostInfo静态属性即使本机修改了 IP 等信息,Eureka-Client 进程也不会感知到 。TODO[0022]:看下springcloud 的实现
  • 调用 ApplicationInfoManager#refreshLeaseInfoIfRequired() 方法,刷新租约相关信息,实现代码如下:

    public void refreshLeaseInfoIfRequired(){
     LeaseInfo leaseInfo = instanceInfo.getLeaseInfo();
     if (leaseInfo == null) {
     return;
     }
     int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds();
     int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds();
     if (leaseInfo.getDurationInSecs() != currentLeaseDuration // 租约过期时间 改变
     || leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) { // 租约续约频率 改变
     LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder()
     .setRenewalIntervalInSecs(currentLeaseRenewal)
     .setDurationInSecs(currentLeaseDuration)
     .build();
     instanceInfo.setLeaseInfo(newLeaseInfo);
     instanceInfo.setIsDirty();
     }
    }
    
    • 关注应用实例信息的 renewalIntervalInSecsdurationInSecs 属性的变化。
  • 调用 HealthCheckHandler#getStatus() 方法,健康检查。这里先暂时跳过,我们在详细解析。

2.3 发起注册应用实例

调用 DiscoveryClient#register() 方法, Eureka-Client 向 Eureka-Server 注册应用实例 ,实现代码如下:

// DiscoveryClient.java
boolean register() throws Throwable{
 logger.info(PREFIX + appPathIdentifier + ": registering service...");
 EurekaHttpResponse<Void> httpResponse;
 try {
 httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
 } catch (Exception e) {
 logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
 throw e;
 }
 if (logger.isInfoEnabled()) {
 logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
 }
 return httpResponse.getStatusCode() == 204;
}

// AbstractJerseyEurekaHttpClient.java
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info){
 String urlPath = "apps/" + info.getAppName();
 ClientResponse response = null;
 try {
 Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
 addExtraHeaders(resourceBuilder);
 response = resourceBuilder
 .header("Accept-Encoding", "gzip")
 .type(MediaType.APPLICATION_JSON_TYPE)
 .accept(MediaType.APPLICATION_JSON)
 .post(ClientResponse.class, info);
 return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
 } finally {
 if (logger.isDebugEnabled()) {
 logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
 response == null ? "N/A" : response.getStatus());
 }
 if (response != null) {
 response.close();
 }
 }
}
  • 调用 AbstractJerseyEurekaHttpClient#register(...) 方法, POST 请求 Eureka-Server 的 apps/${APP_NAME} 接口,参数为 InstanceInfo ,实现注册实例信息的注册。

3. Eureka-Server 接收注册

3.1 接收注册请求

com.netflix.eureka.resources.ApplicationResource ,处理 单个 应用的请求操作的 Resource ( Controller )。

注册应用实例信息的请求,映射 ApplicationResource#addInstance() 方法,实现代码如下:

@Produces({"application/xml", "application/json"})
public class ApplicationResource{

 @POST
 @Consumes({"application/json", "application/xml"})
 public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication){
 // 校验参数是否合法
 logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
 // validate that the instanceinfo contains all the necessary required fields
 if (isBlank(info.getId())) {
 return Response.status(400).entity("Missing instanceId").build();
 } else if (isBlank(info.getHostName())) {
 return Response.status(400).entity("Missing hostname").build();
 } else if (isBlank(info.getIPAddr())) {
 return Response.status(400).entity("Missing ip address").build();
 } else if (isBlank(info.getAppName())) {
 return Response.status(400).entity("Missing appName").build();
 } else if (!appName.equals(info.getAppName())) {
 return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
 } else if (info.getDataCenterInfo() == null) {
 return Response.status(400).entity("Missing dataCenterInfo").build();
 } else if (info.getDataCenterInfo().getName() == null) {
 return Response.status(400).entity("Missing dataCenterInfo Name").build();
 }

 // AWS 相关,跳过
 // handle cases where clients may be registering with bad DataCenterInfo with missing data
 DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
 if (dataCenterInfo instanceof UniqueIdentifier) {
 String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
 if (isBlank(dataCenterInfoId)) {
 boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
 if (experimental) {
 String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
 return Response.status(400).entity(entity).build();
 } else if (dataCenterInfo instanceof AmazonInfo) {
 AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
 String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
 if (effectiveId == null) {
 amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
 }
 } else {
 logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
 }
 }
 }

 // 注册应用实例信息
 registry.register(info, "true".equals(isReplication));

 // 返回 204 成功
 return Response.status(204).build(); // 204 to be backwards compatible
 }

}
  • 请求头 isReplication 参数,和 Eureka-Server 集群复制相关,暂时跳过。

  • 调用 PeerAwareInstanceRegistryImpl#register(...) 方法,注册应用实例信息。实现代码如下:

    @Override
    public void register(final InstanceInfo info, final boolean isReplication){
     // 租约过期时间
     int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
     if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
     leaseDuration = info.getLeaseInfo().getDurationInSecs();
     }
     // 注册应用实例信息
     super.register(info, leaseDuration, isReplication);
     // Eureka-Server 复制
     replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
    
    • 调用父类 AbstractInstanceRegistry#register(...) 方法,注册应用实例信息。

3.2 Lease

在看具体的注册应用实例信息的逻辑之前,我们先来看下 com.netflix.eureka.lease.Lease ,租约。实现代码如下:

public class Lease<T>{

 /**
* 实体
*/
 private T holder;
 /**
* 注册时间戳
*/
 private long registrationTimestamp;
 /**
* 开始服务时间戳
*/
 private long serviceUpTimestamp;
 /**
* 取消注册时间戳
*/
 private long evictionTimestamp;
 /**
* 最后更新时间戳
*/
 // Make it volatile so that the expiration task would see this quicker
 private volatile long lastUpdateTimestamp;
 /**
* 租约持续时长,单位:毫秒
*/
 private long duration;

 public Lease(T r, int durationInSecs){
 holder = r;
 registrationTimestamp = System.currentTimeMillis();
 lastUpdateTimestamp = registrationTimestamp;
 duration = (durationInSecs * 1000);
 }
 
}
  • holder 属性,租约的持有者。在 Eureka-Server 里,暂时只有 InstanceInfo 使用。

  • registrationTimestamp 属性,注册( 创建 )租约时间戳。在 构造方法 里可以看租约对象的创建时间戳即为注册租约时间戳。

  • serviceUpTimestamp 属性,开始服务时间戳。注册应用实例信息会使用到它如下两个方法,实现代码如下:

    public void serviceUp(){
     if (serviceUpTimestamp == 0) { // 第一次有效
     serviceUpTimestamp = System.currentTimeMillis();
     }
    }
    
    public void setServiceUpTimestamp(long serviceUpTimestamp){
     this.serviceUpTimestamp = serviceUpTimestamp;
    }
    
  • lastUpdatedTimestamp 属性,最后更新租约时间戳。每次续租时,更新该时间戳。注册应用实例信息会使用到它如下方法,实现代码如下:

    public void setLastUpdatedTimestamp(){
     this.lastUpdatedTimestamp = System.currentTimeMillis();
    }
    
  • duration 属性,租约持续时间,单位:毫秒。当租约过久未续租,即当前时间 - lastUpdatedTimestamp > duration 时,租约过期。

  • evictionTimestamp 属性,租约过期时间戳。

3.3 注册应用实例信息

调用 AbstractInstanceRegistry#register(...) 方法,注册应用实例信息,实现代码如下:

 1: public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication){
 2: try {
 3: // 获取读锁
 4: read.lock();
 5: Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
 6: // 增加 注册次数 到 监控
 7: REGISTER.increment(isReplication);
 8: // 获得 应用实例信息 对应的 租约
 9: if (gMap == null) {
10: final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
11: gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); // 添加 应用
12: if (gMap == null) { // 添加 应用 成功
13: gMap = gNewMap;
14: }
15: }
16: Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
17: // Retain the last dirty timestamp without overwriting it, if there is already a lease
18: if (existingLease != null && (existingLease.getHolder() != null)) { // 已存在时,使用数据不一致的时间大的应用注册信息为有效的
19: Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); // Server 注册的 InstanceInfo
20: Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); // Client 请求的 InstanceInfo
21: logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
22: 
23: // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
24: // InstanceInfo instead of the server local copy.
25: if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
26: logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
27: " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
28: logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
29: registrant = existingLease.getHolder();
30: }
31: } else {
32: // The lease does not exist and hence it is a new registration
33: // 【自我保护机制】增加 `numberOfRenewsPerMinThreshold` 、`expectedNumberOfRenewsPerMin`
34: synchronized (lock) {
35: if (this.expectedNumberOfRenewsPerMin > 0) {
36: // Since the client wants to cancel it, reduce the threshold
37: // (1
38: // for 30 seconds, 2 for a minute)
39: this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
40: this.numberOfRenewsPerMinThreshold =
41: (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
42: }
43: }
44: logger.debug("No previous lease information found; it is new registration");
45: }
46: // 创建 租约
47: Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
48: if (existingLease != null) { // 若租约已存在,设置 租约的开始服务的时间戳
49: lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
50: }
51: // 添加到 租约映射
52: gMap.put(registrant.getId(), lease);
53: // 添加到 最近注册的调试队列
54: synchronized (recentRegisteredQueue) {
55: recentRegisteredQueue.add(new Pair<Long, String>(
56: System.currentTimeMillis(),
57: registrant.getAppName() + "(" + registrant.getId() + ")"));
58: }
59: // 添加到 应用实例覆盖状态映射(Eureka-Server 初始化使用)
60: // This is where the initial state transfer of overridden status happens
61: if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
62: logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
63: + "overrides", registrant.getOverriddenStatus(), registrant.getId());
64: if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
65: logger.info("Not found overridden id {} and hence adding it", registrant.getId());
66: overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
67: }
68: }
69: InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
70: if (overriddenStatusFromMap != null) {
71: logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
72: registrant.setOverriddenStatus(overriddenStatusFromMap);
73: }
74: 
75: // 获得应用实例最终状态,并设置应用实例的状态
76: // Set the status based on the overridden status rules
77: InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
78: registrant.setStatusWithoutDirty(overriddenInstanceStatus);
79: 
80: // 设置 租约的开始服务的时间戳(只有第一次有效)
81: // If the lease is registered with UP status, set lease service up timestamp
82: if (InstanceStatus.UP.equals(registrant.getStatus())) {
83: lease.serviceUp();
84: }
85: // 设置 应用实例信息的操作类型 为 添加
86: registrant.setActionType(ActionType.ADDED);
87: // 添加到 最近租约变更记录队列
88: recentlyChangedQueue.add(new RecentlyChangedItem(lease));
89: // 设置 租约的最后更新时间戳
90: registrant.setLastUpdatedTimestamp();
91: // 设置 响应缓存 过期
92: invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
93: logger.info("Registered instance {}/{} with status {} (replication={})",
94: registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
95: } finally {
96: // 释放锁
97: read.unlock();
98: }
99: }
  • 第 3 行 :添加到应用实例覆盖状态映射,在 《Eureka 源码解析 —— Eureka-Server 集群同步》 详细解析。

  • 第 6 至 7 行 :增加注册次数到监控。配合 Netflix Servo 实现监控信息采集。

  • 第 5 至 16 行 :获得应用实例信息对应的 租约registry 实现代码如下:

    /**
    * 租约映射
    * key1 :应用名 {@link InstanceInfo#appName}
    * key2 :应用实例信息编号 {@link InstanceInfo#instanceId}
    * value :租约
    */
    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
    
  • 第 17 至 30 行 :当租约 已存在 ,判断 Server 已存在的 InstanceInfo 的 lastDirtyTimestamp 是否 大于 ( 不包括等于 ) Client 请求的 InstanceInfo , 若是,使用 Server 的 InstanceInfo 进行替代

  • 第 31 至 44 行 :增加 numberOfRenewsPerMinThresholdexpectedNumberOfRenewsPerMin ,自我保护机制相关,在 《Eureka 源码解析 —— 应用实例注册发现(四)之自我保护机制》 有详细解析。

  • 第 45 至 52 行 :创建租约,并添加到租约映射( registry )。

  • 第 53 至 58 行 :添加到最近注册的 调试 队列( recentRegisteredQueue ),用于 Eureka-Server 运维界面的显示,无实际业务逻辑使用。实现代码如下:

    /**
    * 最近注册的调试队列
    * key :添加时的时间戳
    * value :字符串 = 应用名(应用实例信息编号)
    */
    private final CircularQueue<Pair<Long, String>> recentRegisteredQueue;
    
    /**
    * 循环队列
    *
    * @param <E> 泛型
    */
    private class CircularQueue<E> extends ConcurrentLinkedQueue<E>{
    
     /**
    * 队列大小
    */
     private int size = 0;
    
     public CircularQueue(int size){
     this.size = size;
     }
    
     @Override
     public boolean add(E e){
     this.makeSpaceIfNotAvailable();
     return super.add(e);
    
     }
    
     /**
    * 保证空间足够
    *
    * 当空间不够时,移除首元素
    */
     private void makeSpaceIfNotAvailable(){
     if (this.size() == size) {
     this.remove();
     }
     }
    
     public boolean offer(E e){
     this.makeSpaceIfNotAvailable();
     return super.offer(e);
     }
    }
    
  • 第 59 至 68 行 :添加到应用实例覆盖状态映射,在 《Eureka 源码解析 —— Eureka-Server 集群同步》 详细解析。

  • 第 69 至 73 行 :设置应用实例的覆盖状态( overridestatus ),避免注册应用实例后,丢失覆盖状态。在 《应用实例注册发现 (八)之覆盖状态》 详细解析。

  • 第 75 至 78 行 : 获得应用实例最终状态 ,并设置应用实例的状态。在 《应用实例注册发现 (八)之覆盖状态》 详细解析。

  • 第 80 至 84 行 :设置租约的开始服务的时间戳( 只有第一次有效 )。

  • 第 85 至 88 行 :设置应用实例信息的 操作类型为添加 ,并添加到最近租约变更记录队列( recentlyChangedQueue )。 recentlyChangedQueue 用于注册信息的 增量 获取,在 《应用实例注册发现 (七)之增量获取》 详细解析。实现代码如下:

    /**
    * 最近租约变更记录队列
    */
    private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();
    
  • 第 89 至 90 行 :设置租约的最后更新时间戳。

  • 第 91 至 92 行 :设置响应缓存( ResponseCache )过期,在 《Eureka 源码解析 —— 应用实例注册发现 (六)之全量获取》 详细解析。

  • 第 96 至 97 行 :释放锁。

666. 彩蛋

嘿嘿,蛮嗨的,比起前面几篇写配置相关的文章来说。

胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?

原文  http://www.iocoder.cn/Eureka/instance-registry-register/?csdn&2017-11-08
正文到此结束
Loading...