配置类的作用一般就是配置框架运行的基本组件,所以看懂配置类,也就入了框架的门。
当我们在启动类上加入 @EnableDiscoveryClient
或者 @EnableEurekaClient
时,就能使Eureka客户端生效。
这两个注解最终都会使,Eureka客户端对应的配置类 EurekaClientAutoConfiguration
生效。这里直接讲配置类,具体注解如何使他生效,不在此处赘述。
EurekaClientAutoConfiguration 作为EurekaClient的自动配置类,配了EurekaClient运行所需要的组件。
(1.注解上的Bean
@Configuration @EnableConfigurationProperties//启动属性映射 @ConditionalOnClass(EurekaClientConfig.class)//需要EurekaClientConfig类存在 @Import(DiscoveryClientOptionalArgsConfiguration.class)//加载可选参数配置类到容器, @ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)//需要开关存在 @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)//需要eureka.client.enabled属性 //在这三个自动注入类之前解析 @AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class, CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class }) //在这三个自动注入类之后解析 @AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration", "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration", "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"}) public class EurekaClientAutoConfiguration { } 复制代码
可以看出,注解大部分都是在做EurekaClientAutoConfiguration 配置类生效条件的判断。
其中@AutoConfigureAfter对应的三个配置类需要讲下: RefreshAutoConfiguration:与刷新作用域有关的配置类, EurekaDiscoveryClientConfiguration: * 向容器中注入开关EurekaDiscoveryClientConfiguration.Marker * 创建RefreshScopeRefreshedEvent事件监听器, * 当eureka.client.healthcheck.enabled=true时,注入EurekaHealthCheckHandler用于健康检查 AutoServiceRegistrationAutoConfiguration:关于服务自动注册的相关配置。 复制代码
注解上没有引入重要组件。
(2.类内部的Bean
(3. 内部类的Bean EurekaClientAutoConfiguration 内部有两个关于EurekaClient的配置类,
不管是哪种EurekaClient都会注册三个组件:
(4. 分类总结:
EurekaClient 可以看做是客户端的上下文。他的初始化,卸载方法包括了客户端的整个生命周期
上文讲到,此时EurekaClient注册的是CloudEurekaClient。
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) { //留下扩展点,可以通过参数配置各种处理器 if (args != null) { this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; this.eventListeners.addAll(args.getEventListeners()); this.preRegistrationHandler = args.preRegistrationHandler; } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; this.preRegistrationHandler = null; } //赋值applicationInfoManager属性 this.applicationInfoManager = applicationInfoManager; //applicationInfoManager在初始化时, //new InstanceInfoFactory().create(config);创建了当前实例信息 InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); } //备用注册处理器 this.backupRegistryProvider = backupRegistryProvider; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); //本地区域应用缓存初始化,Applications存储Application 列表 localRegionApps.set(new Applications()); fetchRegistryGeneration = new AtomicLong(0); remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } //开始初始化默认区域 logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); // 如果既不要向eureka server注册,又不要获取服务列表,就什么都不用初始化 if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()){ .... return; } try { //默认创建2个线程的调度池,用于TimedSupervisorTask任务。 scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); //心跳线程池,2个线程 heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff //缓存刷新线程池,2个线程 cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff //初始化与EurekaServer真正通信的通信器。 eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); //配置区域映射器。可配置DNS映射。 AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } //创建实例区域检查器。 instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } // 如果需要从eureka server获取服务列表,并且尝试fetchRegistry(false)失败, //调用BackupRegistry if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } //回调扩展点处理器。此处理器,在注册前处理。 if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { if (!register() ) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable th) { logger.error("Registration error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // 初始化所有定时任务 initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } //将client,clientConfig放到DiscoveryManager 统一管理,以便其他地方可以DI依赖注入。 DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); } 复制代码
Eureka初始化还是比较复杂,我们找重点说说。
在DiscoveryClient初始化时,初始化EurekaTransport。
eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); 复制代码
EurekaTransport 是客户端与服务端底层通信器。 有5个重要的属性:
scheduleServerEndpointTask方法完成EurekaTransport5个属性的初始化
transportClientFactory 属于低层次的http工厂。 EurekaHttpClientFactory 属于高层次的http工厂。 通过具有不同功能的EurekaHttpClientFactory 工厂 对transportClientFactory 进行层层装饰。生产的http工具也具有不同层次的功能。 列如: 最外层的SessionedEurekaHttpClient 具有会话功能的EurekaHttpClient 第二次RetryableEurekaHttpClient 具有重试功能的EurekaHttpClient
private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { //获取本地缓存 Applications applications = getApplications(); //如果增量拉取被禁用或是第一次拉取,全量拉取server端已经注册的服务实例信息 if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { //获取全部实例 getAndStoreFullRegistry(); } else { //增量拉取服务实例 getAndUpdateDelta(applications); } applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { return false; } finally { if (tracer != null) { tracer.stop(); } } // 刷新本地缓存 onCacheRefreshed(); // 基于缓存中的实例数据更新远程实例状态, (发布StatusChangeEvent) updateInstanceRemoteStatus(); // 注册表拉取成功后返回true return true; } 复制代码
全量获取最终调用
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); 复制代码
增量获取
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); 复制代码
可以看出,客户端与服务端通信底层是EurekaTransport在提供支持。
在此之前有必要说说:TimedSupervisorTask。 TimedSupervisorTask 是自动调节间隔的周期性任务,当不超时,将以初始化的间隔执行。当任务超时时,将下一个周期的间隔调大。每次超时都会增大相应倍数,直到外部设置的最大参数。一旦新任务不再超时,间隔自动恢复默认值。
也就是说,这是一个具有自适应的周期性任务。(非常棒的设计啊)
private void initScheduledTasks() { //1.如果获取服务列表,则创建周期性缓存更新(即获取服务列表任务)任务 if (clientConfig.shouldFetchRegistry()) { //初始间隔时间(默认30秒) int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); //最大倍数 默认10倍 int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); //执行TimedSupervisorTask ,监督CacheRefreshThread任务的执行。 //具体执行线程池cacheRefreshExecutor,具体任务CacheRefreshThread scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread()//缓存刷新,调用fetchRegistry()获取服务列表 ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } //2. 如何注册,就创建周期性续租任务,维持心跳。 if (clientConfig.shouldRegisterWithEureka()) { //心跳间隔,默认30秒。 int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); //最大倍数 默认10倍 int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); //执行TimedSupervisorTask ,监督HeartbeatThread任务的执行。 //具体执行线程池heartbeatExecutor,具体任务HeartbeatThread scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); //3.创建应用实例信息复制器。 instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize //4.创建状态改变监听器,监听StatusChangeEvent statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } //状态有变化,使用信息复制器,执行一个任务,更新状态变化到注册中心 instanceInfoReplicator.onDemandUpdate(); } }; //是否关注状态变化,将监听器添加到applicationInfoManager if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } // 启动InstanceInfo复制器 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } } 复制代码
总结下initScheduledTasks()的工作: 如何配置获取服务
cacheRefresh
,监督服务获取 CacheRefreshThread(DiscoveryClient-CacheRefreshExecutor-%d)
线程的执行,默认每30秒执行一次。获取服务列表更新到本地缓存。任务内容 refreshRegistry
,本地缓存 localRegionApps
如何配置注册,
"heartbeat",
监督续约任务 HeartbeatThread(DiscoveryClient-HeartbeatExecutor-%d)
的执行,默认每30秒执行一次。任务内容为 renew()
onDemandUpdate()
方法,更新变化到远程server (DiscoveryClient-InstanceInfoReplicator-%d)
,定时(默认40秒)检测当前实例的DataCenterInfo、LeaseInfo、InstanceStatus,如果有变更,执行 InstanceInfoReplicator.this.run()
方法将变更信息同步到server 下面我们看看这几个重要的任务内容:
eurekaTransport.registrationClient.sendHeartBeat
向server发送当前实例信息 public void run() { try { //1.刷新DataCenterInfo //2.刷新LeaseInfo 租约信息 //3.从HealthCheckHandler中获取InstanceStatus discoveryClient.refreshInstanceInfo(); //如果isInstanceInfoDirty=true表明需要更新,返回dirtyTimestamp。 Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register();//注册。 instanceInfo.unsetIsDirty(dirtyTimestamp);//注册完成,设置没有更新。 } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } } 复制代码
可以看出实例注册就在 discoveryClient.register()
。
那么第一次注册发生在什么时候呢?
initScheduledTasks方法中,执行instanceInfoReplicator.start时,会首先调用instanceInfo.setIsDirty(),初始化是否更新标志位为ture ,开启线程,40秒后发起第一次注册。(当然如果在这40秒内,如果有状态变化,会立即发起注册。)
boolean register() throws Throwable { logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { //发起注册 httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == 204; } 复制代码
可以看出,register方法本质也是通过eurekaTransport 来发起与server的通信的。
注解@PreDestroy修饰的shutdown()会在Servlet被彻底卸载之前执行。
public synchronized void shutdown() { if (isShutdown.compareAndSet(false, true)) { logger.info("Shutting down DiscoveryClient ..."); //移除监听 if (statusChangeListener != null && applicationInfoManager != null) { applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId()); } //取消任务 cancelScheduledTasks(); // If APPINFO was registered if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka() && clientConfig.shouldUnregisterOnShutdown()) { applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN); //下线 unregister(); } //通信中断 if (eurekaTransport != null) { eurekaTransport.shutdown(); } heartbeatStalenessMonitor.shutdown(); registryStalenessMonitor.shutdown(); logger.info("Completed shut down of DiscoveryClient"); } } 复制代码
EurekaAutoServiceRegistration实现了SmartLifecycle,会在spring启动完毕后,调用其start()方法。
@Override public void start() { // 设置端口 if (this.port.get() != 0) { if (this.registration.getNonSecurePort() == 0) { this.registration.setNonSecurePort(this.port.get()); } if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) { this.registration.setSecurePort(this.port.get()); } } if (!this.running.get() && this.registration.getNonSecurePort() > 0) { //注册 this.serviceRegistry.register(this.registration); //发布注册成功事件 this.context.publishEvent( new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig())); this.running.set(true);// } } 复制代码
this.serviceRegistry.register(this.registration);
@Override public void register(EurekaRegistration reg) { maybeInitializeClient(reg); // 更改状态,会触发监听器的执行 reg.getApplicationInfoManager() .setInstanceStatus(reg.getInstanceConfig().getInitialStatus()); reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler)); } 复制代码
EurekaAutoServiceRegistration.start()
-->EurekaServiceRegistry.register(EurekaRegistration)
-->ApplicationInfoManager.setInstanceStatus 状态改变,
-->StatusChangeListener 监听器监听到状态改变
-->InstanceInfoReplicator.onDemandUpdate() 更新状态到server
-->InstanceInfoReplicator.run()
-->DiscoveryClient.register()
-->eurekaTransport.registrationClient.register(instanceInfo);
-->jerseyClient
由于篇幅原因,很多细节不能一一展示。本文志在说说一些原理,具体细节可以研读源码,会发现框架真优秀啊。