在项目中使用 Ribbon
的目的是在客户端(服务消费端)实现负载均衡。
在上一篇《 Spring Cloud OpenFeign
源码分析》中我们分析了为什么使用 OpenFeign
时,不配置 url
,且不导入 Ribbon
的依赖会报错。本篇继续分析 OpenFeign
是如何与 Ribbon
整合、 Ribbon
是如何实现负载均衡的、 Ribbon
是如何从注册中心获取服务的。
OpenFeign
与 Ribbon
整合后的接口调用流程 OpenFeign
与 Ribbon
整合实现负载均衡调用接口的流程如下:
spring-cloud-openfeign-core
模块:
LoadBalancerFeignClient
的 execute
调用远程方法; FeignLoadBalancer
的 executeWithLoadBalancer
方法实现负载均衡调用。 ribbon-core
模块:
LoadBalancerCommand
的 submit
方法实现异步调用同步阻塞等待结果。 LoadBalancerCommand
的 selectServer
方法从多个服务提供者中负载均衡选择一个调用; ribbon-loadbalancer
模块:
ILoadBalancer
的 chooseServer
方法选择服务; IRule
的 choose
方法按某种算法选择一个服务,如随机算法、轮询算法; OpenFeign
是如何与 Ribbon
整合的 sck-demo
项目项目地址: https://github.com/wujiuye/share-projects/tree/master/sck-demo
。
当我们使用 openfeign
时,如果不配置 @FeignClient
的 url
属性,那么就需要导入 spring-cloud-starter-kubernetes-ribbon
的依赖,使用 LoadBalancerFeignClient
调用接口。如果我们不需要使用 Ribbon
来实现负载均衡,那么我们可以直接将 @FeignClient
的 url
属性配置为: http://{serviceId}
,而不用添加 Ribbon
的依赖。
sck-demo
项目中添加 spring-cloud-starter-kubernetes-ribbon
依赖,非 Spring Cloud Kubernetes
项目只需添加 spring-cloud-starter-netflix-ribbon
。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-kubernetes-ribbon</artifactId> </dependency> 复制代码
当我们在项目中添加 spring-cloud-starter-kubernetes-ribbon
依赖配置时,会将 spring-cloud-starter-netflix-ribbon
和 spring-cloud-kubernetes-ribbon
都会导入到项目中,如下图所示。
当项目中使用 openfeign
并添加 spring-cloud-starter-netflix-ribbon
后, Ribbon
就能通过自动配置与 openfeign
整合,为项目注入 ILoadBalancer
的实现类实例。默认使用的是 ZoneAwareLoadBalancer
,这是 spring-cloud-netflix-ribbon
下的类。
spring-cloud-netflix-ribbon
的 META-INF
目录下的 spring.factories
文件内容如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=/ org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration 复制代码
可以说 spring-cloud-netflix-ribbon
是 spring-cloud-commons
的 loadbalancer
接口的实现。
RibbonAutoConfiguration
会注入一个 LoadBalancerClient
, LoadBalancerClient
是 spring-cloud-commons
定义的负载均衡接口, RibbonLoadBalancerClient
是 Ribbon
实现 spring-cloud-commons
负载均衡接口 LoadBalancerClient
的实现类,是提供给代码中使用 @LoadBalanced
注解使用的。
@Bean @ConditionalOnMissingBean(LoadBalancerClient.class) public LoadBalancerClient loadBalancerClient() { return new RibbonLoadBalancerClient(springClientFactory()); } 复制代码
在创建 RibbonLoadBalancerClient
时调用 springClientFactory
方法创建 SpringClientFactory
:
@Bean public SpringClientFactory springClientFactory() { SpringClientFactory factory = new SpringClientFactory(); factory.setConfigurations(this.configurations); return factory; } 复制代码
SpringClientFactory
是 NamedContextFactory
的子类,其构建方法调用父类构造方法时传入了一个配置类 RibbonClientConfiguration.class
,这是 RibbonClientConfiguration
配置类生效的原因。
public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification> { static final String NAMESPACE = "ribbon"; public SpringClientFactory() { super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name"); } } 复制代码
SpringClientFactory
会为每个服务提供者创建一个 ApplicationContext
,实现 bean
的隔离,解决 bean
名称冲突问题,以及实现使用不同配置。
在创建 ApplicationContext
时会注册 defaultConfigType
到 bean
工厂,该 defaultConfigType
就是构造方法传递进来的 RibbonClientConfiguration.class
。
protected AnnotationConfigApplicationContext createContext(String name) { // 创建ApplicationContext AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); ...... // 注册多个Configuration类 context.register(PropertyPlaceholderAutoConfiguration.class, this.defaultConfigType); ...... // 调用ApplicationContext的refresh方法 context.refresh(); return context; } 复制代码
那 createContext
方法是什么时候被调用的呢?
以 sck-demo
中服务消费者调用服务提供者接口为例:
@Service public class DemoInvokeServiceImpl implements DemoInvokeService { @Resource private DemoService demoService; @Override public ListGenericResponse<DemoDto> invokeDemo() { return demoService.getServices(); } } 复制代码
DemoService
是被 @FeignClient
注解声明的接口,当我们调用 DemoService
的某个方法时,经过《 Spring Cloud OpenFeign
源码分析》我们知道,最终会调用到 LoadBalancerFeignClient
的 execute
方法时。
public class LoadBalancerFeignClient implements Client { //............ private SpringClientFactory clientFactory; // 后面再分析execute方法 @Override public Response execute(Request request, Request.Options options) throws IOException{ // ..... IClientConfig requestConfig = getClientConfig(options, clientName); // ..... } } 复制代码
execute
方法中需要调用 getClientConfig
方法从 SpringClientFactory
获取 IClientConfig
实例,即获取客户端配置。 getClientConfig
方法就是要从服务提供者的 ApplicationContext
工厂中获取实现了 IClientConfig
接口的 bean
。
当首次调用某个服务提供者的接口时,由于并未初始化 AnnotationConfigApplicationContext
,因此会先调用 createContext
方法创建 ApplicationContext
,该方法将 RibbonClientConfiguration
类注册到 ApplicationContext
,最后调用 context.refresh();
时就会调用到 RibbonClientConfiguration
的被 @Bean
注释的方法。
@Configuration(proxyBeanMethods = false) @EnableConfigurationProperties @Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class }) public class RibbonClientConfiguration { // ........ @RibbonClientName private String name = "client"; @Autowired private PropertiesFactory propertiesFactory; // IClientConfig实例,配置client的连接超时、读超时等 @Bean @ConditionalOnMissingBean public IClientConfig ribbonClientConfig() { DefaultClientConfigImpl config = new DefaultClientConfigImpl(); config.loadProperties(this.name); config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT); config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT); config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD); return config; } // 配置Ribbon使用的负载均衡算法,默认使用ZoneAvoidanceRule @Bean @ConditionalOnMissingBean public IRule ribbonRule(IClientConfig config) { if (this.propertiesFactory.isSet(IRule.class, name)) { return this.propertiesFactory.get(IRule.class, config, name); } ZoneAvoidanceRule rule = new ZoneAvoidanceRule(); rule.initWithNiwsConfig(config); return rule; } // 配置服务更新器,定时从注册中心拉去服务,由ILoadBalancer启动 @Bean @ConditionalOnMissingBean public ServerListUpdater ribbonServerListUpdater(IClientConfig config) { return new PollingServerListUpdater(config); } // 配置ribbon的负载均衡器,默认使用ZoneAwareLoadBalancer @Bean @ConditionalOnMissingBean public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) { if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) { return this.propertiesFactory.get(ILoadBalancer.class, config, name); } return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList, serverListFilter, serverListUpdater); } // ......其它的暂时不去理解 } 复制代码
ILoadBalancer
是 Ribbon
定义的负载均衡接口。 ZoneAwareLoadBalancer
是 DynamicServerListLoadBalancer
的子类, DynamicServerListLoadBalancer
封装了服务更新逻辑。
DynamicServerListLoadBalancer
在构造方法中调用 enableAndInitLearnNewServersFeature
方法开启服务更新器 ServerListUpdater
, ServerListUpdater
定时从注册中心拉取可用的服务更新服务列表缓存。
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer { protected volatile ServerListUpdater serverListUpdater; protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() { @Override public void doUpdate() { updateListOfServers(); } }; public void enableAndInitLearnNewServersFeature(){ serverListUpdater.start(updateAction); } // 调用ServerList获取服务 @VisibleForTesting public void updateListOfServers() { List<T> servers = new ArrayList<T>(); if (serverListImpl != null) { servers = serverListImpl.getUpdatedListOfServers(); // 如果需要过滤 if (filter != null) { servers = filter.getFilteredListOfServers(servers); } } updateAllServerList(servers); } } 复制代码
Ribbon
是如何实现负载均衡的 ServerList
我们后面再讲解,先搞清楚 openfegin
与 ribbon
整合后的整个调用链路。我们继续从 LoadBalancerFeignClient
的 execute
方法继续分析。( LoadBalancerFeignClient
是由 FeignRibbonClientAutoConfiguration
自动配置类配置的,如果忘记的话可以再看下上一篇。)
public class LoadBalancerFeignClient implements Client { //............ private SpringClientFactory clientFactory; @Override public Response execute(Request request, Request.Options options) throws IOException { try { URI asUri = URI.create(request.url()); // 拿到的是服务的名称,如:sck-demo-prodiver String clientName = asUri.getHost(); URI uriWithoutHost = cleanUrl(request.url(), clientName); // delegate是:class Default implements Client {} FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest( this.delegate, request, uriWithoutHost); // 首先获取客户端配置 IClientConfig requestConfig = getClientConfig(options, clientName); return // 负载均衡选择一个服务提供者 lbClient(clientName) // 调用接口获取响应结果 .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse(); } catch (ClientException e) { IOException io = findIOException(e); if (io != null) { throw io; } throw new RuntimeException(e); } } } 复制代码
lbClient
创建一个 FeignLoadBalancer
对象,调用 FeignLoadBalancer
的 executeWithLoadBalancer
方法实现负载均衡调用接口,最终会调用到 FeignLoadBalancer
的 execute
方法。 Ribbon
使用 RxJava
实现异步调用转同步阻塞获取结果。
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException { // command也封装了负载均衡的实现逻辑 LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig); try { return command.submit( new ServerOperation<T>() { @Override public Observable<T> call(Server server) { URI finalUri = reconstructURIWithServer(server, request.getUri()); S requestForServer = (S) request.replaceUri(finalUri); try { // 调用FeignLoadBalancer的execute方法 return Observable.just( AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig) ); } catch (Exception e) { return Observable.error(e); } } }) .toBlocking() .single(); } catch (Exception e) { ..... } } 复制代码
LoadBalancerCommand
的 submit
方法代码比较多,逻辑也比较复杂,因此就不展开说明了。
public Observable<T> submit(final ServerOperation<T> operation) { // Use the load balancer Observable<T> o = (server == null ? selectServer() : Observable.just(server)) } 复制代码
selectServer
方法返回一个 Observable<Server>
, Observable
是 RxJava
的 API
,我们跳过这部分。
private Observable<Server> selectServer() { return Observable.create(new OnSubscribe<Server>() { @Override public void call(Subscriber<? super Server> next) { try { // 调用LoadBalancerContext的getServerFromLoadBalancer方法 Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey); next.onNext(server); next.onCompleted(); } catch (Exception e) { next.onError(e); } } }); } 复制代码
selectServer
方法中调用 LoadBalancerContext
的 getServerFromLoadBalancer
方法获取一个服务提供者,此 LoadBalancerContext
实际是 FeignLoadBalancer
(在 buildLoadBalancerCommand
方法中可以找到答案)。
getServerFromLoadBalancer
方法部分代码如下:
public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException { ILoadBalancer lb = getLoadBalancer(); if (host == null) { if (lb != null){ Server svc = lb.chooseServer(loadBalancerKey); return svc; } // ..... } } 复制代码
由于 Ribbon
默认使用的 ILoadBalancer
是 ZoneAwareLoadBalancer
,因此 getLoadBalancer
方法返回的是 ZoneAwareLoadBalancer
。获取到负载均衡器后调用负载均衡器的 chooseServer
选择一个服务提供者。
ZoneAwareLoadBalancer
的 chooseServer
方法:
@Override public Server chooseServer(Object key) { if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) { logger.debug("Zone aware logic disabled or there is only one zone"); return super.chooseServer(key); } } 复制代码
if
条件成立时,调用的是父类 BaseLoadBalancer
的 chooseServer
方法:
public class BaseLoadBalancer extends AbstractLoadBalancer implements PrimeConnections.PrimeConnectionListener, IClientConfigAware { protected IRule rule = DEFAULT_RULE; public Server chooseServer(Object key) { if (counter == null) { counter = createCounter(); } counter.increment(); if (rule == null) { return null; } else { try { // 调用IRule的choose方法,rule是在创建ZoneAwareLoadBalancer时通过构造方法注入的 return rule.choose(key); } catch (Exception e) { logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e); return null; } } } } 复制代码
IRule
是服务选择器、是负载均衡算法的实现。在 RibbonAutoConfiguration
配置类中注入。
// 配置Ribbon使用的负载均衡算法,默认使用ZoneAvoidanceRule @Bean @ConditionalOnMissingBean public IRule ribbonRule(IClientConfig config) { if (this.propertiesFactory.isSet(IRule.class, name)) { return this.propertiesFactory.get(IRule.class, config, name); } ZoneAvoidanceRule rule = new ZoneAvoidanceRule(); rule.initWithNiwsConfig(config); return rule; } 复制代码
在创建 ZoneAwareLoadBalancer
时通过构造方法注入。
@Bean @ConditionalOnMissingBean public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) { // ...... return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList, serverListFilter, serverListUpdater); } 复制代码
至于 ZoneAvoidanceRule
是怎么从多个服务提供者中选择一个调用的,这就是负载均衡算法的实现,本篇不做分析。
Ribbon
是如何从注册中心获取服务提供者的 前面我们分析到, ZoneAwareLoadBalancer
是 DynamicServerListLoadBalancer
的子类, DynamicServerListLoadBalancer
封装了服务更新逻辑,定时调用 ServerList
的 getUpdatedListOfServers
方法从注册中心拉取服务。
ServerList
是 ribbon-loadbalancer
包下的类,并不是 spring-cloud
的接口,所以与 spring-cloud
的服务发现接口是没有关系的。
public interface ServerList<T extends Server> { public List<T> getInitialListOfServers(); /** * Return updated list of servers. This is called say every 30 secs * (configurable) by the Loadbalancer's Ping cycle * */ public List<T> getUpdatedListOfServers(); } 复制代码
在分析 RibbonClientConfiguration
时,我们发现有一个方法会注册一个 ServerList<Server>
,但这个方法必不会执行到。
public class RibbonClientConfiguration { @Bean @ConditionalOnMissingBean public ServerList<Server> ribbonServerList(IClientConfig config) { if (this.propertiesFactory.isSet(ServerList.class, name)) { return this.propertiesFactory.get(ServerList.class, config, name); } ConfigurationBasedServerList serverList = new ConfigurationBasedServerList(); serverList.initWithNiwsConfig(config); return serverList; } } 复制代码
因为我们在 sck-demo
项目中使用的是 spring-cloud-starter-kubernetes-ribbon
,所以我们现在来看下 spring-cloud-kubernetes-ribbon
负责做什么。首先从 spring-cloud-starter-kubernetes-ribbon
的 spring.factories
文件中找到自动配置类。
org.springframework.boot.autoconfigure.EnableAutoConfiguration=/ org.springframework.cloud.kubernetes.ribbon.RibbonKubernetesAutoConfiguration 复制代码
自动配置类 RibbonKubernetesAutoConfiguration
的源码如下:
@Configuration(proxyBeanMethods = false) @EnableConfigurationProperties @ConditionalOnBean(SpringClientFactory.class) @ConditionalOnProperty(value = "spring.cloud.kubernetes.ribbon.enabled",matchIfMissing = true) @AutoConfigureAfter(RibbonAutoConfiguration.class) @RibbonClients(defaultConfiguration = KubernetesRibbonClientConfiguration.class) public class RibbonKubernetesAutoConfiguration { } 复制代码
SpringClientFactory
我们分析过了, RibbonAutoConfiguration
我们也分析过了,只剩下 KubernetesRibbonClientConfiguration
这个配置类。
KubernetesRibbonClientConfiguration
是使用 @RibbonClients
注解导入的配置类,也就是通过 ImportBeanDefinitionRegistrar
注册的。
@Configuration(proxyBeanMethods = false) @EnableConfigurationProperties(KubernetesRibbonProperties.class) public class KubernetesRibbonClientConfiguration { @Bean @ConditionalOnMissingBean public ServerList<?> ribbonServerList(KubernetesClient client, IClientConfig config, KubernetesRibbonProperties properties) { KubernetesServerList serverList; if (properties.getMode() == KubernetesRibbonMode.SERVICE) { serverList = new KubernetesServicesServerList(client, properties); } else { serverList = new KubernetesEndpointsServerList(client, properties); } serverList.initWithNiwsConfig(config); return serverList; } } 复制代码
看到这我们就明白了, spring-cloud-kubernetes-ribbon
负责实现 ribbon
的服务列表接口 ServerList<Server>
。当 spring.cloud.kubernetes.ribbon.mode
配置为 SERVICE
时,使用 KubernetesServicesServerList
,否则使用 KubernetesEndpointsServerList
。默认 mode
是 POD
。
@ConfigurationProperties(prefix = "spring.cloud.kubernetes.ribbon") public class KubernetesRibbonProperties { /** * Ribbon enabled,default true. */ private Boolean enabled = true; /** * {@link KubernetesRibbonMode} setting ribbon server list with ip of pod or service * name. default value is POD. */ private KubernetesRibbonMode mode = KubernetesRibbonMode.POD; /** * cluster domain. */ private String clusterDomain = "cluster.local"; } 复制代码
KubernetesRibbonMode
是个枚举类,支持 pod
和 service
。
public enum KubernetesRibbonMode { /** * using pod ip and port. */ POD, /** * using kubernetes service name and port. */ SERVICE } 复制代码
什么意思呢? 当 mode
为 service
时,就是获取服务提供者在 kubernetes
中的 service
的名称和端口,使用这种模式会导致 Ribbon
的负载均衡失效,转而使用 kubernetes
的负载均衡。而当 mode
为 pod
时,就是获取服务提供者的 pod
的 ip
和端口,该 ip
是 kubernetes
集群的内部 ip
,只要服务消费者是部署在同一个 kubernetes
集群内就能通过 pod
的 ip
和服务提供者暴露的端口访问 pod
上的服务提供者。
如果我们不想使用 Ribbon
实现负载均衡,那么我们可以在配置文件中添加如下配置项:
spring: cloud: kubernetes: ribbon: mode: SERVICE 复制代码
你学会了吗?下一篇我们了解 Spring Cloud Kubernetes
的服务注册。