摘要: 原创出处 http://www.iocoder.cn/Eureka/instance-registry-register/ 「芋道源码」欢迎转载,保留摘要,谢谢!
关注**微信公众号:【芋道源码】**有福利:
本文主要分享 Eureka-Client 向 Eureka-Server 注册应用实例的过程 。
FROM 《深度剖析服务发现组件Netflix Eureka》 二次编辑
Eureka-Client 向 Eureka-Server 发起注册应用实例需要符合如下条件:
eureka.registration.enabled = true
,Eureka-Client 向 Eureka-Server 发起注册应用实例的 开关 。 每次 InstanceInfo 发生 属性变化 时,标记 isInstanceInfoDirty
属性为 true
,表示 InstanceInfo 在 Eureka-Client 和 Eureka-Server 数据不一致,需要注册。另外,InstanceInfo 刚被创建时,在 Eureka-Server 不存在,也会被注册。
当符合条件时,InstanceInfo 不会立即向 Eureka-Server 注册,而是后台线程 定时 注册。
当 InstanceInfo 的状态( status
) 属性发生变化时,并且配置 eureka.shouldOnDemandUpdateStatusChange = true
时,立即向 Eureka-Server 注册。 因为状态属性非常重要,一般情况下建议开启,当然默认情况也是开启的 。
Let's Go。让我们看看代码的实现。
// DiscoveryClient.java public class DiscoveryClient implements EurekaClient{ /** * 应用实例状态变更监听器 */ private ApplicationInfoManager.StatusChangeListener statusChangeListener; /** * 应用实例信息复制器 */ private InstanceInfoReplicator instanceInfoReplicator; private void initScheduledTasks(){ // ... 省略无关代码 if (clientConfig.shouldRegisterWithEureka()) { // ... 省略无关代码 // 创建 应用实例信息复制器 // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize // 创建 应用实例状态变更监听器 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId(){ return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent){ if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; // 注册 应用实例状态变更监听器 if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } // 开启 应用实例信息复制器 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } } }
com.netflix.discovery.InstanceInfoReplicator
,应用实例信息复制器。
调用 InstanceInfoReplicator#start(...)
方法, 开启 应用实例信息复制器。实现代码如下:
// InstanceInfoReplicator.java class InstanceInfoReplicator implements Runnable{ private static final Logger logger = LoggerFactory.getLogger(InstanceInfoReplicator.class); private final DiscoveryClient discoveryClient; /** * 应用实例信息 */ private final InstanceInfo instanceInfo; /** * 定时执行频率,单位:秒 */ private final int replicationIntervalSeconds; /** * 定时执行器 */ private final ScheduledExecutorService scheduler; /** * 定时执行任务的 Future */ private final AtomicReference<Future> scheduledPeriodicRef; /** * 是否开启调度 */ private final AtomicBoolean started; private final RateLimiter rateLimiter; // 限流相关,跳过 private final int burstSize; // 限流相关,跳过 private final int allowedRatePerMinute; // 限流相关,跳过 InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) { this.discoveryClient = discoveryClient; this.instanceInfo = instanceInfo; this.scheduler = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d") .setDaemon(true) .build()); this.scheduledPeriodicRef = new AtomicReference<Future>(); this.started = new AtomicBoolean(false); this.rateLimiter = new RateLimiter(TimeUnit.MINUTES); this.replicationIntervalSeconds = replicationIntervalSeconds; this.burstSize = burstSize; this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds; logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute); } public void start(int initialDelayMs){ if (started.compareAndSet(false, true)) { // 设置 应用实例信息 数据不一致 instanceInfo.setIsDirty(); // for initial register // 提交任务,并设置该任务的 Future Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } } // ... 省略无关方法 } // InstanceInfo.java private volatile boolean isInstanceInfoDirty = false; private volatile Long lastDirtyTimestamp; public synchronized void setIsDirty(){ isInstanceInfoDirty = true; lastDirtyTimestamp = System.currentTimeMillis(); }
instanceInfo.setIsDirty()
代码块,因为 InstanceInfo 刚被创建时,在 Eureka-Server 不存在,也会被注册 。 ScheduledExecutorService#schedule(...)
方法,延迟 initialDelayMs
毫秒执行 一次 任务。为什么此处设置 scheduledPeriodicRef
?在 InstanceInfoReplicator#onDemandUpdate()
方法会看到具体用途。 定时检查 InstanceInfo 的状态( status
) 属性是否发生变化。 若是 ,发起注册。实现代码如下:
// InstanceInfoReplicator.java @Override public void run(){ try { // 刷新 应用实例信息 discoveryClient.refreshInstanceInfo(); // 判断 应用实例信息 是否数据不一致 Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { // 发起注册 discoveryClient.register(); // 设置 应用实例信息 数据一致 instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { // 提交任务,并设置该任务的 Future Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } } // InstanceInfo.java public synchronized long setIsDirtyWithTime(){ setIsDirty(); return lastDirtyTimestamp; } public synchronized void unsetIsDirty(long unsetDirtyTimestamp){ if (lastDirtyTimestamp <= unsetDirtyTimestamp) { isInstanceInfoDirty = false; } else { } }
DiscoveryClient#refreshInstanceInfo()
方法,刷新应用实例信息。 此处可能导致应用实例信息数据不一致 ,在「2.2」刷新应用实例信息详细解析。 DiscoveryClient#register()
方法, Eureka-Client 向 Eureka-Server 注册应用实例 。 ScheduledExecutorService#schedule(...)
方法,再次延迟执行任务,并设置 scheduledPeriodicRef
。通过这样的方式,不断 循环 定时执行任务。 com.netflix.appinfo.ApplicationInfoManager.StatusChangeListener
内部类 ,监听应用实例信息状态的变更。
调用 ApplicationInfoManager#registerStatusChangeListener(...)
方法,注册应用实例状态变更监听器。实现代码如下:
public class ApplicationInfoManager{ /** * 状态变更监听器 */ protected final Map<String, StatusChangeListener> listeners; public void registerStatusChangeListener(StatusChangeListener listener){ listeners.put(listener.getId(), listener); } }
业务里,调用 ApplicationInfoManager#setInstanceStatus(...)
方法,设置应用实例信息的状态,从而 通知 InstanceInfoReplicator#onDemandUpdate()
方法的调用。实现代码如下:
// ApplicationInfoManager.java public synchronized void setInstanceStatus(InstanceStatus status){ InstanceStatus next = instanceStatusMapper.map(status); if (next == null) { return; } InstanceStatus prev = instanceInfo.setStatus(next); if (prev != null) { for (StatusChangeListener listener : listeners.values()) { try { listener.notify(new StatusChangeEvent(prev, next)); } catch (Exception e) { logger.warn("failed to notify listener: {}", listener.getId(), e); } } } } // InstanceInfo.java public synchronized InstanceStatus setStatus(InstanceStatus status){ if (this.status != status) { InstanceStatus prev = this.status; this.status = status; // 设置 应用实例信息 数据一致 setIsDirty(); return prev; } return null; }
InstanceInfoReplicator#onDemandUpdate()
,实现代码如下:
// InstanceInfoReplicator.java public boolean onDemandUpdate(){ if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { // 限流相关,跳过 scheduler.submit(new Runnable() { @Override public void run(){ logger.debug("Executing on-demand update of local InstanceInfo"); // 取消任务 Future latestPeriodic = scheduledPeriodicRef.get(); if (latestPeriodic != null && !latestPeriodic.isDone()) { logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update"); latestPeriodic.cancel(false); } // 再次调用 InstanceInfoReplicator.this.run(); } }); return true; } else { logger.warn("Ignoring onDemand update due to rate limiter"); return false; } }
Future#cancel(false)
方法,取消定时任务, 避免无用的注册 。 InstanceInfoReplicator#run()
方法,发起注册。 调用 DiscoveryClient#refreshInstanceInfo()
方法,刷新应用实例信息。 此处可能导致应用实例信息数据不一致 ,实现代码如下:
void refreshInstanceInfo(){ // 刷新 数据中心信息 applicationInfoManager.refreshDataCenterInfoIfRequired(); // 刷新 租约信息 applicationInfoManager.refreshLeaseInfoIfRequired(); // 健康检查 InstanceStatus status; try { status = getHealthCheckHandler().getStatus(instanceInfo.getStatus()); } catch (Exception e) { logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e); status = InstanceStatus.DOWN; } if (null != status) { applicationInfoManager.setInstanceStatus(status); } }
调用 ApplicationInfoManager#refreshDataCenterInfoIfRequired()
方法,刷新数据中心相关信息,实现代码如下:
// ApplicationInfoManager.java public void refreshDataCenterInfoIfRequired(){ // hostname String existingAddress = instanceInfo.getHostName(); String newAddress; if (config instanceof RefreshableInstanceConfig) { // Refresh data center info, and return up to date address newAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(true); } else { newAddress = config.getHostName(true); } // ip String newIp = config.getIpAddress(); if (newAddress != null && !newAddress.equals(existingAddress)) { logger.warn("The address changed from : {} => {}", existingAddress, newAddress); // :( in the legacy code here the builder is acting as a mutator. // This is hard to fix as this same instanceInfo instance is referenced elsewhere. // We will most likely re-write the client at sometime so not fixing for now. InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo); builder.setHostName(newAddress) // hostname .setIPAddr(newIp) // ip .setDataCenterInfo(config.getDataCenterInfo()); // dataCenterInfo instanceInfo.setIsDirty(); } } public abstract class AbstractInstanceConfig implements EurekaInstanceConfig{ private static final Pair<String, String> hostInfo = getHostInfo(); @Override public String getHostName(boolean refresh){ return hostInfo.second(); } @Override public String getIpAddress(){ return hostInfo.first(); } private static Pair<String, String> getHostInfo(){ Pair<String, String> pair; try { InetAddress localHost = InetAddress.getLocalHost(); pair = new Pair<String, String>(localHost.getHostAddress(), localHost.getHostName()); } catch (UnknownHostException e) { logger.error("Cannot get host info", e); pair = new Pair<String, String>("", ""); } return pair; } }
hostName
、 ipAddr
、 dataCenterInfo
属性的变化。 AbstractInstanceConfig.hostInfo
是 静态属性 , 即使本机修改了 IP 等信息,Eureka-Client 进程也不会感知到 。TODO[0022]:看下springcloud 的实现 调用 ApplicationInfoManager#refreshLeaseInfoIfRequired()
方法,刷新租约相关信息,实现代码如下:
public void refreshLeaseInfoIfRequired(){ LeaseInfo leaseInfo = instanceInfo.getLeaseInfo(); if (leaseInfo == null) { return; } int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds(); int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds(); if (leaseInfo.getDurationInSecs() != currentLeaseDuration // 租约过期时间 改变 || leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) { // 租约续约频率 改变 LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder() .setRenewalIntervalInSecs(currentLeaseRenewal) .setDurationInSecs(currentLeaseDuration) .build(); instanceInfo.setLeaseInfo(newLeaseInfo); instanceInfo.setIsDirty(); } }
renewalIntervalInSecs
、 durationInSecs
属性的变化。 调用 HealthCheckHandler#getStatus()
方法,健康检查。这里先暂时跳过,我们在详细解析。
调用 DiscoveryClient#register()
方法, Eureka-Client 向 Eureka-Server 注册应用实例 ,实现代码如下:
// DiscoveryClient.java boolean register() throws Throwable{ logger.info(PREFIX + appPathIdentifier + ": registering service..."); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == 204; } // AbstractJerseyEurekaHttpClient.java @Override public EurekaHttpResponse<Void> register(InstanceInfo info){ String urlPath = "apps/" + info.getAppName(); ClientResponse response = null; try { Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); response = resourceBuilder .header("Accept-Encoding", "gzip") .type(MediaType.APPLICATION_JSON_TYPE) .accept(MediaType.APPLICATION_JSON) .post(ClientResponse.class, info); return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }
AbstractJerseyEurekaHttpClient#register(...)
方法, POST
请求 Eureka-Server 的 apps/${APP_NAME}
接口,参数为 InstanceInfo ,实现注册实例信息的注册。 com.netflix.eureka.resources.ApplicationResource
,处理 单个 应用的请求操作的 Resource ( Controller )。
注册应用实例信息的请求,映射 ApplicationResource#addInstance()
方法,实现代码如下:
@Produces({"application/xml", "application/json"}) public class ApplicationResource{ @POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication){ // 校验参数是否合法 logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); // validate that the instanceinfo contains all the necessary required fields if (isBlank(info.getId())) { return Response.status(400).entity("Missing instanceId").build(); } else if (isBlank(info.getHostName())) { return Response.status(400).entity("Missing hostname").build(); } else if (isBlank(info.getIPAddr())) { return Response.status(400).entity("Missing ip address").build(); } else if (isBlank(info.getAppName())) { return Response.status(400).entity("Missing appName").build(); } else if (!appName.equals(info.getAppName())) { return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build(); } else if (info.getDataCenterInfo() == null) { return Response.status(400).entity("Missing dataCenterInfo").build(); } else if (info.getDataCenterInfo().getName() == null) { return Response.status(400).entity("Missing dataCenterInfo Name").build(); } // AWS 相关,跳过 // handle cases where clients may be registering with bad DataCenterInfo with missing data DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) { boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } // 注册应用实例信息 registry.register(info, "true".equals(isReplication)); // 返回 204 成功 return Response.status(204).build(); // 204 to be backwards compatible } }
请求头 isReplication
参数,和 Eureka-Server 集群复制相关,暂时跳过。
调用 PeerAwareInstanceRegistryImpl#register(...)
方法,注册应用实例信息。实现代码如下:
@Override public void register(final InstanceInfo info, final boolean isReplication){ // 租约过期时间 int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } // 注册应用实例信息 super.register(info, leaseDuration, isReplication); // Eureka-Server 复制 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); }
AbstractInstanceRegistry#register(...)
方法,注册应用实例信息。 在看具体的注册应用实例信息的逻辑之前,我们先来看下 com.netflix.eureka.lease.Lease
,租约。实现代码如下:
public class Lease<T>{ /** * 实体 */ private T holder; /** * 注册时间戳 */ private long registrationTimestamp; /** * 开始服务时间戳 */ private long serviceUpTimestamp; /** * 取消注册时间戳 */ private long evictionTimestamp; /** * 最后更新时间戳 */ // Make it volatile so that the expiration task would see this quicker private volatile long lastUpdateTimestamp; /** * 租约持续时长,单位:毫秒 */ private long duration; public Lease(T r, int durationInSecs){ holder = r; registrationTimestamp = System.currentTimeMillis(); lastUpdateTimestamp = registrationTimestamp; duration = (durationInSecs * 1000); } }
holder
属性,租约的持有者。在 Eureka-Server 里,暂时只有 InstanceInfo 使用。
registrationTimestamp
属性,注册( 创建 )租约时间戳。在 构造方法 里可以看租约对象的创建时间戳即为注册租约时间戳。
serviceUpTimestamp
属性,开始服务时间戳。注册应用实例信息会使用到它如下两个方法,实现代码如下:
public void serviceUp(){ if (serviceUpTimestamp == 0) { // 第一次有效 serviceUpTimestamp = System.currentTimeMillis(); } } public void setServiceUpTimestamp(long serviceUpTimestamp){ this.serviceUpTimestamp = serviceUpTimestamp; }
lastUpdatedTimestamp
属性,最后更新租约时间戳。每次续租时,更新该时间戳。注册应用实例信息会使用到它如下方法,实现代码如下:
public void setLastUpdatedTimestamp(){ this.lastUpdatedTimestamp = System.currentTimeMillis(); }
duration
属性,租约持续时间,单位:毫秒。当租约过久未续租,即当前时间 - lastUpdatedTimestamp
> duration
时,租约过期。
evictionTimestamp
属性,租约过期时间戳。
调用 AbstractInstanceRegistry#register(...)
方法,注册应用实例信息,实现代码如下:
1: public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication){ 2: try { 3: // 获取读锁 4: read.lock(); 5: Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); 6: // 增加 注册次数 到 监控 7: REGISTER.increment(isReplication); 8: // 获得 应用实例信息 对应的 租约 9: if (gMap == null) { 10: final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); 11: gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); // 添加 应用 12: if (gMap == null) { // 添加 应用 成功 13: gMap = gNewMap; 14: } 15: } 16: Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); 17: // Retain the last dirty timestamp without overwriting it, if there is already a lease 18: if (existingLease != null && (existingLease.getHolder() != null)) { // 已存在时,使用数据不一致的时间大的应用注册信息为有效的 19: Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); // Server 注册的 InstanceInfo 20: Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); // Client 请求的 InstanceInfo 21: logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); 22: 23: // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted 24: // InstanceInfo instead of the server local copy. 25: if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { 26: logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + 27: " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); 28: logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); 29: registrant = existingLease.getHolder(); 30: } 31: } else { 32: // The lease does not exist and hence it is a new registration 33: // 【自我保护机制】增加 `numberOfRenewsPerMinThreshold` 、`expectedNumberOfRenewsPerMin` 34: synchronized (lock) { 35: if (this.expectedNumberOfRenewsPerMin > 0) { 36: // Since the client wants to cancel it, reduce the threshold 37: // (1 38: // for 30 seconds, 2 for a minute) 39: this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2; 40: this.numberOfRenewsPerMinThreshold = 41: (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); 42: } 43: } 44: logger.debug("No previous lease information found; it is new registration"); 45: } 46: // 创建 租约 47: Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); 48: if (existingLease != null) { // 若租约已存在,设置 租约的开始服务的时间戳 49: lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); 50: } 51: // 添加到 租约映射 52: gMap.put(registrant.getId(), lease); 53: // 添加到 最近注册的调试队列 54: synchronized (recentRegisteredQueue) { 55: recentRegisteredQueue.add(new Pair<Long, String>( 56: System.currentTimeMillis(), 57: registrant.getAppName() + "(" + registrant.getId() + ")")); 58: } 59: // 添加到 应用实例覆盖状态映射(Eureka-Server 初始化使用) 60: // This is where the initial state transfer of overridden status happens 61: if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { 62: logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " 63: + "overrides", registrant.getOverriddenStatus(), registrant.getId()); 64: if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { 65: logger.info("Not found overridden id {} and hence adding it", registrant.getId()); 66: overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); 67: } 68: } 69: InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); 70: if (overriddenStatusFromMap != null) { 71: logger.info("Storing overridden status {} from map", overriddenStatusFromMap); 72: registrant.setOverriddenStatus(overriddenStatusFromMap); 73: } 74: 75: // 获得应用实例最终状态,并设置应用实例的状态 76: // Set the status based on the overridden status rules 77: InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); 78: registrant.setStatusWithoutDirty(overriddenInstanceStatus); 79: 80: // 设置 租约的开始服务的时间戳(只有第一次有效) 81: // If the lease is registered with UP status, set lease service up timestamp 82: if (InstanceStatus.UP.equals(registrant.getStatus())) { 83: lease.serviceUp(); 84: } 85: // 设置 应用实例信息的操作类型 为 添加 86: registrant.setActionType(ActionType.ADDED); 87: // 添加到 最近租约变更记录队列 88: recentlyChangedQueue.add(new RecentlyChangedItem(lease)); 89: // 设置 租约的最后更新时间戳 90: registrant.setLastUpdatedTimestamp(); 91: // 设置 响应缓存 过期 92: invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); 93: logger.info("Registered instance {}/{} with status {} (replication={})", 94: registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); 95: } finally { 96: // 释放锁 97: read.unlock(); 98: } 99: }
第 3 行 :添加到应用实例覆盖状态映射,在 《Eureka 源码解析 —— Eureka-Server 集群同步》 详细解析。
第 6 至 7 行 :增加注册次数到监控。配合 Netflix Servo 实现监控信息采集。
第 5 至 16 行 :获得应用实例信息对应的 租约 。 registry
实现代码如下:
/** * 租约映射 * key1 :应用名 {@link InstanceInfo#appName} * key2 :应用实例信息编号 {@link InstanceInfo#instanceId} * value :租约 */ private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
第 17 至 30 行 :当租约 已存在 ,判断 Server 已存在的 InstanceInfo 的 lastDirtyTimestamp
是否 大于 ( 不包括等于 ) Client 请求的 InstanceInfo , 若是,使用 Server 的 InstanceInfo 进行替代 。
第 31 至 44 行 :增加 numberOfRenewsPerMinThreshold
、 expectedNumberOfRenewsPerMin
,自我保护机制相关,在 《Eureka 源码解析 —— 应用实例注册发现(四)之自我保护机制》 有详细解析。
第 45 至 52 行 :创建租约,并添加到租约映射( registry
)。
第 53 至 58 行 :添加到最近注册的 调试 队列( recentRegisteredQueue
),用于 Eureka-Server 运维界面的显示,无实际业务逻辑使用。实现代码如下:
/** * 最近注册的调试队列 * key :添加时的时间戳 * value :字符串 = 应用名(应用实例信息编号) */ private final CircularQueue<Pair<Long, String>> recentRegisteredQueue; /** * 循环队列 * * @param <E> 泛型 */ private class CircularQueue<E> extends ConcurrentLinkedQueue<E>{ /** * 队列大小 */ private int size = 0; public CircularQueue(int size){ this.size = size; } @Override public boolean add(E e){ this.makeSpaceIfNotAvailable(); return super.add(e); } /** * 保证空间足够 * * 当空间不够时,移除首元素 */ private void makeSpaceIfNotAvailable(){ if (this.size() == size) { this.remove(); } } public boolean offer(E e){ this.makeSpaceIfNotAvailable(); return super.offer(e); } }
第 59 至 68 行 :添加到应用实例覆盖状态映射,在 《Eureka 源码解析 —— Eureka-Server 集群同步》 详细解析。
第 69 至 73 行 :设置应用实例的覆盖状态( overridestatus
),避免注册应用实例后,丢失覆盖状态。在 《应用实例注册发现 (八)之覆盖状态》 详细解析。
第 75 至 78 行 : 获得应用实例最终状态 ,并设置应用实例的状态。在 《应用实例注册发现 (八)之覆盖状态》 详细解析。
第 80 至 84 行 :设置租约的开始服务的时间戳( 只有第一次有效 )。
第 85 至 88 行 :设置应用实例信息的 操作类型为添加 ,并添加到最近租约变更记录队列( recentlyChangedQueue
)。 recentlyChangedQueue
用于注册信息的 增量 获取,在 《应用实例注册发现 (七)之增量获取》 详细解析。实现代码如下:
/** * 最近租约变更记录队列 */ private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();
第 89 至 90 行 :设置租约的最后更新时间戳。
第 91 至 92 行 :设置响应缓存( ResponseCache )过期,在 《Eureka 源码解析 —— 应用实例注册发现 (六)之全量获取》 详细解析。
第 96 至 97 行 :释放锁。
嘿嘿,蛮嗨的,比起前面几篇写配置相关的文章来说。
胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?