不建议为了看源码而看源码,这样做无意义!其次,有时候了解工作机制即可,不一定得深入源码的每一个细节。更多的时候,看源码,只是工作需要。因此,源码分析,不会贴大量的源码,而只是给了流程 + 代码入口。看源码的工作,还是得自己去看。
EurekaServerConfig:eureka server 配置类的 抽象
CodecWrapper:定义如何编码,解码。可通过 eureka server 的 jsonCodecName 修改 CodecWrapper
CodecWrappers:CodecWrapper 工具类
CodecWrapper:编解码组件
ServerCodecs:获取 编码、解码器
PeerEurekaNodes:管理 PeerEurekaNode
PeerEurekaNode:表示一个 eureka-server
EurekaServerContext: 容器,上下文,可获取本地 Eureka 相关的类(被容器管理,可注入)
InstanceInfo:实例的抽象,即一个 client
ReplicationClientAdditionalFilters:向别的 eureka-server 发起请求时,会执行过滤。
RefreshablePeerEurekaNodes:PeerEurekaNodes 的继承类, eureka-server 注入的 PeerEurekaNodes 其实是 RefreshablePeerEurekaNodes 。 见名知意,即 可刷新的 PeerEurekaNodes 。 是否刷新逻辑,见: RefreshablePeerEurekaNodes#onApplicationEvent 。
EurekaHttpClient:发送 http 请求抽象。
ResponseCache:响应请求缓存类。缓存的类型,参考 com.netflix.eureka.registry.Key
netflix 封装了一个类似线程池的任务组件,用于提交 eureka-server 之间的操作任务。
AcceptorExecutor:任务接收器
TaskDispatcher:任务分发器
TaskDispatchers:任务分发器的工厂类
TaskProcessor:任务处理器
TaskHolder:对任务的再封装,依赖 task。
大致的交互图如下:
代码入口: ApplicationResource#addInstance
ConcurrentHashMap<String //* appName /*/, Map<String //* instanceId=ip + ":" + appName + ":" + port /*/, Lease<InstanceInfo/>>>
代码入口: InstanceResource#renewLease
流程比较简单,主要是修改 Lease 的 lastUpdateTimestamp 字段。同样的会向其他 eureka-server 节点发送续期请求。
代码入口: ApplicationsResource#getContainers
MeasuredRate renewsLastMin; // 统计每分钟的心跳包 volatile int numberOfRenewsPerMinThreshold; // 每分钟 client 应该续期的最小次数 volatile int expectedNumberOfClientsSendingRenews; //注册的 client 数量
eureka-server 在一定时间内(默认90s)没有收到 client 心跳,会剔除该实例。但是发生网络分区故障时,client 无法与 server 通信,此时不应该剔除该 client。因此有了 eureka-server 的自我保护机制。 eureka-server 在进入自我保护机制时,不会剔除 client。当网络故障恢复之后,会自动退出自我保护机制。
AbstractInstanceRegistry
MeasuredRate renewsLastMin; // 统计每分钟的心跳包 volatile int numberOfRenewsPerMinThreshold; // 每分钟 client 应该续期的最小次数 volatile int expectedNumberOfClientsSendingRenews; //注册的 client 数量
numberOfRenewsPerMinThreshold 参数的计算依赖于 expectedNumberOfClientsSendingRenews。
先看, expectedNumberOfClientsSendingRenews 相关的方法
// 初始化 expectedNumberOfClientsSendingRenews = 1 this.expectedNumberOfClientsSendingRenews = count; updateRenewsPerMinThreshold();
synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to register it, increase the number of clients sending renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1; updateRenewsPerMinThreshold(); } }
synchronized (lock) { // Update threshold only if the threshold is greater than the // current expected threshold or if self preservation is disabled. if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews) || (!this.isSelfPreservationModeEnabled())) { this.expectedNumberOfClientsSendingRenews = count; updateRenewsPerMinThreshold(); } }
synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to cancel it, reduce the number of clients to send renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1; updateRenewsPerMinThreshold(); } }
每隔15分钟,调用 PeerAwareInstanceRegistryImpl#updateRenewalThreshold 更新 expectedNumberOfClientsSendingRenews
每次更新了 expectedNumberOfClientsSendingRenews,必然会调用 updateRenewsPerMinThreshold() 方法,更新 numberOfRenewsPerMinThreshold。
protected void updateRenewsPerMinThreshold() { this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds()) * serverConfig.getRenewalPercentThreshold()); // this.expectedNumberOfClientsSendingRenews * (60.0 / 30 ) * 0.85 }
renewsLastmin,在 client 续期时,会调用 MeasuredRate#increment(), 将 currentBucket 值 + 1
服务端接收客户端续期请求 代码入口:AbstractInstanceRegistry#renew
public void increment() { currentBucket.incrementAndGet(); }
MeasureRate 在初始化时,会启动一个定时器,每隔 60s。便会将 currentBucket 清 0
代码入口: MeasuredRate#start
MeasureRate 初始化代码入口: AbstractInstanceRegistry 构造函数
在启动 eureka-server 时,会初始化一个定时器,每隔 60s 剔除未及时发送心跳包的 client。
代码入口: AbstractInstanceRegistry#postInit
真正执行剔除逻辑的代码入口: AbstractInstanceRegistry#evict(long additionalLeaseMs)
public void evict(long additionalLeaseMs) { // ... if (!isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled."); return; } // ... }
public boolean isLeaseExpirationEnabled() { if (!isSelfPreservationModeEnabled()) { // The self preservation mode is disabled, hence allowing the instances to expire. return true; } return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold; }
上面介绍了自我保护机制的相关参数,当 isLeaseExpirationEnabled() 返回 false 时,会剔除过期的 client。从方法看,只要开启自我保护,一定返回 true,因此不会剔除过期的 client。