摘要: 原创出处 http://www.iocoder.cn/Eureka/transport/ 「芋道源码」欢迎转载,保留摘要,谢谢!
关注 微信公众号:【芋道源码】 有福利:
本文主要分享 Eureka 的网络通信部分 。在不考虑 Eureka 2.x 的兼容的情况下,Eureka 1.x 主要两部分的网络通信:
本文涉及类在 com.netflix.discovery.shared.transport
包下,涉及到主体类的类图如下(打开大图 ):
类图看起来很复杂,整体调用关系如下(打开大图 ):
OK ,我们逐层解析,嗨起来。
com.netflix.discovery.shared.transport.jersey.EurekaJerseyClient
,EurekaHttpClient 接口 。接口代码如下:
public interface EurekaJerseyClient{ ApacheHttpClient4 getClient(); void destroyResources(); }
com.sun.jersey.client.apache4.ApacheHttpClient4
,基于 Apache HttpClient4 实现的 Jersey Client 。 com.netflix.discovery.shared.transport.jersey.EurekaJerseyClientImpl
,EurekaHttpClient 实现类 。实现代码如下:
public class EurekaJerseyClientImpl implements EurekaJerseyClient{ /** * 基于 Apache HttpClient4 实现的 Jersey Client */ private final ApacheHttpClient4 apacheHttpClient; /** * Apache HttpClient 空闲连接清理器 */ private final ApacheHttpClientConnectionCleaner apacheHttpClientConnectionCleaner; /** * Jersey Client 配置 */ ClientConfig jerseyClientConfig; public EurekaJerseyClientImpl(int connectionTimeout, int readTimeout, final int connectionIdleTimeout, ClientConfig clientConfig){ try { jerseyClientConfig = clientConfig; // 创建 ApacheHttpClient apacheHttpClient = ApacheHttpClient4.create(jerseyClientConfig); // 设置 连接参数 HttpParams params = apacheHttpClient.getClientHandler().getHttpClient().getParams(); HttpConnectionParams.setConnectionTimeout(params, connectionTimeout); HttpConnectionParams.setSoTimeout(params, readTimeout); // 创建 ApacheHttpClientConnectionCleaner this.apacheHttpClientConnectionCleaner = new ApacheHttpClientConnectionCleaner(apacheHttpClient, connectionIdleTimeout); } catch (Throwable e) { throw new RuntimeException("Cannot create Jersey client", e); } } @Override public ApacheHttpClient4 getClient(){ return apacheHttpClient; } @Override public void destroyResources(){ apacheHttpClientConnectionCleaner.shutdown(); apacheHttpClient.destroy(); } }
com.netflix.discovery.shared.transport.jersey.ApacheHttpClientConnectionCleaner
,Apache HttpClient 空闲连接清理器,负责 周期性 关闭处于 half-close
状态的空闲连接。点击 链接 查看带中文注释的 ApacheHttpClientConnectionCleaner。推荐阅读: 《HttpClient容易忽视的细节——连接关闭》 。 EurekaJerseyClientBuilder ,EurekaJerseyClientImpl 内部类 ,用于创建 EurekaJerseyClientImpl 。
调用 #build()
方法,创建 EurekaJerseyClientImpl ,实现代码如下:
// EurekaJerseyClientBuilder.java public EurekaJerseyClient build(){ MyDefaultApacheHttpClient4Config config = new MyDefaultApacheHttpClient4Config(); try { return new EurekaJerseyClientImpl(connectionTimeout, readTimeout, connectionIdleTimeout, config); } catch (Throwable e) { throw new RuntimeException("Cannot create Jersey client ", e); } }
com.sun.jersey.client.apache4.config.DefaultApacheHttpClient4Config
,实现 自定义配置 。点击 链接 查看带中文注释的 MyDefaultApacheHttpClient4Config。例如 :
com.netflix.discovery.provider.DiscoveryJerseyProvider
。 com.netflix.discovery.shared.transport.jersey.SSLSocketFactoryAdapter
将 Apache HttpClient 4.3.4 对 SSL 功能的增强适配到老版本 API 。点击 链接 查看带中文注释的 SSLSocketFactoryAdapter。 com.netflix.discovery.shared.transport.EurekaHttpClient
,Eureka-Server HTTP 访问客户端,定义了具体的 Eureka-Server API 调用方法 。点击 链接 查看带中文注释的 EurekaHttpClient。
com.netflix.discovery.shared.transport.EurekaHttpResponse
,请求响应对象,实现代码如下:
public class EurekaHttpResponse<T>{ /** * 返回状态码 */ private final int statusCode; /** * 返回对象( Entity ) */ private final T entity; /** * 返回 header */ private final Map<String, String> headers; /** * 重定向地址 */ private final URI location; // ... 省略 setting / getting 和 Builder }
com.netflix.discovery.shared.transport.TransportClientFactory
,创建 EurekaHttpClient 的工厂 接口 。接口代码如下:
public interface TransportClientFactory{ /** * 创建 EurekaHttpClient * * @param serviceUrl Eureka-Server 地址 * @return EurekaHttpClient */ EurekaHttpClient newClient(EurekaEndpoint serviceUrl); /** * 关闭工厂 */ void shutdown(); }
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient
,实现 EurekaHttpClient 的 抽象类 , 真正 实现了具体的 Eureka-Server API 调用方法。实现代码如下:
1: public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient{ 2: 3: private static final Logger logger = LoggerFactory.getLogger(AbstractJerseyEurekaHttpClient.class); 4: 5: /** 6: * Jersey Client 7: */ 8: protected final Client jerseyClient; 9: /** 10: * 请求的 Eureka-Server 地址 11: */ 12: protected final String serviceUrl; 13: 14: protected AbstractJerseyEurekaHttpClient(Client jerseyClient, String serviceUrl){ 15: this.jerseyClient = jerseyClient; 16: this.serviceUrl = serviceUrl; 17: logger.debug("Created client for url: {}", serviceUrl); 18: } 19: 20: @Override 21: public EurekaHttpResponse<Void> register(InstanceInfo info){ 22: // 设置 请求地址 23: String urlPath = "apps/" + info.getAppName(); 24: ClientResponse response = null; 25: try { 26: Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); 27: // 设置 请求头 28: addExtraHeaders(resourceBuilder); 29: // 请求 Eureka-Server 30: response = resourceBuilder 31: .header("Accept-Encoding", "gzip") // GZIP 32: .type(MediaType.APPLICATION_JSON_TYPE) // 请求参数格式 JSON 33: .accept(MediaType.APPLICATION_JSON) // 响应结果格式 JSON 34: .post(ClientResponse.class, info); // 请求参数 35: // 创建 EurekaHttpResponse 36: return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); 37: } finally { 38: if (logger.isDebugEnabled()) { 39: logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(), 40: response == null ? "N/A" : response.getStatus()); 41: } 42: if (response != null) { 43: response.close(); 44: } 45: } 46: }
jerseyClient
属性,Jersey Client ,使用上文的 EurekaHttpClient#getClient(...)
方法,获取 ApacheHttpClient4 。 serviceUrl
属性,请求的 Eureka-Server 地址。 #register()
方法,实现向 Eureka-Server 注册应用实例。 其他方法代码类似 。
第 28 行 :调用 #addExtraHeaders(...)
方法,设置请求头( header )。该方法是 抽象方法 ,提供子类实现自定义的请求头。代码如下:
protected abstract void addExtraHeaders(Builder webResource);
com.netflix.discovery.shared.transport.jersey.JerseyApplicationClient
,实现 Eureka-Client 请求 Eureka-Server 的网络通信。点击 链接 查看带中文注释的 JerseyApplicationClient。
com.netflix.discovery.shared.transport.jersey.JerseyEurekaHttpClientFactory
,创建 JerseyApplicationClient 的 工厂类 。实现代码如下:
public class JerseyEurekaHttpClientFactory implements TransportClientFactory{ private final EurekaJerseyClient jerseyClient; private final ApacheHttpClient4 apacheClient; private final ApacheHttpClientConnectionCleaner cleaner; private final Map<String, String> additionalHeaders; public JerseyEurekaHttpClientFactory(ApacheHttpClient4 apacheClient, long connectionIdleTimeout, Map<String, String> additionalHeaders){ this(null, apacheClient, connectionIdleTimeout, additionalHeaders); } private JerseyEurekaHttpClientFactory(EurekaJerseyClient jerseyClient, ApacheHttpClient4 apacheClient, long connectionIdleTimeout, Map<String, String> additionalHeaders){ this.jerseyClient = jerseyClient; this.apacheClient = jerseyClient != null ? jerseyClient.getClient() : apacheClient; this.additionalHeaders = additionalHeaders; this.cleaner = new ApacheHttpClientConnectionCleaner(this.apacheClient, connectionIdleTimeout); } @Override public EurekaHttpClient newClient(EurekaEndpoint endpoint){ return new JerseyApplicationClient(apacheClient, endpoint.getServiceUrl(), additionalHeaders); } @Override public void shutdown(){ cleaner.shutdown(); if (jerseyClient != null) { jerseyClient.destroyResources(); } else { apacheClient.destroy(); } } }
JerseyEurekaHttpClientFactoryBuilder ,JerseyEurekaHttpClientFactory 内部类 ,用于创建 JerseyEurekaHttpClientFactory 。点击 链接 查看带中文注释的 JerseyEurekaHttpClientFactory。
调用 JerseyEurekaHttpClientFactory#create(...)
方法,创建 JerseyEurekaHttpClientFactory ,实现代码如下:
public static JerseyEurekaHttpClientFactory create(EurekaClientConfig clientConfig, Collection<ClientFilter> additionalFilters, InstanceInfo myInstanceInfo, AbstractEurekaIdentity clientIdentity){ JerseyEurekaHttpClientFactoryBuilder clientBuilder = newBuilder() .withAdditionalFilters(additionalFilters) // 客户端附加过滤器 .withMyInstanceInfo(myInstanceInfo) // 应用实例 .withUserAgent("Java-EurekaClient") // UA .withClientConfig(clientConfig) .withClientIdentity(clientIdentity); // 设置 Client Name if ("true".equals(System.getProperty("com.netflix.eureka.shouldSSLConnectionsUseSystemSocketFactory"))) { clientBuilder.withClientName("DiscoveryClient-HTTPClient-System").withSystemSSLConfiguration(); } else if (clientConfig.getProxyHost() != null && clientConfig.getProxyPort() != null) { clientBuilder.withClientName("Proxy-DiscoveryClient-HTTPClient") .withProxy( clientConfig.getProxyHost(), Integer.parseInt(clientConfig.getProxyPort()), clientConfig.getProxyUserName(), clientConfig.getProxyPassword() ); // http proxy } else { clientBuilder.withClientName("DiscoveryClient-HTTPClient"); } return clientBuilder.build(); } public static JerseyEurekaHttpClientFactoryBuilder newBuilder(){ return new JerseyEurekaHttpClientFactoryBuilder().withExperimental(false); }
com.netflix.eureka.transport.JerseyReplicationClient
,Eureka-Server 集群内,Eureka-Server 请求 其它的Eureka-Server 的网络通信。
实现 AbstractJerseyEurekaHttpClient#addExtraHeaders()
方法,添加自定义头 x-netflix-discovery-replication=true
,代码如下:
@Override protected void addExtraHeaders(Builder webResource){ webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true"); }
重写了 #sendHeartBeat(...)
方法,在 《Eureka 源码解析 —— Eureka-Server 集群同步》 有详细解析。
com.netflix.eureka.cluster.HttpReplicationClient
接口,实现了 #submitBatchUpdates(...)
方法,在 《Eureka 源码解析 —— Eureka-Server 集群同步》 有详细解析。 JerseyReplicationClient 没有专属的工厂 。
调用 JerseyReplicationClient#createReplicationClient(...)
静态方法 ,创建 JerseyReplicationClient 。点击 链接 查看带中文注释的方法代码。
com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator
,EurekaHttpClient 委托者 抽象类 。实现代码如下:
public abstract class EurekaHttpClientDecorator implements EurekaHttpClient{ /** * 执行请求 * * @param requestExecutor 请求执行器 * @param <R> 请求泛型 * @return 响应 */ protected abstract <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor); @Override public EurekaHttpResponse<Void> register(final InstanceInfo info){ return execute(new RequestExecutor<Void>() { @Override public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate){ return delegate.register(info); } @Override public RequestType getRequestType(){ return RequestType.Register; } }); } }
#execute(...)
抽象方法 ,子类实现该方法,实现自己的特性。 #register()
方法,实现向 Eureka-Server 注册应用实例。 其他方法代码类似 。
#execute(...)
方法,并将原有的注册实现通过 RequestExecutor 传递进去。 #execute(...)
方法,可以调用 RequestExecutor#execute(...)
方法,继续执行原有逻辑。 RequestType ,请求类型 枚举类 。代码如下:
// EurekaHttpClientDecorator.java public enum RequestType { Register, Cancel, SendHeartBeat, StatusUpdate, DeleteStatusOverride, GetApplications, GetDelta, GetVip, GetSecureVip, GetApplication, GetInstance, GetApplicationInstance }
RequestExecutor ,请求执行器 接口 。接口代码如下:
// EurekaHttpClientDecorator.java public interface RequestExecutor<R>{ /** * 执行请求 * * @param delegate 委托的 EurekaHttpClient * @return 响应 */ EurekaHttpResponse<R> execute(EurekaHttpClient delegate); /** * @return 请求类型 */ RequestType getRequestType(); }
EurekaHttpClientDecorator 的 每个实现类实现一个特性 ,代码非常非常非常清晰。
FROM 《委托模式》
委托模式是软件设计模式中的一项基本技巧。在委托模式中,有两个对象参与处理同一个请求,接受请求的对象将请求委托给另一个对象来处理。委托模式是一项基本技巧,许多其他的模式,如状态模式、策略模式、访问者模式本质上是在更特殊的场合采用了委托模式。委托模式使得我们可以用聚合来替代继承,它还使我们可以模拟mixin。
我们在上图的基础上, 增加委托的关系 ,如下图(打开大图 ):
com.netflix.discovery.shared.transport.decorator.MetricsCollectingEurekaHttpClient
,监控指标收集 EurekaHttpClient ,配合 Netflix Servo 实现监控信息采集。
#execute()
方法,代码如下:
1: @Override 2: protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor){ 3: // 获得 请求类型 的 请求指标 4: EurekaHttpClientRequestMetrics requestMetrics = metricsByRequestType.get(requestExecutor.getRequestType()); 5: Stopwatch stopwatch = requestMetrics.latencyTimer.start(); 6: try { 7: // 执行请求 8: EurekaHttpResponse<R> httpResponse = requestExecutor.execute(delegate); 9: // 增加 请求指标 10: requestMetrics.countersByStatus.get(mappedStatus(httpResponse)).increment(); 11: return httpResponse; 12: } catch (Exception e) { 13: requestMetrics.connectionErrors.increment(); 14: exceptionsMetric.count(e); 15: throw e; 16: } finally { 17: stopwatch.stop(); 18: } 19: }
RequestExecutor#execute(...)
方法,继续执行请求。
delegate
属性,对应 JerseyApplicationClient 。 com.netflix.discovery.shared.transport.decorator.RedirectingEurekaHttpClient
, 寻找非 302 重定向 的 Eureka-Server 的 EurekaHttpClient 。
#execute()
方法,代码如下:
1: @Override 2: protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor){ 3: EurekaHttpClient currentEurekaClient = delegateRef.get(); 4: if (currentEurekaClient == null) { // 未找到非 302 的 Eureka-Server 5: AtomicReference<EurekaHttpClient> currentEurekaClientRef = new AtomicReference<>(factory.newClient(serviceEndpoint)); 6: try { 7: EurekaHttpResponse<R> response = executeOnNewServer(requestExecutor, currentEurekaClientRef); 8: // 关闭原有的委托 EurekaHttpClient ,并设置当前成功非 302 请求的 EurekaHttpClient 9: TransportUtils.shutdown(delegateRef.getAndSet(currentEurekaClientRef.get())); 10: return response; 11: } catch (Exception e) { 12: logger.error("Request execution error", e); 13: TransportUtils.shutdown(currentEurekaClientRef.get()); 14: throw e; 15: } 16: } else { // 已经找到非 302 的 Eureka-Server 17: try { 18: return requestExecutor.execute(currentEurekaClient); 19: } catch (Exception e) { 20: logger.error("Request execution error", e); 21: delegateRef.compareAndSet(currentEurekaClient, null); 22: currentEurekaClient.shutdown(); 23: throw e; 24: } 25: } 26: }
serviceUrls
执行请求,寻找非 302 状态码返回的 Eureka-Server。
serviceEndpoint
( 相当于 serviceUrls
) 创建 委托 EurekaHttpClient 。 #executeOnNewServer(...)
方法,通过执行请求的方式,寻找非 302 状态码返回的 Eureka-Server。实现代码,点击 链接 查看带中文注释的代码实现。 delegateRef
( 因为此处可能存在并发,多个线程都找到非 302 状态码返回的 Eureka-Server ),并设置当前成功非 302 请求的 EurekaHttpClient 到 delegateRef
。 currentEurekaClientRef
,当请求发生异常或者超过最大重定向次数。 currentEurekaClient
,后面要重新非返回 302 状态码的 Eureka-Server 。 RedirectingEurekaHttpClient 提供 #createFactory(...)
静态方法 获得创建其的工厂,点击 链接 查看。
com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient
,支持向多个 Eureka-Server 请求重试的 EurekaHttpClient 。
#execute()
方法,代码如下:
1: @Override 2: protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor){ 3: List<EurekaEndpoint> candidateHosts = null; 4: int endpointIdx = 0; 5: for (int retry = 0; retry < numberOfRetries; retry++) { 6: EurekaHttpClient currentHttpClient = delegate.get(); 7: EurekaEndpoint currentEndpoint = null; 8: 9: // 当前委托的 EurekaHttpClient 不存在 10: if (currentHttpClient == null) { 11: // 获得候选的 Eureka-Server 地址数组 12: if (candidateHosts == null) { 13: candidateHosts = getHostCandidates(); 14: if (candidateHosts.isEmpty()) { 15: throw new TransportException("There is no known eureka server; cluster server list is empty"); 16: } 17: } 18: 19: // 超过候选的 Eureka-Server 地址数组上限 20: if (endpointIdx >= candidateHosts.size()) { 21: throw new TransportException("Cannot execute request on any known server"); 22: } 23: 24: // 创建候选的 EurekaHttpClient 25: currentEndpoint = candidateHosts.get(endpointIdx++); 26: currentHttpClient = clientFactory.newClient(currentEndpoint); 27: } 28: 29: try { 30: // 执行请求 31: EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient); 32: // 判断是否为可接受的相应,若是,返回。 33: if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) { 34: delegate.set(currentHttpClient); 35: if (retry > 0) { 36: logger.info("Request execution succeeded on retry #{}", retry); 37: } 38: return response; 39: } 40: logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode()); 41: } catch (Exception e) { 42: logger.warn("Request execution failed with message: {}", e.getMessage()); // just log message as the underlying client should log the stacktrace 43: } 44: 45: // 请求失败,若是 currentHttpClient ,清除 delegate 46: // Connection error or 5xx from the server that must be retried on another server 47: delegate.compareAndSet(currentHttpClient, null); 48: 49: // 请求失败,将 currentEndpoint 添加到隔离集合 50: if (currentEndpoint != null) { 51: quarantineSet.add(currentEndpoint); 52: } 53: } 54: throw new TransportException("Retry limit reached; giving up on completing the request"); 55: }
currentHttpClient
不存在,意味着原有 delegate
不存在向 Eureka-Server 成功请求的 EurekaHttpClient 。
delegate
,直接使用它进行执行请求。 第 11 至 17 行 :调用 #getHostCandidates()
方法,获得候选的 Eureka-Server serviceUrls
数组。实现代码如下:
1: private List<EurekaEndpoint> getHostCandidates(){ 2: // 获得候选的 Eureka-Server 地址数组 3: List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints(); 4: 5: // 保留交集(移除 quarantineSet 不在 candidateHosts 的元素) 6: quarantineSet.retainAll(candidateHosts); 7: 8: // 在保证最小可用的候选的 Eureka-Server 地址数组,移除在隔离集合内的元素 9: // If enough hosts are bad, we have no choice but start over again 10: int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage()); // 0.66 11: if (quarantineSet.isEmpty()) { 12: // no-op 13: } else if (quarantineSet.size() >= threshold) { 14: logger.debug("Clearing quarantined list of size {}", quarantineSet.size()); 15: quarantineSet.clear(); 16: } else { 17: List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size()); 18: for (EurekaEndpoint endpoint : candidateHosts) { 19: if (!quarantineSet.contains(endpoint)) { 20: remainingHosts.add(endpoint); 21: } 22: } 23: candidateHosts = remainingHosts; 24: } 25: 26: return candidateHosts; 27: }
ClusterResolver#getClusterEndpoints()
方法,获得候选的 Eureka-Server 地址数组( candidateHosts
)。 注意 :该方法返回的 Eureka-Server 地址数组,使用以本机 IP 为 随机种子 ,达到不同 IP 的应用实例获得的数组顺序不同,而相同 IP 的应用实例获得的数组顺序一致, 效果类似基于 IP HASH 的负载均衡算法 。实现该功能的代码,在 《Eureka 源码解析 —— EndPoint 与 解析器》搜索关键字【ResolverUtils#randomize(…)】 详细解析。 Set#retainAll()
方法,移除隔离的故障 Eureka-Server 地址数组( quarantineSet
) 中不在 candidateHosts
的元素。 candidateHosts
,移除在 quarantineSet
的元素。
eureka.retryableClientQuarantineRefreshPercentage
来设置百分比,默认值: 0.66
。 quarantineSet
数量超过阀值,清空 quarantineSet
,全部 candidateHosts
重试。 quarantineSet
数量未超过阀值,移除 candidateHosts
中在 quarantineSet
的元素。 第 19 至 22 行 :超过 candidateHosts
上限,全部 Eureka-Server 请求失败,抛出异常。
第 33 行 :调用 ServerStatusEvaluator#accept()
方法,判断响应状态码和请求类型是否能够接受。实现代码如下:
// ServerStatusEvaluators.java private static final ServerStatusEvaluator LEGACY_EVALUATOR = new ServerStatusEvaluator() { @Override public boolean accept(int statusCode, RequestType requestType){ if (statusCode >= 200 && statusCode < 300 || statusCode == 302) { return true; } else if (requestType == RequestType.Register && statusCode == 404) { // 注册,404 可接受 return true; } else if (requestType == RequestType.SendHeartBeat && statusCode == 404) { // 心跳,404 可接受 return true; } else if (requestType == RequestType.Cancel) { // cancel is best effort 下线,接受全部 return true; } else if (requestType == RequestType.GetDelta && (statusCode == 403 || statusCode == 404)) { // 增量获取注册信息,403 404 可接受 return true; } return false; } };
第 34 行 :请求成功,设置 delegate
。下次请求,优先使用 delegate
,失败才进行候选的 Eureka-Server 地址数组重试。
delegate
若等于 currentHttpClient
,进行清除。 quarantineSet
。 RetryableEurekaHttpClient 提供 #createFactory(...)
静态方法 获得创建其的工厂,点击 链接 查看。
com.netflix.discovery.shared.transport.decorator.SessionedEurekaHttpClient
,支持会话的 EurekaHttpClient 。执行定期的重建会话,防止一个 Eureka-Client 永远只连接一个特定的 Eureka-Server 。反过来,这也保证了 Eureka-Server 集群变更时,Eureka-Client 对 Eureka-Server 连接的负载均衡。
#execute(...)
,代码如下:
1: @Override 2: protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor){ 3: long now = System.currentTimeMillis(); 4: long delay = now - lastReconnectTimeStamp; 5: 6: // 超过 当前会话时间,关闭当前委托的 EurekaHttpClient 。 7: if (delay >= currentSessionDurationMs) { 8: logger.debug("Ending a session and starting anew"); 9: lastReconnectTimeStamp = now; 10: currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs); 11: TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null)); 12: } 13: 14: // 获得委托的 EurekaHttpClient 。若不存在,则创建新的委托的 EurekaHttpClient 。 15: EurekaHttpClient eurekaHttpClient = eurekaHttpClientRef.get(); 16: if (eurekaHttpClient == null) { 17: eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient()); 18: } 19: return requestExecutor.execute(eurekaHttpClient); 20: }
第 7 至 12 行 :超过当前会话时间,关闭当前委托的 EurekaHttpClient 。
第 10 行 :调用 #randomizeSessionDuration(...)
方法,计算计算下一次会话超时时长,公式为 sessionDurationMs * (0.5, 1.5)
,代码如下:
protected long randomizeSessionDuration(long sessionDurationMs){ long delta = (long) (sessionDurationMs * (random.nextDouble() - 0.5)); return sessionDurationMs + delta; }
第 15 至 18 行 :获得委托的 EurekaHttpClient 。若不存在,创建新的委托的 EurekaHttpClient 。 TransportUtils#getOrSetAnotherClient(...)
方法代码如下:
1: public static EurekaHttpClient getOrSetAnotherClient(AtomicReference<EurekaHttpClient> eurekaHttpClientRef, EurekaHttpClient another){ 2: EurekaHttpClient existing = eurekaHttpClientRef.get(); 3: // 为空才设置 4: if (eurekaHttpClientRef.compareAndSet(null, another)) { 5: return another; 6: } 7: // 设置失败,意味着另外一个线程已经设置 8: another.shutdown(); 9: return existing; 10: }
eurekaHttpClientRef
里的 EurekaHttpClient 。若获取不到,将 another
设置到 eurekaHttpClientRef
。当有多个线程设置时,有且只有一个线程设置成功,另外的设置失败的线程们,意味着当前 eurekaHttpClientRef
有 EurekaHttpClient ,返回 eurekaHttpClientRef
。 目前该方法存在 BUG ,失败的线程直接返回 existing
的是 null
,需要修改成 return eurekaHttpClientRef.get()
。模拟重现该 BUG 代码如下 :
第 19 行 :执行请求。
在 SessionedEurekaHttpClient 类里,没有实现创建其的工厂。在 「6. 创建网络通讯客户端」搜索 canonicalClientFactory
,可以看到 EurekaHttpClients#canonicalClientFactory(...)
方法,内部有 SessionedEurekaHttpClient 的创建工厂。
对于 Eureka-Server 来说,调用 JerseyReplicationClient#createReplicationClient(...)
静态方法 即可创建用于 Eureka-Server 集群内,Eureka-Server 请求 其它的Eureka-Server 的网络通信客户端。
对于 Eureka-Client 来说,分成用于 注册应用实例( registrationClient
) 和 查询注册信息( newQueryClient
) 的 两个不同 网络通信客户端。在 DiscoveryClient 初始化时进行创建,代码如下:
// DiscoveryClient.class 1: private void scheduleServerEndpointTask(EurekaTransport eurekaTransport, 2: AbstractDiscoveryClientOptionalArgs args){ 3: 4: Collection<?> additionalFilters = args == null 5: ? Collections.emptyList() 6: : args.additionalFilters; 7: 8: EurekaJerseyClient providedJerseyClient = args == null 9: ? null 10: : args.eurekaJerseyClient; 11: 12: TransportClientFactories argsTransportClientFactories = null; 13: if (args != null && args.getTransportClientFactories() != null) { 14: argsTransportClientFactories = args.getTransportClientFactories(); 15: } 16: 17: // Ignore the raw types warnings since the client filter interface changed between jersey 1/2 18: @SuppressWarnings("rawtypes") 19: TransportClientFactories transportClientFactories = argsTransportClientFactories == null 20: ? new Jersey1TransportClientFactories() 21: : argsTransportClientFactories; 22: 23: // If the transport factory was not supplied with args, assume they are using jersey 1 for passivity 24: // noinspection unchecked 25: eurekaTransport.transportClientFactory = providedJerseyClient == null 26: ? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo()) 27: : transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient); 28: 29: // (省略代码)初始化 应用解析器的应用实例数据源 TODO[0028]写入集群和读取集群 30: 31: // (省略代码)创建 EndPoint 解析器 32: eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(...) 33: 34: if (clientConfig.shouldRegisterWithEureka()) { 35: EurekaHttpClientFactory newRegistrationClientFactory = null; 36: EurekaHttpClient newRegistrationClient = null; 37: try { 38: newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory( 39: eurekaTransport.bootstrapResolver, 40: eurekaTransport.transportClientFactory, 41: transportConfig 42: ); 43: newRegistrationClient = newRegistrationClientFactory.newClient(); 44: } catch (Exception e) { 45: logger.warn("Transport initialization failure", e); 46: } 47: eurekaTransport.registrationClientFactory = newRegistrationClientFactory; 48: eurekaTransport.registrationClient = newRegistrationClient; 49: } 50: 51: // new method (resolve from primary servers for read) 52: // Configure new transport layer (candidate for injecting in the future) 53: if (clientConfig.shouldFetchRegistry()) { 54: EurekaHttpClientFactory newQueryClientFactory = null; 55: EurekaHttpClient newQueryClient = null; 56: try { 57: newQueryClientFactory = EurekaHttpClients.queryClientFactory( 58: eurekaTransport.bootstrapResolver, 59: eurekaTransport.transportClientFactory, 60: clientConfig, 61: transportConfig, 62: applicationInfoManager.getInfo(), 63: applicationsSource 64: ); 65: newQueryClient = newQueryClientFactory.newClient(); 66: } catch (Exception e) { 67: logger.warn("Transport initialization failure", e); 68: } 69: eurekaTransport.queryClientFactory = newQueryClientFactory; 70: eurekaTransport.queryClient = newQueryClient; 71: } 72: }
第 18 至 27 行 :调用 Jersey1TransportClientFactories#newTransportClientFactory(...)
方法,创建 registrationClient
和 queryClient
公用的委托的 EurekaHttpClientFactory ,代码如下:
// Jersey1TransportClientFactories.java public TransportClientFactory newTransportClientFactory(final EurekaClientConfig clientConfig, final Collection<ClientFilter> additionalFilters, final InstanceInfo myInstanceInfo){ // JerseyEurekaHttpClientFactory final TransportClientFactory jerseyFactory = JerseyEurekaHttpClientFactory.create( clientConfig, additionalFilters, myInstanceInfo, new EurekaClientIdentity(myInstanceInfo.getIPAddr()) ); // TransportClientFactory final TransportClientFactory metricsFactory = MetricsCollectingEurekaHttpClient.createFactory(jerseyFactory); // 委托 TransportClientFactory return new TransportClientFactory() { @Override public EurekaHttpClient newClient(EurekaEndpoint serviceUrl){ return metricsFactory.newClient(serviceUrl); } @Override public void shutdown(){ metricsFactory.shutdown(); jerseyFactory.shutdown(); } }; }
第 34 至 49 行 :调用 EurekaHttpClients#registrationClientFactory(...)
方法,创建 registrationClient
的 EurekaHttpClientFactory ,代码如下 :
// EurekaHttpClients.java public static EurekaHttpClientFactory registrationClientFactory(ClusterResolver bootstrapResolver, TransportClientFactory transportClientFactory, EurekaTransportConfig transportConfig){ return canonicalClientFactory(EurekaClientNames.REGISTRATION, transportConfig, bootstrapResolver, transportClientFactory); } static EurekaHttpClientFactory canonicalClientFactory(final String name, final EurekaTransportConfig transportConfig, final ClusterResolver<EurekaEndpoint> clusterResolver, final TransportClientFactory transportClientFactory){ return new EurekaHttpClientFactory() { // SessionedEurekaHttpClientFactory @Override public EurekaHttpClient newClient(){ return new SessionedEurekaHttpClient( name, RetryableEurekaHttpClient.createFactory( // RetryableEurekaHttpClient name, transportConfig, clusterResolver, RedirectingEurekaHttpClient.createFactory(transportClientFactory), // RedirectingEurekaHttpClient ServerStatusEvaluators.legacyEvaluator()), transportConfig.getSessionedClientReconnectIntervalSeconds() * 1000 ); } @Override public void shutdown(){ wrapClosable(clusterResolver).shutdown(); } }; }
第 51 至 71 行 :调用 EurekaHttpClients#queryClientFactory(...)
方法,创建 queryClient
的 EurekaHttpClientFactory ,代码如下 :
// EurekaHttpClients.java public static EurekaHttpClientFactory queryClientFactory(ClusterResolver bootstrapResolver, TransportClientFactory transportClientFactory, EurekaClientConfig clientConfig, EurekaTransportConfig transportConfig, InstanceInfo myInstanceInfo, ApplicationsResolver.ApplicationsSource applicationsSource){ ClosableResolver queryResolver = transportConfig.useBootstrapResolverForQuery() ? wrapClosable(bootstrapResolver) : queryClientResolver(bootstrapResolver, transportClientFactory, clientConfig, transportConfig, myInstanceInfo, applicationsSource); return canonicalClientFactory(EurekaClientNames.QUERY, transportConfig, queryResolver, transportClientFactory); // 该方法上面有 }
这次真的是彩蛋,我们将整体调用关系调整如下如下(打开大图 ):
胖友,你学会了么?
胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?