Abstract: Original source http://www.iocoder.cn/Eureka/instance-registry-renew/ by 「芋道源码」. Reposting is welcome; please keep this abstract. Thanks!
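This article walks through how Eureka-Client renews the lease of an application instance with Eureka-Server.
FROM 《深度剖析服务发现组件Netflix Eureka》 (re-edited)
After Eureka-Client successfully registers an application instance with Eureka-Server, it obtains a lease (Lease).
Eureka-Client then sends a renew request to Eureka-Server at a fixed interval so that the lease does not expire.
By default the lease duration is 90 seconds and the renewal interval is 30 seconds. With this 1 : 3 ratio, the client gets three renewal attempts within one lease period, so the lease survives transient problems such as network failures.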
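To make the 1 : 3 ratio concrete, here is a minimal sketch of the arithmetic. `LeaseMath` and `renewalAttempts` are hypothetical names used only for illustration; they are not part of Eureka.

```java
// Hypothetical helper, not part of Eureka: counts how many heartbeats fit into one lease period.
final class LeaseMath {
    static final int DEFAULT_LEASE_DURATION_SECS = 90;   // default lease duration stated above
    static final int DEFAULT_RENEWAL_INTERVAL_SECS = 30; // default renewal interval stated above

    /** Number of renewal attempts the client gets before an unrenewed lease runs out. */
    static int renewalAttempts(int leaseDurationSecs, int renewalIntervalSecs) {
        if (renewalIntervalSecs <= 0) {
            throw new IllegalArgumentException("renewalIntervalSecs must be positive");
        }
        return leaseDurationSecs / renewalIntervalSecs;
    }

    public static void main(String[] args) {
        System.out.println(renewalAttempts(DEFAULT_LEASE_DURATION_SECS, DEFAULT_RENEWAL_INTERVAL_SECS));
    }
}
```

Raising the renewal interval without also raising the lease duration shrinks the number of attempts, which is worth keeping in mind when tuning either value.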
During initialization, Eureka-Client creates a heartbeat thread that sends a renew request to Eureka-Server at a fixed interval. The implementation is as follows:
// DiscoveryClient.java
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    // ... irrelevant code omitted

    scheduler = Executors.newScheduledThreadPool(2,
            new ThreadFactoryBuilder()
                    .setNameFormat("DiscoveryClient-%d")
                    .setDaemon(true)
                    .build());

    heartbeatExecutor = new ThreadPoolExecutor(
            1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            new ThreadFactoryBuilder()
                    .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                    .setDaemon(true)
                    .build()
    );  // use direct handoff

    // ... irrelevant code omitted

    // [3.2.14] Initialize the scheduled tasks
    initScheduledTasks();
}

private void initScheduledTasks() {
    // Executor that sends heartbeats (lease renewals) to Eureka-Server
    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); // renewal interval
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

        // Heartbeat timer
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // ... irrelevant code omitted
    }
    // ... irrelevant code omitted
}
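scheduler: the scheduled-task service that triggers the heartbeat (renewal) on a timer. If you look closely, you will notice that the task is submitted with ScheduledExecutorService#schedule(...), which runs the heartbeat only once after a delay. So where is the promised fixed-interval heartbeat? The answer is revealed in 「2.3 TimedSupervisorTask」.
heartbeatExecutor: the thread pool that executes the heartbeat task. Why is heartbeatExecutor needed when scheduler already exists? The answer is also in 「2.3 TimedSupervisorTask」.
com.netflix.discovery.DiscoveryClient.HeartbeatThread: the heartbeat task (a Runnable) through which Eureka-Client sends the renew request to Eureka-Server. The implementation is as follows: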
// DiscoveryClient.java
/**
 * Timestamp of the last successful heartbeat to Eureka-Server
 */
private volatile long lastSuccessfulHeartbeatTimestamp = -1;

private class HeartbeatThread implements Runnable {

    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}
It calls #renew() to perform the renewal logic. The implementation is as follows:
// DiscoveryClient.java
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            // Register again
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
        return false;
    }
}
It calls AbstractJerseyEurekaHttpClient#sendHeartBeat(...) to send the renew request. The implementation is as follows:
// AbstractJerseyEurekaHttpClient.java
@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
    String urlPath = "apps/" + appName + '/' + id;
    ClientResponse response = null;
    try {
        WebResource webResource = jerseyClient.resource(serviceUrl)
                .path(urlPath)
                .queryParam("status", info.getStatus().toString())
                .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
        if (overriddenStatus != null) {
            webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
        response = requestBuilder.put(ClientResponse.class);
        EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
        if (response.hasEntity()) {
            eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
        }
        return eurekaResponseBuilder.build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}
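It sends a PUT request to the apps/${APP_NAME}/${INSTANCE_INFO_ID} endpoint with the status, lastDirtyTimestamp and overriddenstatus parameters to renew the lease.
When Eureka-Server has no lease for the instance (a 404 response), AbstractJerseyEurekaHttpClient#register(...) is called to register again. 《Eureka 源码解析 —— 应用实例注册发现 (一)之注册》 covers this in detail.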
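The heartbeat-then-re-register flow of #renew() can be sketched without any HTTP. The following is only an illustration: `FakeServer` is a hypothetical in-memory stand-in for Eureka-Server, and its `register` returns a plain boolean instead of a real HTTP response.

```java
import java.util.HashSet;
import java.util.Set;

final class RenewSketch {

    /** Hypothetical stand-in for Eureka-Server: only remembers which instance ids hold a lease. */
    static final class FakeServer {
        private final Set<String> leases = new HashSet<>();

        /** Returns 200 if the lease exists, 404 otherwise (the two codes #renew() checks). */
        int sendHeartBeat(String id) {
            return leases.contains(id) ? 200 : 404;
        }

        /** Simplified registration: always succeeds and creates the lease. */
        boolean register(String id) {
            leases.add(id);
            return true;
        }

        /** Simulates the server dropping the lease, e.g. after expiry. */
        void evict(String id) {
            leases.remove(id);
        }
    }

    /** Same control flow as DiscoveryClient#renew(): heartbeat first, register again on 404. */
    static boolean renew(FakeServer server, String id) {
        int status = server.sendHeartBeat(id);
        if (status == 404) {
            return server.register(id);
        }
        return status == 200;
    }
}
```

The point of the design is that the client never needs to know why the lease disappeared; a 404 on heartbeat is enough to trigger a fresh registration.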
com.netflix.discovery.TimedSupervisorTask is a task that supervises a scheduled task.
A supervisor task that schedules subtasks while enforce a timeout.
A TimedSupervisorTask is created as follows:
public class TimedSupervisorTask extends TimerTask {

    private final Counter timeoutCounter;
    private final Counter rejectedCounter;
    private final Counter throwableCounter;
    private final LongGauge threadPoolLevelGauge;

    /**
     * Scheduled-task service
     */
    private final ScheduledExecutorService scheduler;
    /**
     * Thread pool that executes the subtask
     */
    private final ThreadPoolExecutor executor;
    /**
     * Subtask execution timeout
     */
    private final long timeoutMillis;
    /**
     * Subtask
     */
    private final Runnable task;
    /**
     * Current delay between subtask executions
     */
    private final AtomicLong delay;
    /**
     * Maximum delay between subtask executions
     *
     * Used when the subtask times out
     */
    private final long maxDelay;

    public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.timeoutMillis = timeUnit.toMillis(timeout);
        this.task = task;
        this.delay = new AtomicLong(timeoutMillis);
        this.maxDelay = timeoutMillis * expBackOffBound;

        // Initialize the counters and register.
        timeoutCounter = Monitors.newCounter("timeouts");
        rejectedCounter = Monitors.newCounter("rejectedExecutions");
        throwableCounter = Monitors.newCounter("throwables");
        threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
        Monitors.registerObject(name, this);
    }
}
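scheduler: the scheduled-task service, used to trigger the subtask on a timer.
executor: the subtask thread pool, to which the subtask is submitted for execution.
task: the subtask.
timeoutMillis: the subtask execution timeout, in milliseconds.
delay: the current delay between subtask executions, in milliseconds. Its initial value is the timeout argument converted to milliseconds.
maxDelay: the maximum delay between subtask executions, in milliseconds. Its value is timeout * expBackOffBound, likewise converted to milliseconds.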
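scheduler runs the TimedSupervisorTask for the first time after the initial delay.
When the TimedSupervisorTask runs, it submits task to executor for execution.
If task finishes normally, the TimedSupervisorTask submits itself to scheduler again, to run after a delay of timeoutMillis.
If task times out, the delay is recomputed (it may not exceed maxDelay) and the TimedSupervisorTask submits itself to scheduler again with the new delay.
The implementation is as follows: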
// TimedSupervisorTask.java
@Override
public void run() {
    Future<?> future = null;
    try {
        // Submit the subtask
        future = executor.submit(task);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
        // Wait for the subtask to complete or time out
        future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
        // Reset the delay before the next execution
        delay.set(timeoutMillis);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
    } catch (TimeoutException e) {
        logger.error("task supervisor timed out", e);
        timeoutCounter.increment();

        // Back off: double the delay before the next execution, capped at maxDelay
        long currentDelay = delay.get();
        long newDelay = Math.min(maxDelay, currentDelay * 2);
        delay.compareAndSet(currentDelay, newDelay);

    } catch (RejectedExecutionException e) {
        if (executor.isShutdown() || scheduler.isShutdown()) {
            logger.warn("task supervisor shutting down, reject the task", e);
        } else {
            logger.error("task supervisor rejected the task", e);
        }

        rejectedCounter.increment();
    } catch (Throwable e) {
        if (executor.isShutdown() || scheduler.isShutdown()) {
            logger.warn("task supervisor shutting down, can't accept the task");
        } else {
            logger.error("task supervisor threw an exception", e);
        }

        throwableCounter.increment();
    } finally {
        // Cancel the subtask if it has not finished
        if (future != null) {
            future.cancel(true);
        }

        // Schedule the next run
        if (!scheduler.isShutdown()) {
            scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
        }
    }
}
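To see what the timeout branch of run() does to the delay over several consecutive timeouts, here is a minimal sketch of just that arithmetic. `BackoffSketch` and its methods are hypothetical names, and the value 10 used for expBackOffBound in the usage note is an assumed value chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class BackoffSketch {

    /** Same formula as the TimeoutException branch: double the delay, capped at maxDelay. */
    static long nextDelayAfterTimeout(long currentDelay, long maxDelay) {
        return Math.min(maxDelay, currentDelay * 2);
    }

    /** Delays (in ms) used after each of the given number of consecutive timeouts. */
    static List<Long> delaysAfterConsecutiveTimeouts(long timeoutMillis, int expBackOffBound, int timeouts) {
        long maxDelay = timeoutMillis * expBackOffBound; // as computed in the constructor
        long delay = timeoutMillis;                      // initial delay equals the timeout
        List<Long> delays = new ArrayList<>();
        for (int i = 0; i < timeouts; i++) {
            delay = nextDelayAfterTimeout(delay, maxDelay);
            delays.add(delay);
        }
        return delays;
    }
}
```

With a 30 second timeout and an assumed expBackOffBound of 10, five consecutive timeouts give delays of 60, 120, 240, 300 and 300 seconds. A single successful run resets the delay to the timeout, as the delay.set(timeoutMillis) call in run() shows.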
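First, task is submitted to the subtask thread pool executor.
Then the supervisor waits until task either completes or times out.
If task completes, the next execution delay (delay) is reset to timeoutMillis.
If task times out, the next execution delay (delay) is recomputed as Math.min(maxDelay, currentDelay * 2). With repeated timeouts the delay keeps doubling, but it may not exceed the maximum delay (maxDelay).

com.netflix.eureka.resources.InstanceResource is the Resource (Controller) that handles requests for a single application instance.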
A request to renew an application instance's lease is mapped to InstanceResource#renewLease(). The implementation is as follows:
 1: @PUT
 2: public Response renewLease(
 3:         @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
 4:         @QueryParam("overriddenstatus") String overriddenStatus,
 5:         @QueryParam("status") String status,
 6:         @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
 7:     boolean isFromReplicaNode = "true".equals(isReplication);
 8:     // Renew the lease
 9:     boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
10:
11:     // Renewal failed
12:     // Not found in the registry, immediately ask for a register
13:     if (!isSuccess) {
14:         logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
15:         return Response.status(Status.NOT_FOUND).build();
16:     }
17:
18:     // Compare the lastDirtyTimestamp attribute of InstanceInfo
19:     // Check if we need to sync based on dirty time stamp, the client
20:     // instance might have changed some value
21:     Response response = null;
22:     if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
23:         response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
24:         // Store the overridden status since the validation found out the node that replicates wins
25:         if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
26:                 && (overriddenStatus != null)
27:                 && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
28:                 && isFromReplicaNode) {
29:             registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
30:         }
31:     } else { // Success
32:         response = Response.ok().build();
33:     }
34:     logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
35:     return response;
36: }
Lines 8 to 9: call PeerAwareInstanceRegistryImpl#renew(...) to renew the lease. The implementation is as follows:
// PeerAwareInstanceRegistryImpl.java
public boolean renew(final String appName, final String id, final boolean isReplication) {
    if (super.renew(appName, id, isReplication)) { // Renew the lease
        // Replicate to the other Eureka-Server nodes
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}
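It calls the parent method AbstractInstanceRegistry#renew(...) to renew the lease of the application instance.
Lines 11 to 16: the renewal failed, so a 404 response is returned. When Eureka-Client receives the 404 response, it registers its InstanceInfo again.
Lines 18 to 30: compare the lastDirtyTimestamp in the request with the lastDirtyTimestamp attribute of the Server's InstanceInfo. This requires eureka.syncWhenTimestampDiffers = true (enabled by default).
Line 23: call #validateDirtyTimestamp(...) to compare the two lastDirtyTimestamp values. The implementation is as follows: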
// InstanceResource.java
private Response validateDirtyTimestamp(Long lastDirtyTimestamp, boolean isReplication) {
    // Get the InstanceInfo
    InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
    if (appInfo != null) {
        if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
            Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};
            // The request's value is larger
            if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
                logger.debug("Time to sync, since the last dirty timestamp differs -"
                        + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}", args);
                return Response.status(Status.NOT_FOUND).build();
            // The Server's value is larger
            } else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
                // In the case of replication, send the current instance info in the registry for the
                // replicating node to sync itself with this one.
                if (isReplication) {
                    logger.debug(
                            "Time to sync, since the last dirty timestamp differs -"
                                    + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
                            args);
                    return Response.status(Status.CONFLICT).entity(appInfo).build();
                } else {
                    return Response.ok().build();
                }
            }
        }

    }
    return Response.ok().build();
}
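If the request's lastDirtyTimestamp is larger, the InstanceInfo held by the requester (which may be a Eureka-Client or another Server in the Eureka-Server cluster) is inconsistent with the InstanceInfo on this Eureka-Server, so a 404 response is returned. On receiving the 404 response, the requester registers again.
If the Server's lastDirtyTimestamp is larger and the requester is a Eureka-Client, the renewal succeeds and a 200 response is returned. If the requester is a replicating Server node, the code returns a 409 (CONFLICT) response carrying the Server's InstanceInfo instead.
If the two lastDirtyTimestamp values are equal, a 200 response is returned.
Lines 31 to 33 of #renewLease(): the renewal succeeded, so a 200 response is returned.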
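The three outcomes of the timestamp comparison can be condensed into one small decision function. This is only a sketch: `DirtyTimestampSketch` is a hypothetical name, it returns bare status codes rather than JAX-RS responses, and it leaves out the case where the instance is not found in the registry.

```java
final class DirtyTimestampSketch {
    static final int OK = 200;
    static final int NOT_FOUND = 404;
    static final int CONFLICT = 409;

    /**
     * Status code for a renew request, given the request's lastDirtyTimestamp (incoming)
     * and the one stored on the server (registry).
     */
    static int validate(long incoming, long registry, boolean isReplication) {
        if (incoming > registry) {
            // The requester has newer data: ask it to register again.
            return NOT_FOUND;
        }
        if (registry > incoming) {
            // The server has newer data: a replicating peer must sync, a client is simply accepted.
            return isReplication ? CONFLICT : OK;
        }
        // Timestamps are equal: nothing to sync.
        return OK;
    }
}
```

Read this way, 404 means "your copy is newer, send it to me" and 409 means "my copy is newer, take it from me".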
AbstractInstanceRegistry#renew(...) is called to renew the lease of the application instance. The implementation is as follows:
 1: public boolean renew(String appName, String id, boolean isReplication) {
 2:     // Record the renewal in the monitoring counter
 3:     RENEW.increment(isReplication);
 4:     // Look up the lease
 5:     Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
 6:     Lease<InstanceInfo> leaseToRenew = null;
 7:     if (gMap != null) {
 8:         leaseToRenew = gMap.get(id);
 9:     }
10:     // The lease does not exist
11:     if (leaseToRenew == null) {
12:         RENEW_NOT_FOUND.increment(isReplication);
13:         logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
14:         return false;
15:     } else {
16:         InstanceInfo instanceInfo = leaseToRenew.getHolder();
17:         if (instanceInfo != null) {
18:             // touchASGCache(instanceInfo.getASGName());
19:             // override status
20:             InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
21:                     instanceInfo, leaseToRenew, isReplication);
22:             if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
23:                 logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
24:                         + "; re-register required", instanceInfo.getId());
25:                 RENEW_NOT_FOUND.increment(isReplication);
26:                 return false;
27:             }
28:             if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
29:                 Object[] args = {
30:                         instanceInfo.getStatus().name(),
31:                         instanceInfo.getOverriddenStatus().name(),
32:                         instanceInfo.getId()
33:                 };
34:                 logger.info(
35:                         "The instance status {} is different from overridden instance status {} for instance {}. "
36:                                 + "Hence setting the status to overridden status", args);
37:                 instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
38:             }
39:         }
40:         // Increment the renewals-per-minute counter
41:         renewsLastMin.increment();
42:         // Set the lease's last-update timestamp (renew)
43:         leaseToRenew.renew();
44:         return true;
45:     }
46: }
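Lines 4 to 14: look up the lease. If the lease does not exist, the renewal fails (false is returned).
Lines 19 to 27: obtain the overridden status. If it is UNKNOWN, the lease cannot be renewed and false is returned. This is covered in detail in 《应用实例注册发现 (八)之覆盖状态》.
Lines 40 to 41: increment the renewals-per-minute counter (renewsLastMin). com.netflix.eureka.util.MeasuredRate is a rate-measuring class, implemented as follows: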
// AbstractInstanceRegistry.java
/**
 * Number of renewals per minute
 */
private final MeasuredRate renewsLastMin;

// MeasuredRate.java
public class MeasuredRate {

    /**
     * Count in the previous interval
     */
    private final AtomicLong lastBucket = new AtomicLong(0);
    /**
     * Count in the current interval
     */
    private final AtomicLong currentBucket = new AtomicLong(0);
    /**
     * Interval
     */
    private final long sampleInterval;
    /**
     * Timer
     */
    private final Timer timer;

    private volatile boolean isActive;

    public MeasuredRate(long sampleInterval) {
        this.sampleInterval = sampleInterval;
        this.timer = new Timer("Eureka-MeasureRateTimer", true);
        this.isActive = false;
    }

    public synchronized void start() {
        if (!isActive) {
            timer.schedule(new TimerTask() {

                @Override
                public void run() {
                    try {
                        // Zero out the current bucket.
                        lastBucket.set(currentBucket.getAndSet(0));
                    } catch (Throwable e) {
                        logger.error("Cannot reset the Measured Rate", e);
                    }
                }
            }, sampleInterval, sampleInterval);

            isActive = true;
        }
    }

    public synchronized void stop() {
        if (isActive) {
            timer.cancel();
            isActive = false;
        }
    }

    /**
     * Returns the count in the last sample interval.
     */
    public long getCount() {
        return lastBucket.get();
    }

    /**
     * Increments the count in the current sample interval.
     */
    public void increment() {
        currentBucket.incrementAndGet();
    }
}
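The two-bucket idea in MeasuredRate is easier to follow when the timer is taken out of the picture. In the sketch below, `TwoBucketRate` is a hypothetical class and `rollOver()` is a hypothetical method that does by hand what the TimerTask above does once per sampleInterval.

```java
import java.util.concurrent.atomic.AtomicLong;

final class TwoBucketRate {
    private final AtomicLong lastBucket = new AtomicLong(0);    // count of the previous interval
    private final AtomicLong currentBucket = new AtomicLong(0); // count of the interval in progress

    /** Counts one event in the current interval. */
    void increment() {
        currentBucket.incrementAndGet();
    }

    /** Returns the count of the last completed interval, not the one in progress. */
    long getCount() {
        return lastBucket.get();
    }

    /** Closes the current interval: its count becomes the "last" count and counting restarts at zero. */
    void rollOver() {
        lastBucket.set(currentBucket.getAndSet(0));
    }
}
```

Because getCount() reads only the completed interval, the value it reports stays stable for a whole interval and is never a partial count.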
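timer: the timer. Every sampleInterval it resets the current count (currentBucket) to zero and stores the previous current count in the last count (lastBucket).
#increment(): increments the current count (currentBucket).
#getCount(): returns the last count (lastBucket).
renewsLastMin is used for the following: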
Lines 42 to 43: call Lease#renew() to set the lease's last-update timestamp (that is, renew it). The implementation is as follows:
public void renew() {
    lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
Line 44: return true, indicating that the renewal succeeded.
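Putting the server-side pieces together, the lookup-then-renew flow can be sketched with a nested map and an explicit clock. Everything here is a simplification under stated assumptions: `RegistrySketch`, its `register` helper and the `nowMs` parameters are hypothetical, and the overridden-status handling and metrics are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class RegistrySketch {

    /** Minimal lease: only the fields needed to show the renew arithmetic. */
    static final class Lease {
        final long durationMs;
        volatile long lastUpdateTimestamp;

        Lease(long durationMs, long nowMs) {
            this.durationMs = durationMs;
            this.lastUpdateTimestamp = nowMs;
        }

        /** Same arithmetic as Lease#renew(), with the clock passed in for determinism. */
        void renew(long nowMs) {
            lastUpdateTimestamp = nowMs + durationMs;
        }
    }

    // appName -> (instance id -> lease), the same shape as the registry map in renew()
    private final Map<String, Map<String, Lease>> registry = new ConcurrentHashMap<>();

    /** Hypothetical helper that creates a lease so there is something to renew. */
    void register(String appName, String id, long durationMs, long nowMs) {
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>())
                .put(id, new Lease(durationMs, nowMs));
    }

    /** Returns false when no lease exists (the caller would answer 404), true after renewing. */
    boolean renew(String appName, String id, long nowMs) {
        Map<String, Lease> gMap = registry.get(appName);
        Lease lease = (gMap == null) ? null : gMap.get(id);
        if (lease == null) {
            return false;
        }
        lease.renew(nowMs);
        return true;
    }

    /** Test accessor; assumes the lease exists. */
    long lastUpdateTimestamp(String appName, String id) {
        return registry.get(appName).get(id).lastUpdateTimestamp;
    }
}
```

A false return here is what InstanceResource#renewLease() turns into the 404 that makes the client register again.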
Progress has been a little slower than I expected, but I will keep going and publish the next article.