Spring Cloud Netflix 作为springcloud 我们常用的一个项目,其子项目Eureka,zuul,Rebbion是我熟悉的。但是Spring Cloud Netflix 被宣布进入了维护模式, 意思不再添加新特性了,这对于我们来说很不友好了。 大家纷纷寻找相应的替代工具。(具体可以网上搜索)
但这不影响我们学习一些组件的框架思想。我对注册发现,负载均衡这块比较感兴趣。所以在此记录下自己的阅读心得。
版本说明:Finchley.SR1
当我们在springboot的启动类上加上 @EnableEurekaServer
,一个基本的注册中心就可以生效了。
@SpringBootApplication @EnableEurekaServer public class EurekaServerApplication { public static void main(String[] args) { SpringApplication.run(EurekaServerApplication.class, args); } } 复制代码
@EnableEurekaServer
仅仅是引入 EurekaServerMarkerConfiguration
类。 Marker的英文意思是标记的意思,spring相关框架中有很多类似 xxxMarkerxxx
这样的注解.其实他们的意思就是一个 开关 。会在其他地方进行开关的判断,有对应 xxxMarkerxxx
类就表示打开,没有表示关闭。
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(EurekaServerMarkerConfiguration.class) public @interface EnableEurekaServer { } 复制代码
EurekaServerMarkerConfiguration
开关打开的是哪个类呢??
org.springframework.cloud.netflix.eureka.server
项目spring.factories资源文件中自动注入类 EurekaServerAutoConfiguration
,此类在自动注入的过程中,会判断开关是否打开来决定是否自动注入相关类
org.springframework.boot.autoconfigure.EnableAutoConfiguration=/ org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration 复制代码
@Configuration @Import(EurekaServerInitializerConfiguration.class) @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) @EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class }) @PropertySource("classpath:/eureka/server.properties") public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter { ..... } 复制代码
由此看出 EurekaServerMarkerConfiguration
开关打开的 EurekaServerAutoConfiguration
。
下面我们看看 EurekaServerAutoConfiguration
配置了什么东西。 (1.先看注解上相关配置
@Configuration @Import(EurekaServerInitializerConfiguration.class) @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) @EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class }) @PropertySource("classpath:/eureka/server.properties") public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter { ... } 复制代码
EurekaServerInitializerConfiguration
类,此类继承了 SmartLifecycle
接口,所以会在spring启动完毕时回调此类的start()方法 /eureka/server.properties
的配置属性。 (2.再看类内部相关配置(代码比较长,这里只讲内容,建议打开源码看) 寻找类中的Bean
@ConfigurationProperties(“eureka.server”)
映射我们的配置文件中的 eureka.server.xxxx
格式的配置信息(此类很重要啊,我们想修改EurekaServer的配置信息,可以配置 eureka.server.xxxx
覆盖此类中的默认配置) (ServerCodecs)CloudServerCodecs
InstanceRegistry
( 注意PeerAwareInstanceRegistry实现了AbstractInstanceRegistry,这里准确的说是 对等节点+当前节点同步器 ) DefaultEurekaServerContext
/eureka
开头的请求都交给Jersey 框架去解析。容器是 com.sun.jersey.spi.container.servlet.ServletContainer
com.netflix.discovery","com.netflix.eureka"
包路径下的接口。通常我们再springmvc中通过Controller概念来表示接口,Jersey框架下用ApplicationResource的概念来表示接口。暴露的接口其实就是eureka各个应用通信的接口。(下面再说这些接口) EurekaServerAutoConfiguration
基本上就做了这些工作。我们来归类总结下
针对当前Eureka实例的相关组件:
/eureka
路径的相关接口,注册拦截 /eureka
的拦截器,注册 com.sun.jersey.spi.container.servlet.ServletContainer
容器来处理对应的请求 两个针对集群下相关组件:
两个针对启动相关类:
至此:我们也可以大致了解了一个EurekaServer大致长什么样子了。
EurekaServerContext作为上下文,应该是核心所在。上文讲过注册 DefaultEurekaServerContext
。此类中有 @Inject,@PostConstruct, @PreDestroy
注解的方法,重点来看看。
@Inject public DefaultEurekaServerContext(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes, ApplicationInfoManager applicationInfoManager) { this.serverConfig = serverConfig; this.serverCodecs = serverCodecs; this.registry = registry; this.peerEurekaNodes = peerEurekaNodes; this.applicationInfoManager = applicationInfoManager; } 复制代码
@Inject
注解的方法,参数由IOC容器注入。 serverConfig ,serverCodecs ,registry ,peerEurekaNodes
我们已经认识了。ApplicationInfoManager 是用来管理应用信息的,也就是实例注册信息,由ApplicationInfoManager统一管理。
@PostConstruct
修饰的方法会在服务器加载Servle的时候运行,并且只会被服务器执行一次,被 @PostConstruct
修饰的方法会在构造函数之后,init()方法之前运行.
@PostConstruct @Override public void initialize() { logger.info("Initializing ..."); peerEurekaNodes.start(); try { registry.init(peerEurekaNodes); } catch (Exception e) { throw new RuntimeException(e); } logger.info("Initialized"); } 复制代码
这个方法很简明,主要有两个重要的的点:
PeerEurekaNodes: 用于管理PeerEurekaNode节点集合。 peerEurekaNodes.start();
public void start() { //创建一个单线程定时任务线程池:线程的名称叫做Eureka-PeerNodesUpdater taskExecutor = Executors.newSingleThreadScheduledExecutor( new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "Eureka-PeerNodesUpdater"); thread.setDaemon(true); return thread; } } ); try { // 解析Eureka Server URL,并更新PeerEurekaNodes列表 updatePeerEurekaNodes(resolvePeerUrls()); //创建任务 //任务内容为:解析Eureka Server URL,并更新PeerEurekaNodes列表 Runnable peersUpdateTask = new Runnable() { @Override public void run() { try { updatePeerEurekaNodes(resolvePeerUrls()); } catch (Throwable e) { logger.error("Cannot update the replica Nodes", e); } } }; //交给线程池执行,执行间隔10min taskExecutor.scheduleWithFixedDelay( peersUpdateTask, serverConfig.getPeerEurekaNodesUpdateIntervalMs(), serverConfig.getPeerEurekaNodesUpdateIntervalMs(), TimeUnit.MILLISECONDS ); } catch (Exception e) { throw new IllegalStateException(e); } for (PeerEurekaNode node : peerEurekaNodes) { logger.info("Replica node URL: {}", node.getServiceUrl()); } } 复制代码
解析配置的对等体URL。就是在配置文件中配置的多个Eureka注册中心的URL.
protected void updatePeerEurekaNodes(List<String> newPeerUrls) { //计算需要移除的url= 原来-新配置。 Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls); toShutdown.removeAll(newPeerUrls); //计算需要增加的url= 新配置-原来的。 Set<String> toAdd = new HashSet<>(newPeerUrls); toAdd.removeAll(peerEurekaNodeUrls); //没有变化就不更新 if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change return; } List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes); // 删除需要移除url对应的节点。 if (!toShutdown.isEmpty()) { int i = 0; while (i < newNodeList.size()) { PeerEurekaNode eurekaNode = newNodeList.get(i); if (toShutdown.contains(eurekaNode.getServiceUrl())) { newNodeList.remove(i); eurekaNode.shutDown(); } else { i++; } } } // 添加需要增加的url对应的节点 if (!toAdd.isEmpty()) { logger.info("Adding new peer nodes {}", toAdd); for (String peerUrl : toAdd) { newNodeList.add(createPeerEurekaNode(peerUrl)); } } //更新节点列表 this.peerEurekaNodes = newNodeList; //更新节点url列表 this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls); } 复制代码
总结:start()方法,其实就是完成新配置的eureka集群信息的初始化更新工作。
对等节点同步器的初始化。
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception { //统计最近X秒内的来自对等节点复制的续约数量(默认1秒) this.numberOfReplicationsLastMin.start(); this.peerEurekaNodes = peerEurekaNodes; //初始化返回结果缓存 initializedResponseCache(); //更新续约阀值 scheduleRenewalThresholdUpdateTask(); //初始化远程区域注册 相关信息 initRemoteRegionRegistry(); ... } 复制代码
启动一个定时任务,任务名称为 Eureka-MeasureRateTimer
,每1秒统计从对等节点复制的续约数,将当前的桶的统计数据放到lastBucket,当前桶置为0
this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1); -- this.timer = new Timer("Eureka-MeasureRateTimer", true); --- timer.schedule(new TimerTask() { @Override public void run() { try { // Zero out the current bucket. lastBucket.set(currentBucket.getAndSet(0)); } catch (Throwable e) { logger.error("Cannot reset the Measured Rate", e); } } }, sampleInterval, sampleInterval); 复制代码
注意:此统计器用于节点之间复制的统计。
精辟,缓存来实现返回结果的缓存,优秀设计啊。
使用goole cache初始化一个缓存类 ResponseCacheImpl
,缓存( all applications, delta changes and for individual applications
)请求的结果, 此类中有两个缓存:
readWriteCacheMap
: 读写缓存。初始化容量1000,失效时间3分钟。 readOnlyCacheMap
:只读缓存, shouldUseReadOnlyResponseCache
属性控制是否启用,默认是启用的。此缓存会使用,名为 Eureka-CacheFillTimer
的timer,每30s更新从 readWriteCacheMap
中更新 readOnlyCacheMap
中的缓存值。 取值逻辑: 先从 readOnlyCacheMap
取值,没有去 readWriteCacheMap
,没有去通过CacheLoader加载,而CacheLoader会到维护应用实例注册信息的Map中获取。
这里就产生了一个疑问,为啥有搞个二级缓存来缓存结果呢?不是很理解。
使用名为 ReplicaAwareInstanceRegistry - RenewalThresholdUpdater
的timer,每15(900s)分钟执行 updateRenewalThreshold()
任务,更新续约阀值。
private void updateRenewalThreshold() { try { Applications apps = eurekaClient.getApplications(); int count = 0; //统计所有注册instance个数 for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { if (this.isRegisterable(instance)) { ++count; } } } synchronized (lock) { 当总数》预期值时 或者 关闭了自我保护模式,更新 if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfRenewsPerMin) || (!this.isSelfPreservationModeEnabled())) { this.expectedNumberOfRenewsPerMin = count * 2; this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold()); } } logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold); } catch (Throwable e) { logger.error("Cannot update renewal threshold", e); } } 复制代码
expectedNumberOfRenewsPerMin
每分钟最大的续约数量 (30s/次,2次/s): =客户端数量count*2 numberOfRenewsPerMinThreshold
每分钟续约阈值。 serverConfig.getRenewalPercentThreshold()*expectedNumberOfRenewsPerMin
serverConfig.getRenewalPercentThreshold()默认是0.85 当每分钟续约数小于 numberOfRenewsPerMinThreshold
阈值时,并且自我保护没有关闭的情况下,开启自我保护,此期间不剔除任何一个客户端。(下面的 EvictionTask()
驱逐任务会讲到如何利用)
此四个地方都会更新两个值
初始化 远程区域注册 相关信息
@PreDestroy
修饰的方法会在服务器加载Servle的时候运行,并且只会被服务器执行一次,被 @PreDestroy
修饰的方法会 Destroy
方法之后执行。.
public void shutdown() { registry.shutdown(); peerEurekaNodes.shutdown(); } 复制代码
停掉init()时启动的定时任务
清空集群url缓存,集群节点缓存。
总结: EurekaServerContext
的初始化做了很多事情,很精辟,建议多阅读,多学习
EurekaServerInitializerConfiguration
实现了 SmartLifecycle
接口,在spring启动后,执行start()方法
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext); log.info("Started Eureka Server"); //发布注册中心可以注册事件 publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig())); //状态为运行状态 EurekaServerInitializerConfiguration.this.running = true; //发布注册中心启动完成事件 publish(new EurekaServerStartedEvent(getEurekaServerConfig())); 复制代码
这里重点看先 EurekaServerBootstrap.contextInitialized
EurekaServerBootstrap
的contextInitialized主要干了两件事
initEurekaEnvironment();初始化环境 initEurekaServerContext();初始化上下文 复制代码
主要是数据中心等环境变量的初始化
此方法中最重要的是
从相邻eureka节点拷贝注册列表信息 int registryCount = this.registry.syncUp(); 允许开始与客户端的数据传输,即开始作为Server服务 this.registry.openForTraffic(this.applicationInfoManager, registryCount); 复制代码
@Override public int syncUp() { // Copy entire entry from neighboring DS node int count = 0; //重试次数 for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) { if (i > 0) { try { Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); } catch (InterruptedException e) { logger.warn("Interrupted during registry transfer.."); break; } } //从eurekaClient获取服务列表 Applications apps = eurekaClient.getApplications(); //遍历注册 for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { try { if (isRegisterable(instance)) { register(instance, instance.getLeaseInfo().getDurationInSecs(), true); count++; } } catch (Throwable t) { logger.error("During DS init copy", t); } } } } return count; } 复制代码
允许开始与客户端的数据传输,即开始作为Server服务
InstanceRegistry.openForTraffic public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) { super.openForTraffic(applicationInfoManager, count == 0 ? this.defaultOpenForTrafficCount : count); } PeerAwareInstanceRegistryImpl.openForTraffic public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) { //每分钟期待的续约数(默认30s续约,60s就是2次) this.expectedNumberOfRenewsPerMin = count * 2; // 每分钟续约的阀值:0.85 * expectedNumberOfRenewsPerMin this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); .... logger.info("Changing status to UP"); //applicationInfoManager设置状态为UP applicationInfoManager.setInstanceStatus(InstanceStatus.UP); super.postInit(); } 复制代码
protected void postInit() { //又启动了一个续约数统计器,此统计器用于配合驱逐任务 renewsLastMin.start(); if (evictionTaskRef.get() != null) { evictionTaskRef.get().cancel(); } evictionTaskRef.set(new EvictionTask()); evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs()); } 复制代码
创建一个名为 Eureka-EvictionTimer
的定时器来执行EvictionTask()任务。 EvictionTask()任务:
@Override public void run() { // 获取延迟秒数,就是延迟几秒下线时间。 long compensationTimeMs = getCompensationTimeMs(); //驱逐操作 evict(compensationTimeMs); } 复制代码
evict()驱逐操作:清理过期租约
public void evict(long additionalLeaseMs) { // 判断是否开启自我保护,自我保护期间不剔除任何任务 if (!isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled."); return; } List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>(); //循环获得 所有过期的租约 for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) { Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue(); if (leaseMap != null) { for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) { Lease<InstanceInfo> lease = leaseEntry.getValue(); // 判断是否过期 if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) { expiredLeases.add(lease); } } } } // 计算 最大允许清理租约数量 int registrySize = (int) getLocalRegistrySize(); int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold()); int evictionLimit = registrySize - registrySizeThreshold; // 计算 清理租约数量 int toEvict = Math.min(expiredLeases.size(), evictionLimit); if (toEvict > 0) { Random random = new Random(System.currentTimeMillis()); // 遍历清理。 for (int i = 0; i < toEvict; i++) { int next = i + random.nextInt(expiredLeases.size() - i); Collections.swap(expiredLeases, i, next); Lease<InstanceInfo> lease = expiredLeases.get(i); String appName = lease.getHolder().getAppName(); String id = lease.getHolder().getId(); EXPIRED.increment(); internalCancel(appName, id, false); } } } 复制代码
isLeaseExpirationEnabled():判断是否开启自我保护的两个条件
Lease.isExpire():是否过期的判断:
public boolean isExpired(long additionalLeaseMs) { return ( //或者明确实例下线时间。 evictionTimestamp > 0 //或者距离最后更新时间已经过去至少3分钟 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs)); } 复制代码
//续约时更新lastUpdateTimestamp,加上了过期间隔? public void renew() { lastUpdateTimestamp = System.currentTimeMillis() + duration; } 复制代码
过期时间判断: System.currentTimeMillis()> lastUpdateTimestamp + duration + additionalLeaseMs 这里加了两次duration, 也就是180秒,加上延迟下线时间。也就是最少需要3分钟才判断下线。
至此Eureka server的初始化就完成了。 这里通过debug模式来看看初始化过程中的定时任务。
Eureka Server 启动后,就是对外提供服务了。等待客户端来注册。
Eureka是一个基于REST(Representational State Transfer)服务,我们从官方文档中可以看到其对外提供的接口: 官方文档
可以推测,客户端注册时也是调用了这些接口来进行与服务端的通信的。
上文说过,Eureka 使用jersey框架来做MVC框架,暴露接口。 ApplicationResource
类似springmvc中的Controller。
在 com.netflix.eureka.resources
包下我们可以看到这些 ApplicationResource
ApplicationResource.addInstance
对应的就是服务注册接口
@POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { .... //使用PeerAwareInstanceRegistryImpl#register() 注册实例信息。 registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible } 复制代码
InstanceRegistry @Override public void register(final InstanceInfo info, final boolean isReplication) { //发布注册事件, handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication); super.register(info, isReplication); } PeerAwareInstanceRegistryImpl @Override public void register(final InstanceInfo info, final boolean isReplication) { //租期90s int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } //注册实例 super.register(info, leaseDuration, isReplication); //复制到其他节点。 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); } 复制代码
AbstractInstanceRegistry public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { read.lock()读锁 1.从缓存中获取实例名称对应的租约信息 Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); REGISTER.increment(isReplication); 2.统计数+1 REGISTER.increment(isReplication); //gmap为null.则创建一个Map。 3.租约的处理分两种情况: 租约已经存在: 比较新租约与旧租约的LastDirtyTimestamp,使用LastDirtyTimestamp最晚的租约 租约不存在,即新注册: synchronized (lock) { 更新期待每分钟续约数 更新续约阈值 } 将租约放入appname对应的map中。 4.在最近注册队(recentRegisteredQueue)里添加一个当前注册信息 5.状态的处理: 将当前实例的OverriddenStatus状态,放到Eureka Server的overriddenInstanceStatusMap; 根据OverriddenStatus状态,设置状态 7.实例actionType=ADDED registrant.setActionType(ActionType.ADDED); 8. 维护recentlyChangedQueue,保存最近操作 recentlyChangedQueue.add(new RecentlyChangedItem(lease)); 9.更新最后更新时间 registrant.setLastUpdatedTimestamp(); 10.使当前实例的结果缓存ResponseCache失效() invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); } 复制代码
此处可以看源码阅读,在此不讲了
我们获取的实例信息,其实都是从缓存中获取的 String payLoad = responseCache.get(cacheKey);
@GET public Response getApplication(@PathParam("version") String version, @HeaderParam("Accept") final String acceptHeader, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept) { if (!registry.shouldAllowAccess(false)) { return Response.status(Status.FORBIDDEN).build(); } EurekaMonitors.GET_APPLICATION.increment(); CurrentRequestVersion.set(Version.toEnum(version)); KeyType keyType = Key.KeyType.JSON; if (acceptHeader == null || !acceptHeader.contains("json")) { keyType = Key.KeyType.XML; } Key cacheKey = new Key( Key.EntityType.Application, appName, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept) ); String payLoad = responseCache.get(cacheKey); if (payLoad != null) { logger.debug("Found: {}", appName); return Response.ok(payLoad).build(); } else { logger.debug("Not Found: {}", appName); return Response.status(Status.NOT_FOUND).build(); } } 复制代码