摘要: 原创出处 http://www.iocoder.cn/Eureka/end-point-and-resolver/ 「芋道源码」欢迎转载,保留摘要,谢谢!
关注 微信公众号:【芋道源码】 有福利:
本文主要分享 EndPoint 与 解析器 。
目前有多种 Eureka-Server 访问地址的配置方式, 本文只分享 Eureka 1.x 的配置 ,不包含 Eureka 1.x 对 Eureka 2.x 的兼容配置:
eureka.serviceUrl.defaultZone=http://127.0.0.1:8080/v2
。 eureka.shouldUseDns=true
并且 eureka.eurekaServer.domainName=eureka.iocoder.cn
。 本文涉及类在 com.netflix.discovery.shared.resolver
包下,涉及到主体类的类图如下(打开大图 ):
com.netflix.discovery.shared.resolver.EurekaEndpoint
,Eureka 服务端点 接口 ,实现代码如下:
public interface EurekaEndpoint extends Comparable<Object>{ /** * @return 完整的服务 URL */ String getServiceUrl(); /** * @deprecated use {@link #getNetworkAddress()} */ @Deprecated String getHostName(); /** * @return 网络地址 */ String getNetworkAddress(); /** * @return 端口 */ int getPort(); /** * @return 是否安全( https ) */ boolean isSecure(); /** * @return 相对路径 */ String getRelativeUri(); }
com.netflix.discovery.shared.resolver.DefaultEndpoint
,默认 Eureka 服务端点 实现类 。实现代码如下:
public class DefaultEndpoint implements EurekaEndpoint{ /** * 网络地址 */ protected final String networkAddress; /** * 端口 */ protected final int port; /** * 是否安全( https ) */ protected final boolean isSecure; /** * 相对地址 */ protected final String relativeUri; /** * 完整的服务 URL */ protected final String serviceUrl; public DefaultEndpoint(String serviceUrl){ this.serviceUrl = serviceUrl; // 将 serviceUrl 分解成 几个属性 try { URL url = new URL(serviceUrl); this.networkAddress = url.getHost(); this.port = url.getPort(); this.isSecure = "https".equals(url.getProtocol()); this.relativeUri = url.getPath(); } catch (Exception e) { throw new IllegalArgumentException("Malformed serviceUrl: " + serviceUrl); } } public DefaultEndpoint(String networkAddress, int port, boolean isSecure, String relativeUri){ this.networkAddress = networkAddress; this.port = port; this.isSecure = isSecure; this.relativeUri = relativeUri; // 几个属性 拼接成 serviceUrl StringBuilder sb = new StringBuilder().append(isSecure ? "https" : "http").append("://").append(networkAddress); if (port >= 0) { sb.append(':').append(port); } if (relativeUri != null) { if (!relativeUri.startsWith("/")) { sb.append('/'); } sb.append(relativeUri); } this.serviceUrl = sb.toString(); } }
#equals(...)
和 #hashCode(...)
方法,标准实现方式,这里就不贴代码了。 #compareTo(...)
方法,基于 serviceUrl
属性做比较。 com.netflix.discovery.shared.resolver.aws.AwsEndpoint
,基于 region
、 zone
的 Eureka 服务端点 实现类 ( 请不要在意 AWS 开头 )。实现代码如下:
public class AwsEndpoint extends DefaultEndpoint{ /** * 区域 */ protected final String region; /** * 可用区 */ protected final String zone; }
#equals(...)
和 #hashCode(...)
方法,标准实现方式,这里就不贴代码了。 EndPoint 解析器使用 委托设计模式 实现。所以,上文图片中我们看到好多个解析器, 实际代码非常非常非常清晰 。
FROM 《委托模式》
委托模式是软件设计模式中的一项基本技巧。在委托模式中,有两个对象参与处理同一个请求,接受请求的对象将请求委托给另一个对象来处理。委托模式是一项基本技巧,许多其他的模式,如状态模式、策略模式、访问者模式本质上是在更特殊的场合采用了委托模式。委托模式使得我们可以用聚合来替代继承,它还使我们可以模拟mixin。
我们在上图的基础上, 增加委托的关系 ,如下图:
com.netflix.discovery.shared.resolver.ClusterResolver
,集群解析器 接口 。接口代码如下:
public interface ClusterResolver<T extends EurekaEndpoint>{ /** * @return 地区 */ String getRegion(); /** * @return EndPoint 集群( 数组 ) */ List<T> getClusterEndpoints(); }
com.netflix.discovery.shared.resolver.ClosableResolver
, 可关闭 的解析器 接口 ,继承自 ClusterResolver 接口 。接口代码如下:
public interface ClosableResolver<T extends EurekaEndpoint> extends ClusterResolver<T>{ /** * 关闭 */ void shutdown(); }
com.netflix.discovery.shared.resolver.aws.DnsTxtRecordClusterResolver
,基于 DNS TXT 记录类型的集群解析器。 类属性 代码如下:
public class DnsTxtRecordClusterResolver implements ClusterResolver<AwsEndpoint>{ /** * 地区 */ private final String region; /** * 集群根地址,例如 txt.default.eureka.iocoder.cn */ private final String rootClusterDNS; /** * 是否解析可用区( zone ) */ private final boolean extractZoneFromDNS; /** * 端口 */ private final int port; /** * 是否安全 */ private final boolean isSecure; /** * 相对地址 */ private final String relativeUri; }
DnsTxtRecordClusterResolver 通过集群根地址( rootClusterDNS
) 解析出 EndPoint 集群。需要在 DNS 配置 两层 解析记录:
TXT.${REGION}.${自定义二级域名}
。 TXT.${ZONE}.${自定义二级域名}
或者 ${ZONE}.${自定义二级域名}
。 举个例子:
rootClusterDNS
,集群根地址。例如: txt.default.eureka.iocoder.cn
,其· txt.default.eureka
为 DNS 解析记录的第一层的 主机记录 。
region
:地区。需要和 rootClusterDNS
的 ${REGION}
一致。 extractZoneFromDNS
:是否解析 DNS 解析记录的第二层级的 主机记录 的 ${ZONE}
可用区。 #getClusterEndpoints(...)
方法,实现代码如下:
1: @Override 2: public List<AwsEndpoint> getClusterEndpoints(){ 3: List<AwsEndpoint> eurekaEndpoints = resolve(region, rootClusterDNS, extractZoneFromDNS, port, isSecure, relativeUri); 4: if (logger.isDebugEnabled()) { 5: logger.debug("Resolved {} to {}", rootClusterDNS, eurekaEndpoints); 6: } 7: return eurekaEndpoints; 8: } 9: 10: private static List<AwsEndpoint> resolve(String region, String rootClusterDNS, boolean extractZone, int port, boolean isSecure, String relativeUri){ 11: try { 12: // 解析 第一层 DNS 记录 13: Set<String> zoneDomainNames = resolve(rootClusterDNS); 14: if (zoneDomainNames.isEmpty()) { 15: throw new ClusterResolverException("Cannot resolve Eureka cluster addresses; there are no data in TXT record for DN " + rootClusterDNS); 16: } 17: // 记录 第二层 DNS 记录 18: List<AwsEndpoint> endpoints = new ArrayList<>(); 19: for (String zoneDomain : zoneDomainNames) { 20: String zone = extractZone ? ResolverUtils.extractZoneFromHostName(zoneDomain) : null; // 21: Set<String> zoneAddresses = resolve(zoneDomain); 22: for (String address : zoneAddresses) { 23: endpoints.add(new AwsEndpoint(address, port, isSecure, relativeUri, region, zone)); 24: } 25: } 26: return endpoints; 27: } catch (NamingException e) { 28: throw new ClusterResolverException("Cannot resolve Eureka cluster addresses for root: " + rootClusterDNS, e); 29: } 30: }
第 12 至 16 行 :调用 #resolve(rootClusterDNS)
解析 第一层 DNS 记录。实现代码如下:
1: private static Set<String> resolve(String rootClusterDNS) throws NamingException{ 2: Set<String> result; 3: try { 4: result = DnsResolver.getCNamesFromTxtRecord(rootClusterDNS); 5: // TODO 芋艿:这块是bug,不需要这一段 6: if (!rootClusterDNS.startsWith("txt.")) { 7: result = DnsResolver.getCNamesFromTxtRecord("txt." + rootClusterDNS); 8: } 9: } catch (NamingException e) { 10: if (!rootClusterDNS.startsWith("txt.")) { 11: result = DnsResolver.getCNamesFromTxtRecord("txt." + rootClusterDNS); 12: } else { 13: throw e; 14: } 15: } 16: return result; 17: }
DnsResolver#getCNamesFromTxtRecord(...)
方法,解析 TXT 主机记录。点击 链接 查看带中文注释的 DnsResolver 的代码,比较解析,笔者就不啰嗦了。 rootClusterDNS
不以 txt.
开头时,即使第 4 行解析成功,也会报错,此时是个 Eureka 的 BUG 。因此,配置 DNS 解析记录时,主机记录暂时必须以 txt.
开头。 第 17 至 25 行 :循环第一层 DNS 记录的解析结果,进一步解析第二层 DNS 记录。
zone
)。 #resolve(rootClusterDNS)
解析 第二层 DNS 记录。 com.netflix.discovery.shared.resolver.aws.ConfigClusterResolver
,基于 配置文件 的集群解析器。 类属性 代码如下:
public class ConfigClusterResolver implements ClusterResolver<AwsEndpoint>{ private final EurekaClientConfig clientConfig; private final InstanceInfo myInstanceInfo; public ConfigClusterResolver(EurekaClientConfig clientConfig, InstanceInfo myInstanceInfo){ this.clientConfig = clientConfig; this.myInstanceInfo = myInstanceInfo; } }
#getClusterEndpoints(...)
方法,实现代码如下:
1: @Override 2: public List<AwsEndpoint> getClusterEndpoints(){ 3: // 使用 DNS 获取 EndPoint 4: if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { 5: if (logger.isInfoEnabled()) { 6: logger.info("Resolving eureka endpoints via DNS: {}", getDNSName()); 7: } 8: return getClusterEndpointsFromDns(); 9: } else { 10: // 直接配置实际访问地址 11: logger.info("Resolving eureka endpoints via configuration"); 12: return getClusterEndpointsFromConfig(); 13: } 14: }
第 3 至 8 行 :基于 DNS 获取 EndPoint 集群,调用 #getClusterEndpointsFromDns()
方法,实现代码如下:
private List<AwsEndpoint> getClusterEndpointsFromDns(){ String discoveryDnsName = getDNSName(); // 获取 集群根地址 int port = Integer.parseInt(clientConfig.getEurekaServerPort()); // 端口 // cheap enough so just re-use DnsTxtRecordClusterResolver dnsResolver = new DnsTxtRecordClusterResolver( getRegion(), discoveryDnsName, true, // 解析 zone port, false, clientConfig.getEurekaServerURLContext() ); // 调用 DnsTxtRecordClusterResolver 解析 EndPoint List<AwsEndpoint> endpoints = dnsResolver.getClusterEndpoints(); if (endpoints.isEmpty()) { logger.error("Cannot resolve to any endpoints for the given dnsName: {}", discoveryDnsName); } return endpoints; } private String getDNSName(){ return "txt." + getRegion() + '.' + clientConfig.getEurekaServerDNSName(); }
eureka.shouldUseDns=true
,开启基于 DNS 获取 EndPoint 集群。 eureka.eurekaServer.domainName=${xxxxx}
,配置集群根地址。 eureka.eurekaServer.port
, eureka.eurekaServer.context
。 第 9 至 13 行 :直接 配置文件 填写实际 EndPoint 集群,调用 #getClusterEndpointsFromConfig()
方法,实现代码如下:
1: private List<AwsEndpoint> getClusterEndpointsFromConfig(){ 2: // 获得 可用区 3: String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion()); 4: // 获取 应用实例自己 的 可用区 5: String myZone = InstanceInfo.getZone(availZones, myInstanceInfo); 6: // 获得 可用区与 serviceUrls 的映射 7: Map<String, List<String>> serviceUrls = EndpointUtils.getServiceUrlsMapFromConfig(clientConfig, myZone, clientConfig.shouldPreferSameZoneEureka()); 8: // 拼装 EndPoint 集群结果 9: List<AwsEndpoint> endpoints = new ArrayList<>(); 10: for (String zone : serviceUrls.keySet()) { 11: for (String url : serviceUrls.get(zone)) { 12: try { 13: endpoints.add(new AwsEndpoint(url, getRegion(), zone)); 14: } catch (Exception ignore) { 15: logger.warn("Invalid eureka server URI: {}; removing from the server pool", url); 16: } 17: } 18: } 19: 20: // 打印日志,EndPoint 集群 21: if (logger.isDebugEnabled()) { 22: logger.debug("Config resolved to {}", endpoints); 23: } 24: // 打印日志,解析结果为空 25: if (endpoints.isEmpty()) { 26: logger.error("Cannot resolve to any endpoints from provided configuration: {}", serviceUrls); 27: } 28: 29: return endpoints; 30: }
eureka.${REGION}.availabilityZones
配置。 InstanceInfo#getZone(...)
方法,获得 应用实例自己所在的可用区 ( zone
)。非亚马逊 AWS 环境下,可用区数组的第一个元素就是 应用实例自己所在的可用区 。 第 7 行 :调用 EndpointUtils#getServiceUrlsMapFromConfig(...)
方法,获得可用区与 serviceUrls
的映射。实现代码如下:
// EndpointUtils.java 1: public static Map<String, List<String>> getServiceUrlsMapFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) { 2: Map<String, List<String>> orderedUrls = new LinkedHashMap<>(); // key:zone;value:serviceUrls 3: // 获得 应用实例的 地区( region ) 4: String region = getRegion(clientConfig); 5: // 获得 应用实例的 可用区 6: String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion()); 7: if (availZones == null || availZones.length == 0) { 8: availZones = new String[1]; 9: availZones[0] = DEFAULT_ZONE; 10: } 11: logger.debug("The availability zone for the given region {} are {}", region, Arrays.toString(availZones)); 12: // 获得 开始位置 13: int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones); 14: // 将 开始位置 的 serviceUrls 添加到结果 15: String zone = availZones[myZoneOffset]; 16: List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone); 17: if (serviceUrls != null) { 18: orderedUrls.put(zone, serviceUrls); 19: } 20: // 从开始位置顺序遍历剩余的 serviceUrls 添加到结果 21: int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1); 22: while (currentOffset != myZoneOffset) { 23: zone = availZones[currentOffset]; 24: serviceUrls = clientConfig.getEurekaServerServiceUrls(zone); 25: if (serviceUrls != null) { 26: orderedUrls.put(zone, serviceUrls); 27: } 28: if (currentOffset == (availZones.length - 1)) { 29: currentOffset = 0; 30: } else { 31: currentOffset++; 32: } 33: } 34: 35: // 为空,报错 36: if (orderedUrls.size() < 1) { 37: throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!"); 38: } 39: return orderedUrls; 40: }
第 13 行 :获得 开始位置 。实现代码如下:
private static int getZoneOffset(String myZone, boolean preferSameZone, String[] availZones){ for (int i = 0; i < availZones.length; i++) { if (myZone != null && (availZones[i].equalsIgnoreCase(myZone.trim()) == preferSameZone)) { return i; } } logger.warn("DISCOVERY: Could not pick a zone based on preferred zone settings. My zone - {}," + " preferSameZone- {}. Defaulting to " + availZones[0], myZone, preferSameZone); return 0; }
preferSameZone=true
,即 eureka.preferSameZone=true
( 默认值 : true
) 时, 开始位置 为可用区数组( availZones
)的 第一个 和应用实例所在的可用区( myZone
)【 相等 】元素的位置。 preferSameZone=false
,即 eureka.preferSameZone=false
( 默认值 : true
) 时, 开始位置 为可用区数组( availZones
)的 第一个 和应用实例所在的可用区( myZone
)【 不相等 】元素的位置。 第 20 至 33 行 :从开始位置 顺序 将剩余的可用区的 serviceUrls
添加到结果。 顺序 理解如下图:
第 9 至 18 行 :拼装 EndPoint 集群结果。
com.netflix.discovery.shared.resolver.aws.ZoneAffinityClusterResolver
,使用可用区亲和的集群解析器。 类属性 代码如下:
public class ZoneAffinityClusterResolver implements ClusterResolver<AwsEndpoint>{ private static final Logger logger = LoggerFactory.getLogger(ZoneAffinityClusterResolver.class); /** * 委托的解析器 * 目前代码里为 {@link ConfigClusterResolver} */ private final ClusterResolver<AwsEndpoint> delegate; /** * 应用实例的可用区 */ private final String myZone; /** * 是否可用区亲和 */ private final boolean zoneAffinity; public ZoneAffinityClusterResolver(ClusterResolver<AwsEndpoint> delegate, String myZone, boolean zoneAffinity){ this.delegate = delegate; this.myZone = myZone; this.zoneAffinity = zoneAffinity; } }
delegate
,委托的解析器。目前代码里使用的是 ConfigClusterResolver 。 zoneAffinity
,是否可用区亲和。
true
:EndPoint 可用区为 本地 的优先被放在前面。 false
:EndPoint 可用区 非本地 的优先被放在前面。 #getClusterEndpoints(...)
方法,实现代码如下:
1: @Override 2: public List<AwsEndpoint> getClusterEndpoints(){ 3: // 拆分成 本地的可用区和非本地的可用区的 EndPoint 集群 4: List<AwsEndpoint>[] parts = ResolverUtils.splitByZone(delegate.getClusterEndpoints(), myZone); 5: List<AwsEndpoint> myZoneEndpoints = parts[0]; 6: List<AwsEndpoint> remainingEndpoints = parts[1]; 7: // 随机打乱 EndPoint 集群并进行合并 8: List<AwsEndpoint> randomizedList = randomizeAndMerge(myZoneEndpoints, remainingEndpoints); 9: // 非可用区亲和,将非本地的可用区的 EndPoint 集群放在前面 10: if (!zoneAffinity) { 11: Collections.reverse(randomizedList); 12: } 13: 14: if (logger.isDebugEnabled()) { 15: logger.debug("Local zone={}; resolved to: {}", myZone, randomizedList); 16: } 17: 18: return randomizedList; 19: }
ClusterResolver#getClusterEndpoints()
方法,获得 EndPoint 集群。再调用 ResolverUtils#splitByZone(...)
方法,拆分成 本地 和 非本地 的可用区的 EndPoint 集群,点击 链接 查看实现。 第 8 行 :调用 #randomizeAndMerge(...)
方法, 分别 随机打乱 每个 EndPoint 集群,并进行 合并 数组,实现代码如下:
// ZoneAffinityClusterResolver.java private static List<AwsEndpoint> randomizeAndMerge(List<AwsEndpoint> myZoneEndpoints, List<AwsEndpoint> remainingEndpoints){ if (myZoneEndpoints.isEmpty()) { return ResolverUtils.randomize(remainingEndpoints); // 打乱 } if (remainingEndpoints.isEmpty()) { return ResolverUtils.randomize(myZoneEndpoints); // 打乱 } List<AwsEndpoint> mergedList = ResolverUtils.randomize(myZoneEndpoints); // 打乱 mergedList.addAll(ResolverUtils.randomize(remainingEndpoints)); // 打乱 return mergedList; } // ResolverUtils.java public static <T extends EurekaEndpoint> List<T> randomize(List<T> list){ // 数组大小为 0 或者 1 ,不进行打乱 List<T> randomList = new ArrayList<>(list); if (randomList.size() < 2) { return randomList; } // 以本地IP为随机种子,有如下好处: // 多个主机,实现对同一个 EndPoint 集群负载均衡的效果。 // 单个主机,同一个 EndPoint 集群按照固定顺序访问。Eureka-Server 不是强一致性的注册中心,Eureka-Client 对同一个 Eureka-Server 拉取注册信息,保证两者之间增量同步的一致性。 Random random = new Random(LOCAL_IPV4_ADDRESS.hashCode()); int last = randomList.size() - 1; for (int i = 0; i < last; i++) { int pos = random.nextInt(randomList.size() - i); if (pos != i) { Collections.swap(randomList, i, pos); } } return randomList; }
ResolverUtils#randomize(...)
使用以本机IP为随机种子 ,有如下好处:
第 10 至 12 行 :非可用区亲和,将非本地的可用区的 EndPoint 集群放在前面。
com.netflix.discovery.shared.resolver.AsyncResolver
, 异步执行 解析的集群解析器。AsyncResolver 属性较多,而且复杂的多,我们拆分到具体方法里分享。
AsyncResolver 内置定时任务, 定时 刷新 EndPoint 集群解析结果。
为什么要刷新?例如,Eureka-Server 的 serviceUrls
基于 DNS 配置。
/** * 是否已经调度定时任务 {@link #updateTask} */ private final AtomicBoolean scheduled = new AtomicBoolean(false); /** * 委托的解析器 * 目前代码为 {@link com.netflix.discovery.shared.resolver.aws.ZoneAffinityClusterResolver} */ private final ClusterResolver<T> delegate; /** * 定时服务 */ private final ScheduledExecutorService executorService; /** * 线程池执行器 */ private final ThreadPoolExecutor threadPoolExecutor; /** * 后台任务 * 定时解析 EndPoint 集群 */ private final TimedSupervisorTask backgroundTask; /** * 解析 EndPoint 集群结果 */ private final AtomicReference<List<T>> resultsRef; /** * 定时解析 EndPoint 集群的频率 */ private final int refreshIntervalMs; /** * 预热超时时间,单位:毫秒 */ private final int warmUpTimeoutMs; // Metric timestamp, tracking last time when data were effectively changed. private volatile long lastLoadTimestamp = -1; AsyncResolver(String name, ClusterResolver<T> delegate, List<T> initialValue, int executorThreadPoolSize, int refreshIntervalMs, int warmUpTimeoutMs) { this.name = name; this.delegate = delegate; this.refreshIntervalMs = refreshIntervalMs; this.warmUpTimeoutMs = warmUpTimeoutMs; // 初始化 定时服务 this.executorService = Executors.newScheduledThreadPool(1, // 线程大小=1 new ThreadFactoryBuilder() .setNameFormat("AsyncResolver-" + name + "-%d") .setDaemon(true) .build()); // 初始化 线程池执行器 this.threadPoolExecutor = new ThreadPoolExecutor( 1, // 线程大小=1 executorThreadPoolSize, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), // use direct handoff new ThreadFactoryBuilder() .setNameFormat("AsyncResolver-" + name + "-executor-%d") .setDaemon(true) .build() ); // 初始化 后台任务 this.backgroundTask = new TimedSupervisorTask( this.getClass().getSimpleName(), executorService, threadPoolExecutor, refreshIntervalMs, TimeUnit.MILLISECONDS, 5, updateTask ); this.resultsRef = new AtomicReference<>(initialValue); Monitors.registerObject(name, this); }
backgroundTask
,后台任务,定时解析 EndPoint 集群。
updateTask
实现代码如下:
private final Runnable updateTask = new Runnable() { @Override public void run(){ try { List<T> newList = delegate.getClusterEndpoints(); // 调用 委托的解析器 解析 EndPoint 集群 if (newList != null) { resultsRef.getAndSet(newList); lastLoadTimestamp = System.currentTimeMillis(); } else { logger.warn("Delegate returned null list of cluster endpoints"); } logger.debug("Resolved to {}", newList); } catch (Exception e) { logger.warn("Failed to retrieve cluster endpoints from the delegate", e); } } };
delegate
,委托的解析器,目前代码为 ZoneAffinityClusterResolver。 后台任务的 发起 在 #getClusterEndpoints()
方法,在「3.6.2 解析 EndPoint 集群」详细解析。
调用 #getClusterEndpoints()
方法,解析 EndPoint 集群,实现代码如下:
1: @Override 2: public List<T> getClusterEndpoints(){ 3: long delay = refreshIntervalMs; 4: // 若未预热解析 EndPoint 集群结果,进行预热 5: if (warmedUp.compareAndSet(false, true)) { 6: if (!doWarmUp()) { 7: delay = 0; // 预热失败,取消定时任务的第一次延迟 8: } 9: } 10: // 若未调度定时任务,进行调度 11: if (scheduled.compareAndSet(false, true)) { 12: scheduleTask(delay); 13: } 14: // 返回 EndPoint 集群 15: return resultsRef.get(); 16: }
第 5 至 9 行 : 若未预热解析 EndPoint 集群结果 ,调用 #doWarmUp()
方法,进行预热。若预热失败,取消定时任务的第一次延迟。 #doWarmUp()
方法实现代码如下:
boolean doWarmUp(){ Future future = null; try { future = threadPoolExecutor.submit(updateTask); future.get(warmUpTimeoutMs, TimeUnit.MILLISECONDS); // block until done or timeout return true; } catch (Exception e) { logger.warn("Best effort warm up failed", e); } finally { if (future != null) { future.cancel(true); } } return false; }
updateTask
,解析 EndPoint 集群。 第 10 至 13 行 : 若未调度定时任务,进行调度 ,调用 #scheduleTask()
方法,实现代码如下:
void scheduleTask(long delay){ executorService.schedule(backgroundTask, delay, TimeUnit.MILLISECONDS); }
第 15 行 :返回 EndPoint 集群。 当第一次预热失败,会返回空,直到定时任务获得到结果 。
Eureka-Client 在初始化时,调用 DiscoveryClient#scheduleServerEndpointTask()
方法,初始化 AsyncResolver 解析器。实现代码如下:
private void scheduleServerEndpointTask(EurekaTransport eurekaTransport, AbstractDiscoveryClientOptionalArgs args){ // ... 省略无关代码 // 创建 EndPoint 解析器 eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver( clientConfig, transportConfig, eurekaTransport.transportClientFactory, applicationInfoManager.getInfo(), applicationsSource ); // ... 省略无关代码 }
调用 EurekaHttpClients#newBootstrapResolver(...)
方法,创建 EndPoint 解析器,实现代码如下:
1: public static final String COMPOSITE_BOOTSTRAP_STRATEGY = "composite"; 2: 3: public static ClosableResolver<AwsEndpoint> newBootstrapResolver( 4: final EurekaClientConfig clientConfig, 5: final EurekaTransportConfig transportConfig, 6: final TransportClientFactory transportClientFactory, 7: final InstanceInfo myInstanceInfo, 8: final ApplicationsResolver.ApplicationsSource applicationsSource) 9:{ 10: if (COMPOSITE_BOOTSTRAP_STRATEGY.equals(transportConfig.getBootstrapResolverStrategy())) { 11: if (clientConfig.shouldFetchRegistry()) { 12: return compositeBootstrapResolver( 13: clientConfig, 14: transportConfig, 15: transportClientFactory, 16: myInstanceInfo, 17: applicationsSource 18: ); 19: } else { 20: logger.warn("Cannot create a composite bootstrap resolver if registry fetch is disabled." + 21: " Falling back to using a default bootstrap resolver."); 22: } 23: } 24: 25: // if all else fails, return the default 26: return defaultBootstrapResolver(clientConfig, myInstanceInfo); 27: } 28: 29: /** 30: * @return a bootstrap resolver that resolves eureka server endpoints based on either DNS or static config, 31: * depending on configuration for one or the other. This resolver will warm up at the start. 32: */ 33: static ClosableResolver<AwsEndpoint> defaultBootstrapResolver(final EurekaClientConfig clientConfig, 34: final InstanceInfo myInstanceInfo){ 35: // 获得 可用区集合 36: String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion()); 37: // 获得 应用实例的 可用区 38: String myZone = InstanceInfo.getZone(availZones, myInstanceInfo); 39: 40: // 创建 ZoneAffinityClusterResolver 41: ClusterResolver<AwsEndpoint> delegateResolver = new ZoneAffinityClusterResolver( 42: new ConfigClusterResolver(clientConfig, myInstanceInfo), 43: myZone, 44: true 45: ); 46: 47: // 第一次 EndPoint 解析 48: List<AwsEndpoint> initialValue = delegateResolver.getClusterEndpoints(); 49: 50: // 解析不到 Eureka-Server EndPoint ,快速失败 51: if (initialValue.isEmpty()) { 52: String msg = "Initial resolution of Eureka server endpoints failed. Check ConfigClusterResolver logs for more info"; 53: logger.error(msg); 54: failFastOnInitCheck(clientConfig, msg); 55: } 56: 57: // 创建 AsyncResolver 58: return new AsyncResolver<>( 59: EurekaClientNames.BOOTSTRAP, 60: delegateResolver, 61: initialValue, 62: 1, 63: clientConfig.getEurekaServiceUrlPollIntervalSeconds() * 1000 64: ); 65: }
* 第 10 至 23 行 :组合解析器,用于 Eureka 1.x 对 Eureka 2.x 的兼容配置,暂时不需要了解。TODO[0028]写入集群和读取集群 * 第 26 行 :调用 `#defaultBootstrapResolver()` 方法,创建默认的解析器 AsyncResolver 。 * 第 40 至 45 行 :创建 ZoneAffinityClusterResolver 。在 ZoneAffinityClusterResolver 构造方法的参数,我们看到创建 ConfigClusterResolver 作为 `delegate` 参数。 * 第 48 行 :调用 `ZoneAffinityClusterResolver#getClusterEndpoints()` 方法,**第一次 Eureka-Server EndPoint 集群解析**。 * 第 51 至 55 行 :解析不到 Eureka-Server EndPoint 集群时,可以通过配置( `eureka.experimental.clientTransportFailFastOnInit=true` ),使 Eureka-Client 初始化失败。`#failFastOnInitCheck(...)` 方法,实现代码如下:
// potential future feature, guarding with experimental flag for now private static void failFastOnInitCheck(EurekaClientConfig clientConfig, String msg){ if ("true".equals(clientConfig.getExperimental("clientTransportFailFastOnInit"))) { throw new RuntimeException(msg); } }
* x
第 58 至 64 行 :创建 AsyncResolver 。从代码上,我们可以看到, AsyncResolver.resultsRef
属性一开始已经用 initialValue
传递给 AsyncResolver 构造方法。实现代码如下:
public AsyncResolver(String name, ClusterResolver<T> delegate, List<T> initialValues, int executorThreadPoolSize, int refreshIntervalMs){ this( name, delegate, initialValues, executorThreadPoolSize, refreshIntervalMs, 0 ); // 设置已经预热 warmedUp.set(true); }
T T 一开始看解析器,没反应过来是委托设计模式,一脸懵逼+一脸懵逼+一脸懵逼。后面理顺了,发现超级奈斯( Nice ) 啊 !!!!
胖友,你学会了么?
胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?