在之前的文章: EurekaClient自动装配及启动流程解析 中,我们提到了在类 DiscoveryClient
的构造方法中存在一个刷新线程和从服务端拉取注册信息的操作
这两个操作正对应 Eureka 获取服务列表的两种情况:全量拉取和增量拉取。先看构造方法中的拉取:
// If registry fetching is enabled and fetchRegistry(false) reports failure,
// fetchRegistryFromBackup() is invoked instead.
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
    fetchRegistryFromBackup();
}
全量获取使用的 fetchRegistry
方法,如果使用此方法没有成功获取到的话则会执行 fetchRegistryFromBackup
方法使用备份方式拉取,备份拉取使用的是 BackupRegistry
接口的实现类,只不过eureka默认没有实现。
继续看拉取流程
private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { Applications applications = getApplications(); if (clientConfig.shouldDisableDelta()//禁用部分获取 || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch//全部获取 || (applications == null)//本地没有任何实例 || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); getAndStoreFullRegistry(); } else { getAndUpdateDelta(applications); } applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } }
forceFullRegistryFetch
/** Atomic holder of the client's locally cached {@link Applications}. */
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();

/**
 * Returns the {@link Applications} instance currently held by {@code localRegionApps}.
 *
 * @return the current value of the reference (may be {@code null} if nothing was ever set)
 */
public Applications getApplications() {
    final Applications current = localRegionApps.get();
    return current;
}
可以看到所有实例应该缓存在 localRegionApps
对象中
private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; //发起获取 EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { //缓存结果 localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } }
其中调用的逻辑比较简单:
public EurekaHttpResponse<Applications> getApplications(String... regions) { return getApplicationsInternal("apps/", regions); } private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) { ClientResponse response = null; String regionsParamValue = null; try { WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath); if (regions != null && regions.length > 0) { regionsParamValue = StringUtil.join(regions); webResource = webResource.queryParam("regions", regionsParamValue); } Builder requestBuilder = webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSON Applications applications = null; if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) { applications = response.getEntity(Applications.class); } return anEurekaHttpResponse(response.getStatus(), Applications.class) .headers(headersOf(response)) .entity(applications) .build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}", serviceUrl, urlPath, regionsParamValue == null ? "" : "regions=" + regionsParamValue, response == null ? "N/A" : response.getStatus() ); } if (response != null) { response.close(); } } }
全量获取的服务端Controller在类 ApplicationsResource
中
@GET public Response getContainers(@PathParam("version") String version, @HeaderParam(HEADER_ACCEPT) String acceptHeader,@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,@Context UriInfo uriInfo,@Nullable @QueryParam("regions") String regionsStr) { boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty(); String[] regions = null; if (!isRemoteRegionRequested) { EurekaMonitors.GET_ALL.increment(); } else { regions = regionsStr.toLowerCase().split(","); Arrays.sort(regions); // So we don't have different caches for same regions queried in different order. EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment(); } // Check if the server allows the access to the registry. The server can // restrict access if it is not // ready to serve traffic depending on various reasons. if (!registry.shouldAllowAccess(isRemoteRegionRequested)) { return Response.status(Status.FORBIDDEN).build(); } CurrentRequestVersion.set(Version.toEnum(version)); KeyType keyType = Key.KeyType.JSON; String returnMediaType = MediaType.APPLICATION_JSON; if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) { keyType = Key.KeyType.XML; returnMediaType = MediaType.APPLICATION_XML; } Key cacheKey = new Key(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); Response response; if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { response = Response.ok(responseCache.getGZIP(cacheKey)) .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE, returnMediaType) .build(); } else { response = Response.ok(responseCache.get(cacheKey)) .build(); } return response; }
虽然这个Controller很长,但是与返回结果相关的也就这么几行
// Build the cache key for the "all applications" entry from the negotiated format, the request
// version, the Eureka accept value and the requested regions.
Key cacheKey = new Key(Key.EntityType.Application,
        ResponseCacheImpl.ALL_APPS,
        keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);

Response response;
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
    // Client accepts gzip: return the gzipped cache entry and set encoding/content-type headers.
    response = Response.ok(responseCache.getGZIP(cacheKey))
            .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
            .header(HEADER_CONTENT_TYPE, returnMediaType)
            .build();
} else {
    // Otherwise return the plain cache entry.
    response = Response.ok(responseCache.get(cacheKey))
            .build();
}
这里有两个点, Key
和 ResponseCacheImpl
Key
这个对象中包含了缓存键
public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) { this.regions = regions; this.entityType = entityType; this.entityName = entityName; this.requestType = type; this.requestVersion = v; this.eurekaAccept = eurekaAccept; hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "") + requestType.name() + requestVersion.name() + this.eurekaAccept.name(); }
这个hashKey最后的结果就是类似于这样的:ApplicationALL_APPSJSONV2full
ResponseCacheImpl
这个对象是响应缓存的实现
当hashKey创造好之后, responseCache.getGZIP(cacheKey)
就是读取缓存并压缩的方法
/**
 * Looks up the cached value for {@code key} and returns its gzipped form.
 *
 * @param key cache key
 * @return the gzipped payload, or {@code null} when no value could be obtained
 */
public byte[] getGZIP(Key key) {
    final Value cached = getValue(key, shouldUseReadOnlyResponseCache);
    return (cached == null) ? null : cached.getGzipped();
}
payload.getGzipped()
是压缩的方法就不看了,看 getValue
/**
 * Resolves the cached value for {@code key}.
 *
 * <p>With the read-only cache enabled, that cache is consulted first; on a miss the value is
 * loaded from the read-write cache and copied into the read-only cache. With it disabled, the
 * read-write cache is used directly. Any failure is logged and whatever was obtained so far
 * (possibly {@code null}) is returned.
 *
 * @param key              cache key
 * @param useReadOnlyCache whether to go through the read-only cache
 * @return the cached value, or {@code null} if it could not be obtained
 */
Value getValue(final Key key, boolean useReadOnlyCache) {
    Value result = null;
    try {
        if (useReadOnlyCache) {
            result = readOnlyCacheMap.get(key);
        }
        if (result == null) {
            result = readWriteCacheMap.get(key);
            if (useReadOnlyCache) {
                readOnlyCacheMap.put(key, result);
            }
        }
    } catch (Throwable t) {
        logger.error("Cannot get value for key : {}", key, t);
    }
    return result;
}
大致就是先从 readOnlyCacheMap
只读缓存中获取,如果不存在的话则从 readWriteCacheMap
读写缓存中获取,并把结果回填到只读缓存;如果没有启用只读缓存,则直接读取读写缓存
上面服务端处理请求时是直接从缓存中读取的,那么这个缓存又是在什么时候生成的呢?
缓存的生成在 ResponseCacheImpl
的构造方法中
// Read-write cache: initial capacity of 1000 entries; an entry expires the configured number of
// seconds after it was written.
this.readWriteCacheMap =
        CacheBuilder.newBuilder().initialCapacity(1000)
                .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                .removalListener(new RemovalListener<Key, Value>() {
                    @Override
                    public void onRemoval(RemovalNotification<Key, Value> notification) {
                        // When a key carrying regions is removed, also drop its association
                        // (keyed by the region-less clone) from regionSpecificKeys.
                        Key removedKey = notification.getKey();
                        if (removedKey.hasRegions()) {
                            Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                            regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                        }
                    }
                })
                .build(new CacheLoader<Key, Value>() {
                    @Override
                    public Value load(Key key) throws Exception {
                        // Cache miss: remember region-specific keys under their region-less clone,
                        // then generate the payload for this key.
                        if (key.hasRegions()) {
                            Key cloneWithNoRegions = key.cloneWithoutRegions();
                            regionSpecificKeys.put(cloneWithNoRegions, key);
                        }
                        Value value = generatePayload(key);
                        return value;
                    }
                });
可以看到读写缓存的初始容量(initialCapacity)是1000,条目在写入后经过 getResponseCacheAutoExpirationInSeconds 配置的秒数会过期;而缓存内容的生成在 generatePayload
方法中
/**
 * Builds the cache value for the given key by reading the registry and encoding the result.
 *
 * <p>The branch taken depends on the key's entity type and name: all applications, the
 * applications delta, a single application, or a VIP/SVIP lookup. Each branch starts its own
 * timer, which is stopped in the {@code finally} block.
 *
 * @param key cache key describing what to generate
 * @return a new {@code Value} wrapping the encoded payload (an empty string for unknown entity types)
 */
private Value generatePayload(Key key) {
    Stopwatch tracer = null;
    try {
        String payload;
        switch (key.getEntityType()) {
            case Application:
                boolean isRemoteRegionRequested = key.hasRegions();

                if (ALL_APPS.equals(key.getName())) {
                    // Full registry, with or without remote regions.
                    if (isRemoteRegionRequested) {
                        tracer = serializeAllAppsWithRemoteRegionTimer.start();
                        payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeAllAppsTimer.start();
                        payload = getPayLoad(key, registry.getApplications());
                    }
                } else if (ALL_APPS_DELTA.equals(key.getName())) {
                    // Delta registry; the delta version counters are bumped on every generation.
                    if (isRemoteRegionRequested) {
                        tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                        versionDeltaWithRegions.incrementAndGet();
                        versionDeltaWithRegionsLegacy.incrementAndGet();
                        payload = getPayLoad(key,
                                registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeDeltaAppsTimer.start();
                        versionDelta.incrementAndGet();
                        versionDeltaLegacy.incrementAndGet();
                        payload = getPayLoad(key, registry.getApplicationDeltas());
                    }
                } else {
                    // Any other name is treated as a single application name.
                    tracer = serializeOneApptimer.start();
                    payload = getPayLoad(key, registry.getApplication(key.getName()));
                }
                break;
            case VIP:
            case SVIP:
                tracer = serializeViptimer.start();
                payload = getPayLoad(key, getApplicationsForVip(key, registry));
                break;
            default:
                logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
                payload = "";
                break;
        }
        return new Value(payload);
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
}
这个方法的重点在这一句上:`payload = getPayLoad(key, registry.getApplications());`
getApplications
public Applications getApplications() { boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion(); if (disableTransparentFallback) { return getApplicationsFromLocalRegionOnly(); } else { return getApplicationsFromAllRemoteRegions(); // Behavior of falling back to remote region can be disabled. } }
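当 disableTransparentFallbackToOtherRegion 配置为 true 时,这里会进入 getApplicationsFromLocalRegionOnly
方法;否则会进入 getApplicationsFromAllRemoteRegions 方法。下面以前者为例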
public Applications getApplicationsFromLocalRegionOnly() { return getApplicationsFromMultipleRegions(EMPTY_STR_ARRAY); } public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) { boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0; logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}", includeRemoteRegion, remoteRegions); if (includeRemoteRegion) { GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment(); } else { GET_ALL_CACHE_MISS.increment(); } Applications apps = new Applications(); apps.setVersion(1L); for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) { Application app = null; if (entry.getValue() != null) { for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) { Lease<InstanceInfo> lease = stringLeaseEntry.getValue(); if (app == null) { app = new Application(lease.getHolder().getAppName()); } app.addInstance(decorateInstanceInfo(lease)); } } if (app != null) { apps.addApplication(app); } } if (includeRemoteRegion) { for (String remoteRegion : remoteRegions) { RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion); if (null != remoteRegistry) { Applications remoteApps = remoteRegistry.getApplications(); for (Application application : remoteApps.getRegisteredApplications()) { if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) { logger.info("Application {} fetched from the remote region {}", application.getName(), remoteRegion); Application appInstanceTillNow = apps.getRegisteredApplications(application.getName()); if (appInstanceTillNow == null) { appInstanceTillNow = new Application(application.getName()); apps.addApplication(appInstanceTillNow); } for (InstanceInfo instanceInfo : application.getInstances()) { appInstanceTillNow.addInstance(instanceInfo); } } else { logger.debug("Application {} not fetched from the remote region {} as there exists a " + "whitelist 
and this app is not in the whitelist.", application.getName(), remoteRegion); } } } else { logger.warn("No remote registry available for the remote region {}", remoteRegion); } } } apps.setAppsHashCode(apps.getReconcileHashCode()); return apps; }
这里获取的时候分为3个部分:
Application
对象封装,多个 Application
使用 Applications
对象封装 getPayLoad
这里则仅仅只是一个编码
private String getPayLoad(Key key, Applications apps) { // 获得编码器 EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept()); String result; try { // 编码 result = encoderWrapper.encode(apps); } catch (Exception e) { logger.error("Failed to encode the payload for all apps", e); return ""; } if(logger.isDebugEnabled()) { logger.debug("New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode()); } return result; }
只读缓存是定时刷新的,同样也在 ResponseCacheImpl
的构造方法中
// When the read-only cache is enabled, schedule the task that refreshes it.
// First-run time: the current time is rounded down to a multiple of responseCacheUpdateIntervalMs
// (integer division then multiplication) and one interval is added; the task then repeats with
// the same interval as its period.
if (shouldUseReadOnlyResponseCache) {
    timer.schedule(getCacheUpdateTask(),
            new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                    + responseCacheUpdateIntervalMs),
            responseCacheUpdateIntervalMs);
}
这个刷新任务是这样的
/**
 * Creates the task that synchronizes the read-only cache with the read-write cache.
 *
 * <p>For every key currently in the read-only cache, the value is read from the read-write cache
 * and, if it is a different object than the one in the read-only cache, the read-only entry is
 * replaced. Failures for one key are logged and do not stop the loop.
 *
 * @return the refresh task
 */
private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            for (Key key : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                            key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    // Reference comparison: only overwrite when the read-write cache holds another object.
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                }
            }
        }
    };
}
观察for循环里面的内容,发现只读缓存的内容都是基于读写缓存来的
增量拉取的线程调度和发送心跳的线程调度是在一个方法 initScheduledTasks
中执行的,代码如下:
// Schedule the registry cache refresh: a CacheRefreshThread wrapped in a TimedSupervisorTask named
// "cacheRefresh", configured with the fetch interval and the exponential back-off bound.
// NOTE(review): presumably the first run is delayed by registryFetchIntervalSeconds and the
// supervisor task re-schedules itself — confirm against TimedSupervisorTask.
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
        new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
        ),
        registryFetchIntervalSeconds, TimeUnit.SECONDS);
看一下线程 CacheRefreshThread
class CacheRefreshThread implements Runnable { public void run() { refreshRegistry(); } } void refreshRegistry() { try { boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries(); boolean remoteRegionsModified = false; //省略了一部分无关代码 //核心 boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } if (logger.isDebugEnabled()) { StringBuilder allAppsHashCodes = new StringBuilder(); allAppsHashCodes.append("Local region apps hashcode: "); allAppsHashCodes.append(localRegionApps.get().getAppsHashCode()); allAppsHashCodes.append(", is fetching remote regions? "); allAppsHashCodes.append(isFetchingRemoteRegionRegistries); for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) { allAppsHashCodes.append(", Remote region: "); allAppsHashCodes.append(entry.getKey()); allAppsHashCodes.append(" , apps hashcode: "); allAppsHashCodes.append(entry.getValue().getAppsHashCode()); } logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ", allAppsHashCodes); } } catch (Throwable e) { logger.error("Cannot fetch registry from server", e); } }
核心在 fetchRegistry
方法,这个在上面已经说过了,只不过增量拉取时调用的是 getAndUpdateDelta
方法而已
private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } if (delta == null) { logger.warn("The server does not allow the delta revision to be applied because it is not safe. " + "Hence got the full registry."); getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { try { updateDelta(delta); reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reason if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); } }
先看服务端的处理,然后再看如何处理结果吧
@Path("delta") @GET public Response getContainerDifferential( @PathParam("version") String version, @HeaderParam(HEADER_ACCEPT) String acceptHeader, @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) { boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty(); // If the delta flag is disabled in discovery or if the lease expiration // has been disabled, redirect clients to get all instances if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) { return Response.status(Status.FORBIDDEN).build(); } String[] regions = null; if (!isRemoteRegionRequested) { EurekaMonitors.GET_ALL_DELTA.increment(); } else { regions = regionsStr.toLowerCase().split(","); Arrays.sort(regions); // So we don't have different caches for same regions queried in different order. EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment(); } CurrentRequestVersion.set(Version.toEnum(version)); KeyType keyType = Key.KeyType.JSON; String returnMediaType = MediaType.APPLICATION_JSON; if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) { keyType = Key.KeyType.XML; returnMediaType = MediaType.APPLICATION_XML; } Key cacheKey = new Key(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS_DELTA, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { return Response.ok(responseCache.getGZIP(cacheKey)) .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE, returnMediaType) .build(); } else { return Response.ok(responseCache.get(cacheKey)) .build(); } }
这里的处理逻辑和全量获取大体相同,只有以下几点不同:
// Excerpt of the payload generation: the key name selects between the full registry and the delta.
if (ALL_APPS.equals(key.getName())) {
    // Full registry.
    if (isRemoteRegionRequested) {
        tracer = serializeAllAppsWithRemoteRegionTimer.start();
        payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
    } else {
        tracer = serializeAllAppsTimer.start();
        payload = getPayLoad(key, registry.getApplications());
    }
} else if (ALL_APPS_DELTA.equals(key.getName())) {
    // Delta: the delta version counters are incremented before the delta is read from the registry.
    if (isRemoteRegionRequested) {
        tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
        versionDeltaWithRegions.incrementAndGet();
        versionDeltaWithRegionsLegacy.incrementAndGet();
        payload = getPayLoad(key,
                registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
    } else {
        tracer = serializeDeltaAppsTimer.start();
        versionDelta.incrementAndGet();
        versionDeltaLegacy.incrementAndGet();
        payload = getPayLoad(key, registry.getApplicationDeltas());
    }
}
看看 getApplicationDeltas
方法吧
/**
 * Builds the registry delta from the recently-changed queue.
 *
 * <p>Every lease in {@code recentlyChangedQueue} is grouped by application name. Unless
 * transparent fallback to other regions is disabled, applications from the remote registries'
 * deltas are added when no application of the same name exists locally. The returned object
 * carries the reconcile hash code of the complete registry, not of the delta itself.
 *
 * @return the delta applications
 */
public Applications getApplicationDeltas() {
    GET_ALL_CACHE_MISS_DELTA.increment();
    Applications apps = new Applications();
    apps.setVersion(responseCache.getVersionDelta().get());
    Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
    // Acquire the lock BEFORE entering try, so that the finally block only unlocks a lock that
    // was actually acquired.
    write.lock();
    try {
        Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
        logger.debug("The number of elements in the delta queue is : {}",
                this.recentlyChangedQueue.size());
        while (iter.hasNext()) {
            Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
            InstanceInfo instanceInfo = lease.getHolder();
            logger.debug(
                    "The instance id {} is found with status {} and actiontype {}",
                    instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
            Application app = applicationInstancesMap.get(instanceInfo
                    .getAppName());
            if (app == null) {
                app = new Application(instanceInfo.getAppName());
                applicationInstancesMap.put(instanceInfo.getAppName(), app);
                apps.addApplication(app);
            }
            app.addInstance(decorateInstanceInfo(lease));
        }

        boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();

        if (!disableTransparentFallback) {
            Applications allAppsInLocalRegion = getApplications(false);

            for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
                Applications applications = remoteRegistry.getApplicationDeltas();
                for (Application application : applications.getRegisteredApplications()) {
                    Application appInLocalRegistry =
                            allAppsInLocalRegion.getRegisteredApplications(application.getName());
                    if (appInLocalRegistry == null) {
                        apps.addApplication(application);
                    }
                }
            }
        }

        Applications allApps = getApplications(!disableTransparentFallback);
        apps.setAppsHashCode(allApps.getReconcileHashCode());
        return apps;
    } finally {
        write.unlock();
    }
}
与全量获取不同的是这个最终的结果是从最近租约变更记录队列 recentlyChangedQueue
里来的,其他的流程则差不多
结果的处理代码
// Excerpt: how the client handles the delta response.
if (delta == null) {
    // No usable delta: fall back to a full registry fetch.
    logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
            + "Hence got the full registry.");
    getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
    logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
    String reconcileHashCode = "";
    if (fetchRegistryUpdateLock.tryLock()) {
        try {
            // Apply the delta, then compute the local reconcile hash code.
            updateDelta(delta);
            reconcileHashCode = getReconcileHashCode(applications);
        } finally {
            fetchRegistryUpdateLock.unlock();
        }
    } else {
        logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
    }
    // There is a diff in number of instances for some reason
    if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
        reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
    }
}
updateDelta
/**
 * Applies the instances contained in a delta to the locally cached applications.
 *
 * <p>Each instance is applied to the local-region applications, or — when its region is not the
 * local one — to the per-region entry in {@code remoteRegionVsApps} (created on demand).
 * ADDED and MODIFIED instances are added to their application; DELETED instances are removed.
 * Afterwards the version of the delta is copied to all affected {@code Applications} and their
 * instances are shuffled.
 *
 * @param delta the delta received from the server
 */
private void updateDelta(Applications delta) {
    int deltaCount = 0;
    for (Application app : delta.getRegisteredApplications()) {
        for (InstanceInfo instance : app.getInstances()) {
            // Default target is the local cache; switched to the remote-region cache below if needed.
            Applications applications = getApplications();
            String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
            if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                if (null == remoteApps) {
                    remoteApps = new Applications();
                    remoteRegionVsApps.put(instanceRegion, remoteApps);
                }
                applications = remoteApps;
            }

            ++deltaCount;
            if (ActionType.ADDED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            } else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                logger.debug("Modified instance {} to the existing apps ", instance.getId());

                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);

            } else if (ActionType.DELETED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                // Note: an unknown application is first added (using the delta's Application object)
                // and the instance is then removed from it.
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                applications.getRegisteredApplications(instance.getAppName()).removeInstance(instance);
            }
        }
    }
    logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);

    // Propagate the delta's version and reshuffle, for the local cache and every remote-region cache.
    getApplications().setVersion(delta.getVersion());
    getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());

    for (Applications applications : remoteRegionVsApps.values()) {
        applications.setVersion(delta.getVersion());
        applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
    }
}
大致处理流程为:
remoteRegionVsApps