在之前的文章: EurekaClient自动装配及启动流程解析 中,我们提到了在构造 DiscoveryClient
时除了包含注册流程之外,还调度了一个心跳线程:
scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS);
其中 HeartbeatThread
线程如下:
private class HeartbeatThread implements Runnable { public void run() { //续约 if (renew()) { //续约成功时间戳更新 lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } } boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { //发送续约请求 httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == 404) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); //重新注册 boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == 200; } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } }
这里直接发出了续约请求;如果服务端返回 404(即服务端找不到该实例的租约),客户端会重新发起注册。
服务端接收续约请求的 Controller 位于 InstanceResource 类中:
@PUT public Response renewLease( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication, @QueryParam("overriddenstatus") String overriddenStatus, @QueryParam("status") String status, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) { boolean isFromReplicaNode = "true".equals(isReplication); //续约 boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode); // 续约失败 if (!isSuccess) { logger.warn("Not Found (Renew): {} - {}", app.getName(), id); return Response.status(Status.NOT_FOUND).build(); } // 校验客户端与服务端的时间差异,如果存在问题则需要重新发起注册 Response response = null; if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) { response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode); if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode() && (overriddenStatus != null) && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus)) && isFromReplicaNode) { registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus)); } } else { response = Response.ok().build(); } logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus()); return response; }
可以看到续约之后还有一个检查时间差的问题,这个不详细展开,继续往下看续约的相关信息
public boolean renew(final String appName, final String id, final boolean isReplication) { if (super.renew(appName, id, isReplication)) { //集群同步 replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication); return true; } return false; }
这里集群同步的相关内容在之前的文章已经说过了,不再展开,续约的核心处理在下面
public boolean renew(String appName, String id, boolean isReplication) { RENEW.increment(isReplication); //获取已存在的租约 Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> leaseToRenew = null; if (gMap != null) { leaseToRenew = gMap.get(id); } //租约不存在 if (leaseToRenew == null) { RENEW_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id); return false; } else { //获取客户端 InstanceInfo instanceInfo = leaseToRenew.getHolder(); //设置客户端的状态 if (instanceInfo != null) { // touchASGCache(instanceInfo.getASGName()); InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus( instanceInfo, leaseToRenew, isReplication); if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) { logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}" + "; re-register required", instanceInfo.getId()); RENEW_NOT_FOUND.increment(isReplication); return false; } if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) { logger.info( "The instance status {} is different from overridden instance status {} for instance {}. " + "Hence setting the status to overridden status", instanceInfo.getStatus().name(), instanceInfo.getOverriddenStatus().name(), instanceInfo.getId()); //覆盖当前状态 instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus); } } renewsLastMin.increment(); //设置租约最后更新时间 leaseToRenew.renew(); return true; } }
对于看过之前文章的同学来说整体流程比较简单
读过《Eureka应用注册与集群数据同步源码解析》一文的同学,应该对下面这行代码比较熟悉:
// Initiates the cluster data synchronization (as described in the surrounding text);
// the returned value is kept in registryCount for the openForTraffic call.
int registryCount = registry.syncUp();
上面这行代码发起了集群数据同步,而紧接着这行代码的就是服务端的过期租约清理逻辑
// Per the surrounding text, openForTraffic finishes by calling postInit,
// which sets up the eviction of expired leases.
registry.openForTraffic(applicationInfoManager, registryCount);
openForTraffic 方法的最后调用了 postInit 方法,而 postInit 方法中调度了一个定时任务 EvictionTask,这个任务负责清理已经过期的租约:
// Schedules the eviction task; the configured eviction interval is passed as both
// the second and the third argument (presumably the initial delay and the period
// of a java.util.Timer -- confirm against the declaration of evictionTimer).
evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs());
看一下这个任务的实现:
class EvictionTask extends TimerTask { @Override public void run() { try { //补偿时间毫秒数 long compensationTimeMs = getCompensationTimeMs(); logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs); // 清理逻辑 evict(compensationTimeMs); } catch (Throwable e) { logger.error("Could not run the evict task", e); } } }
其中补偿时间的获取是这样的:
long getCompensationTimeMs() { long currNanos = getCurrentTimeNano(); long lastNanos = lastExecutionNanosRef.getAndSet(currNanos); if (lastNanos == 0l) { return 0l; } long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos); //当前时间 - 最后任务执行时间 - 任务执行频率 long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs(); return compensationTime <= 0l ? 0l : compensationTime; }
接着看清理的核心逻辑
public void evict(long additionalLeaseMs) { logger.debug("Running the evict task"); if (!isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled."); return; } // 1. 获得所有的过期租约 List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>(); for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) { Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue(); if (leaseMap != null) { for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) { Lease<InstanceInfo> lease = leaseEntry.getValue(); if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) { expiredLeases.add(lease); } } } } // 2. 计算允许清理的数量 int registrySize = (int) getLocalRegistrySize(); int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold()); int evictionLimit = registrySize - registrySizeThreshold; int toEvict = Math.min(expiredLeases.size(), evictionLimit); // 3. 过期 if (toEvict > 0) { logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit); Random random = new Random(System.currentTimeMillis()); for (int i = 0; i < toEvict; i++) { // Pick a random item (Knuth shuffle algorithm) int next = i + random.nextInt(expiredLeases.size() - i); Collections.swap(expiredLeases, i, next); Lease<InstanceInfo> lease = expiredLeases.get(i); String appName = lease.getHolder().getAppName(); String id = lease.getHolder().getId(); EXPIRED.increment(); logger.warn("DS: Registry: expired lease for {}/{}", appName, id); internalCancel(appName, id, false); } } }
整个过期的执行过程主要分为以下3个步骤:(1)遍历注册表,收集所有已过期的租约;(2)根据续约百分比阈值计算本次允许清理的最大数量;(3)从过期租约中随机挑选并逐个清理。
过期租约的计算方法为 isExpired
/**
 * Tells whether this lease counts as expired.
 *
 * @param additionalLeaseMs extra time in milliseconds added to the expiry deadline
 * @return {@code true} if {@code evictionTimestamp} is positive, or if the
 *         current time is later than
 *         {@code lastUpdateTimestamp + duration + additionalLeaseMs}
 */
public boolean isExpired(long additionalLeaseMs) {
    // A positive eviction timestamp alone marks the lease as expired.
    if (evictionTimestamp > 0) {
        return true;
    }
    final long now = System.currentTimeMillis();
    return now > (lastUpdateTimestamp + duration + additionalLeaseMs);
}
服务下线时间>0||当前时间>(最后更新时间+租约持续时间+补偿时间)
getRenewalPercentThreshold()
默认值为0.85,也就是说默认情况下,每次清理的数量取“已过期租约数量”与“注册总数的15%”两者中的较小值。
过期的清理是随机进行的,这样设计是为了避免某个应用的实例在一次清理中被全部剔除。
过期的处理则和注册的处理正好是相反的:
/**
 * Cancels the lease registered under {@code appName} / {@code id}.
 *
 * <p>The lease is removed from {@code registry}, the cancellation is recorded
 * in {@code recentCanceledQueue}, and any entry for {@code id} is removed from
 * {@code overriddenInstanceStatusMap}. When a lease was found it is cancelled,
 * its holder (if any) is marked {@code DELETED} and queued in
 * {@code recentlyChangedQueue}, and the cache for the application and its VIP
 * addresses is invalidated.
 *
 * @param appName       application name used as the key into {@code registry}
 * @param id            instance id of the lease to cancel
 * @param isReplication passed to the counters and included in the log output
 * @return {@code true} if a lease was found and cancelled, {@code false} if no
 *         lease was registered for the given instance
 */
protected boolean internalCancel(String appName, String id, boolean isReplication) {
    // Acquire the lock before entering try, so the finally block can never
    // call unlock() for a lock that was not successfully taken.
    read.lock();
    try {
        CANCEL.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            leaseToCancel = gMap.remove(id);
        }
        // Recorded whether or not a lease was actually found.
        synchronized (recentCanceledQueue) {
            recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
        }
        InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
        if (instanceStatus != null) {
            logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
        }
        if (leaseToCancel == null) {
            CANCEL_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            leaseToCancel.cancel();
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            return true;
        }
    } finally {
        read.unlock();
    }
}