背景: 最近在研究springCloud,对服务注册中心也非常好奇,然后就看了一下源码,而且以后面试也需要了解一下,因此记录一下
注意:EurekaClient的内容很多,我只分析主干部分
前提: 这里的springboot版本为2.1.5.RELEASE,spring-cloud版本为Greenwich.SR1。
eureka架构图
从这里我们可以看到几个重要的概念:
Eureka可以向EurekaServer Register、Renew、Cancel、Get Registy,下面就分别这几个概念讨论吧。
根据springboot的自动配置特性,我们找到org.springframework.cloud.spring-cloud-netflix-eureka-client.2.1.1.RELEASE.spring-cloud-netflix-eureka-client-2.1.1.RELEASE.jar下面的META-INF/spring.factories。
最重要的就是 EurekaClientAutoConfiguration 这个配置类了
打开配置类EurekaClientAutoConfiguration截取一下重要的配置:
EurekaClientConfigBean
@Bean @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT) public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) { EurekaClientConfigBean client = new EurekaClientConfigBean(); if ("bootstrap".equals(this.env.getProperty("spring.config.name"))) { // We don't register during bootstrap by default, but there will be another // chance later. client.setRegisterWithEureka(false); } return client;} 复制代码
这个bean是eureka的配置信息,ymal配置的前缀是 eureka.client 开始的。这个配置类中有大量的eureka client默认配置。
比如:
1.默认的eureka配置:
/** * Default Eureka URL. */public static final String DEFAULT_URL = "http://localhost:8761" + DEFAULT_PREFIX + "/";复制代码
也就是说,如果没有配置eureka server的url,它会默认注册到本地8761地址中。
2.默认从eureka server中拉取配置时间间隔
/** * Indicates how often(in seconds) to fetch the registry information from the eureka * server. */private int registryFetchIntervalSeconds = 30;复制代码
也就是说默认每个30秒从eureka server中拉取一次所有服务的配置信息。
3.复制实例变化信息到eureka服务器所需要的时间间隔
/** * Indicates how often(in seconds) to replicate instance changes to be replicated to * the eureka server. */private int instanceInfoReplicationIntervalSeconds = 30;复制代码
还有很多配置可以自己去了解一下。
EurekaInstanceConfigBean
@Bean@ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils, ManagementMetadataProvider managementMetadataProvider) { String hostname = getProperty("eureka.instance.hostname"); boolean preferIpAddress = Boolean .parseBoolean(getProperty("eureka.instance.prefer-ip-address")); String ipAddress = getProperty("eureka.instance.ip-address"); boolean isSecurePortEnabled = Boolean .parseBoolean(getProperty("eureka.instance.secure-port-enabled")); String serverContextPath = env.getProperty("server.servlet.context-path", "/"); int serverPort = Integer .valueOf(env.getProperty("server.port", env.getProperty("port", "8080"))); Integer managementPort = env.getProperty("management.server.port", Integer.class); // nullable. String managementContextPath = env .getProperty("management.server.servlet.context-path"); // nullable. // should // be wrapped into // optional Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port", Integer.class); // nullable EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils); instance.setNonSecurePort(serverPort); instance.setInstanceId(getDefaultInstanceId(env)); instance.setPreferIpAddress(preferIpAddress); instance.setSecurePortEnabled(isSecurePortEnabled); if (StringUtils.hasText(ipAddress)) { instance.setIpAddress(ipAddress); } if (isSecurePortEnabled) { instance.setSecurePort(serverPort); } if (StringUtils.hasText(hostname)) { instance.setHostname(hostname); } String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path"); String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path"); if (StringUtils.hasText(statusPageUrlPath)) { instance.setStatusPageUrlPath(statusPageUrlPath); } if (StringUtils.hasText(healthCheckUrlPath)) { instance.setHealthCheckUrlPath(healthCheckUrlPath); } ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort, serverContextPath, managementContextPath, managementPort); if (metadata != null) { instance.setStatusPageUrl(metadata.getStatusPageUrl()); instance.setHealthCheckUrl(metadata.getHealthCheckUrl()); if (instance.isSecurePortEnabled()) { instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl()); } Map<String, String> metadataMap = instance.getMetadataMap(); metadataMap.computeIfAbsent("management.port", k -> String.valueOf(metadata.getManagementPort())); } else { // without the metadata the status and health check URLs will not be set // and the status page and health check url paths will not include the // context path so set them here if (StringUtils.hasText(managementContextPath)) { instance.setHealthCheckUrlPath( managementContextPath + instance.getHealthCheckUrlPath()); instance.setStatusPageUrlPath( managementContextPath + instance.getStatusPageUrlPath()); } } setupJmxPort(instance, jmxPort); return instance;}复制代码
这个主要是eureka client 这个几点自身的一些配置信息。
接下来是两个最主要的bean : DiscoveryClient 和 EurekaServiceRegistry
@Beanpublic DiscoveryClient discoveryClient(EurekaClient client, EurekaClientConfig clientConfig) { return new EurekaDiscoveryClient(client, clientConfig);}@Beanpublic EurekaServiceRegistry eurekaServiceRegistry() { return new EurekaServiceRegistry();}复制代码
EurekaServiceRegistry 用于服务注册,DiscoveryClient 用于服务发现
先看 EurekaServiceRegistry
我们看到这个类里面有几个方法:
register(EurekaRegistration reg) : 这个方法就是用于eureka client 注册到eureka server的,不过具体的注册逻辑不在这里。Eureka client的注册动作是在com.netflix.discovery.DiscoveryClient类的initScheduledTasks方法中执行的,其实最终的注册是发生在 InstanceInfoReplicator类里面的。
maybeInitializeClient(EurekaRegistration reg): 初始化 eureka client,eureka client 等会再说。
deregister(EurekaRegistration reg):注销下线操作,同样具体下线操作不在这里,这里只是将本节点实例状态设置为下线而已。
setStatus(EurekaRegistration registration, String status):设置本节点实例的状态。
getStatus(EurekaRegistration registration):获取本节点实例状态。
最后看一下DiscoveryClient,发现有两个DiscoveryClient,一个是类,一个是接口:
类是netflix提供的,接口时springcloud提供的。回到EurekaClientAutoConfiguration,它在声明DiscoveryClient这个bean的时候用的是EurekaDiscoveryClient
@Beanpublic DiscoveryClient discoveryClient(EurekaClient client, EurekaClientConfig clientConfig) { return new EurekaDiscoveryClient(client, clientConfig);}复制代码
找到 EurekaDiscoveryClient
发现EurekaDiscoveryClient里面包含netflix 里的EurekaClient接口,而这个接口的默认实现就是DiscoveryClient
也就是说EurekaDiscoveryClient和DiscoveryClient是组合模式,最后调用的还是netflix的类DiscoveryClient。这个类才是服务注册发现的关键,接下来分析这个类。
DiscoveryClient
先看看源码解释:
根据这么一段话,也就是说,这个类是用于和Eureka Server来交互,这个类的主要职责是:
a) <em>Registering</em> the instance with <tt>Eureka Server</tt> 服务注册
b) <em>Renewal</em>of the lease with <tt>Eureka Server</tt> 服务续约
c) <em>Cancellation</em> of the lease from <tt>Eureka Server</tt> during shutdown 服务下线
d) <em>Querying</em> the list of services/instances registered with <tt>Eureka Server</tt> 获取注册列表信息
DiscoveryClient构造器中的重要的方法 initScheduledTasks();
Fetch Registers : 获取注册列表信息
if(clientConfig.shouldFetchRegistry()){ //registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule(new TimedSupervisorTask("cacheRefresh" , scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread()), registryFetchIntervalSeconds, TimeUnit.SECONDS);}复制代码
从上面的代码可以看出,eureka client 开启一个scheduler,每个一定的时间(默认是30秒,可以通过registryFetchIntervalSeconds配置)从eureka server 拉取eureka client的配置信息。进一步看到CacheRefreshThread这个runnable。
class CacheRefreshThread implements Runnable { public void run() { refreshRegistry();//主要定时执行该方法 } }复制代码
进一步分析 refreshRegistry
void refreshRegistry() { try { ... // 定时获取注册信息 boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } ... } catch (Throwable e) { logger.error("Cannot fetch registry from server", e); }}复制代码
继续分析 fetchRegistry(boolean forceFullRegistryFetch)方法,里面最主要的是getAndStoreFullRegistry();来发送HTTP请求到注册中心来获取注册信息,并缓存到本地
private void getAndStoreFullRegistry() throws Throwable { ... Applications apps = null; EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { // 把拉取的信息缓存在 localRegionApps 中 localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); }}复制代码
而那个localRegionApps 就是用于缓存拉取信息的
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();复制代码
Renew 服务的续期
回到initScheduledTasks()方法,
if(clientConfig.shouldRegisterWithEureka()){ ... // Heartbeat timer scheduler.schedule(new TimedSupervisorTask("heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread()), renewalIntervalInSecs, TimeUnit.SECONDS); ...}else{ logger.info("Not registering with Eureka server per configuration");}复制代码
进一步看 HeartbeatThread 这个Runnable
/** * The heartbeat task that renews the lease in the given intervals. */private class HeartbeatThread implements Runnable { public void run() { if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } }}复制代码
/** * Renew with the eureka service by making the appropriate REST call */boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); // 注册 boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == Status.OK.getStatusCode(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; }}复制代码
以上代码是每隔一定时间去发送心跳(默认30秒),如果返回的结果是404,就把client注册到Eureka Server上。
Register 服务注册
/** * Register with the eureka service by making the appropriate REST call. */boolean register() throws Throwable { logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();}复制代码
这个方法实在DiscoveryClient类中的。
在DiscoveryClient.initScheduledTasks()中,有一段这么代码
// InstanceInfo replicatorinstanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize...instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());复制代码
在InstanceInfoReplicator 构造方法中会创建一个scheduler。注册方法的调用是通过InstanceInfoReplicator.instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds())发送注册请求到注册中心
public void start(int initialDelayMs) { if (started.compareAndSet(false, true)) { instanceInfo.setIsDirty(); // for initial register Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); }}复制代码
public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { // 注册 discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); }}复制代码
Cancel: 服务下线:
这个方法实在DiscoveryClient类中的。
/** * unregister w/ the eureka service. */void unregister() { // It can be null if shouldRegisterWithEureka == false if(eurekaTransport != null && eurekaTransport.registrationClient != null) { try { logger.info("Unregistering ..."); EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId()); logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode()); } catch (Exception e) { logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e); } }}复制代码
这个方法的调用是通过 shutdown() 方法的
/** * Shuts down Eureka Client. Also sends a deregistration request to the * eureka server. */@PreDestroy@Overridepublic synchronized void shutdown() { if (isShutdown.compareAndSet(false, true)) { logger.info("Shutting down DiscoveryClient ..."); if (statusChangeListener != null && applicationInfoManager != null) { applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId()); } cancelScheduledTasks(); // If APPINFO was registered if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka() && clientConfig.shouldUnregisterOnShutdown()) { applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN); unregister(); } if (eurekaTransport != null) { eurekaTransport.shutdown(); } heartbeatStalenessMonitor.shutdown(); registryStalenessMonitor.shutdown(); logger.info("Completed shut down of DiscoveryClient"); }}复制代码
这个方法上有个注解@PreDestroy,表示在对象销毁之前触发的。可以看到这个方法中关闭了定时任务,通知eureka server 下线。
以上就是我对Eureka client的理解,
总结:
1.springcloud 整合了netflix,通过组合模式, EurekaDiscoveryClient 包含 EurekaClient, 核心功能都是由 DiscoveryClient 来完成的。
2.Eureka client 主要有 Register、Renew、Cancel、Get Registy功能。
3.Eureka client和Server的交互通过大量的定时任务触发,交互通过http协议,设计核心类为 EurekaHttpClient。
4.在DiscoveryClient被创建的时候,在其构造方法中,启动了三个线程池,然后分别启动了三个定时器任务:注册当前服务到注册中心;持续发送心跳进行续约任务;定时刷新注册中心注册细信息到本地,所以可以说,在项目启动的时候这些任务就被执行了。