在《 OpenFeign
与 Ribbon
源码分析总结》这篇文章中,我们只是简单地了解 Ribbon
的重试机制的实现原理,本篇我们再对 Ribbon
的重试机制地实现做详细分析,从源码分析找出我们想要地答案,即如何配置 Ribbon
实现调用每个服务的接口使用不一样的重试策略,如配置失败重试多少次,以及自定义重试策略 RetryHandler
。
Ribbon Ribbon RetryHandler
LoadBalancerFeignClient
: OpenFeign
整合 Ribbon
时使用的 Client
( OpenFeign
使用 Client
发送请求); FeignLoadBalancer
: OpenFeign
整合 Ribbon
的桥接器,由 LoadBalancerFeignClient
创建; LoadBalancerCommand
: Ribbon
将请求转为 RxJava API
调用的实现,由 FeignLoadBalancer
调用; CachingSpringLoadBalancerFactory
: OpenFeign
整合 Ribbon
用于创建 FeignLoadBalancer
桥接器的带缓存功能的 FeignLoadBalancer
工厂。 RibbonLoadBalancerClient
: Ribbon
提供的实现 Spring Cloud
负载均衡接口( LoadBalancerClient
)的类; RibbonAutoConfiguration
: Ribbon
的自动配置类,注册 RibbonLoadBalancerClient
到 Spring
容器。 SpringClientFactory
: Ribbon
自己管理一群 ApplicationContext
, Ribbon
会为每个 Client
创建一个 ApplicationContext
; RibbonClientConfiguration
: Ribbon
为每个 Client
提供 ApplicationContext
实现环境隔离,这是 Ribbon
为每个 Client
创建 ApplicationContext
时都使用的配置类,用于注册 Ribbon
的各种功能组件,如负载均衡器 ILoadBalancer
; RequestSpecificRetryHandler
: RetryHandler
接口的实现类, OpenFeign
整合 Ribbon
使用的默认失败重试策略处理器; Ribbon
重试机制地实现源码分析 Ribbon
的重试机制使用了 RxJava
的 API
,而重试次数以及是否重试的决策由 RetryHandler
实现。 Ribbon
提供两个 RetryHandler
的实现类,如下图所示。
现在我们要找出 Ribbon
使用的是哪个 RetryHandler
,我们只分析 OpenFeign
与 Ribbon
整合的使用, Spring Cloud
的 @LoadBalanced
注解方式使用我们不做分析。
spring-cloud-netflix-ribbon
的 spring.factories
文件导入的自动配置类是 RibbonAutoConfiguration
,该配置类向 Spring
容器注入了一个 RibbonLoadBalancerClient
, RibbonLoadBalancerClient
正是 Ribbon
为 Spring Cloud
的负载均衡接口提供的实现类。
在创建 RibbonLoadBalancerClient
时给构造方法传入了一个 SpringClientFactory
,源码如下。
@Configuration public class RibbonAutoConfiguration{ // 创建RibbonLoadBalancerClient @Bean @ConditionalOnMissingBean(LoadBalancerClient.class) public LoadBalancerClient loadBalancerClient() { return new RibbonLoadBalancerClient(springClientFactory()); } } 复制代码
SpringClientFactory
是 Ribbon
使用的 ApplicationContext
, Ribbon
会为每个 Client
都创建一个 AnnotationConfigApplicationContext
,用作环境隔离。
SpringClientFactory
在调用父类构造方法时传入了一个配置类: RibbonClientConfiguration
,源码如下。
public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification>{ public SpringClientFactory() { super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name"); } } 复制代码
RibbonClientConfiguration
配置类在每个 Client
对应的 AnnotationConfigApplicationContext
初始化时生效,在第一次调用服务的接口时 AnnotationConfigApplicationContext
才被创建。创建 ApplicationContext
并且调用 register
方法注册 RibbonClientConfiguration
配置类以及其它一些配置类,最后调用其 refresh
方法初始化该 ApplicationContext
。
RibbonClientConfiguration
负责为每个 Client
对应的 ApplicationContext
注入服务列表 ServerList<Server>
、服务列表更新器 ServerListUpdater
、负载均衡器 ILoadBalancer
、负载均衡算法 IRule
、客户端配置 IClientConfig
、重试决策处理器 RetryHandler
等。
ServerList<Server>
:从注册中心获取可用服务提供者节点; ServerListUpdater
:定时更新本地缓存的服务列表,调用 ServerList
从注册中心获取; IRule
:实现各种负载均衡算法,如随机、轮询等; ILoadBalancer
:调用负载均衡算法 IRule
选择一个服务提供者节点调用; RetryHandler
:决定本次失败是否重试; 由于 RibbonClientConfiguration
注册的 Bean
是注册在 Client
隔离的 ApplicationContext
中的, 所以调用每个服务提供者的接口将可以使用不同的客户端配置( IClientConfig
)、重试决策处理器( RetryHandler
)等。这也是我们能够为 Ribbon
配置调用每个服务的接口使用不一样的重试策略的前提条件,不过这也不是充分必要条件。
RibbonClientConfiguration
配置类会注册一个重试决策处理器 RetryHandler
,但这个 RetryHandler
并未被使用,也可能是别的地方使用。
@Configuration public class RibbonClientConfiguration{ // 未使用 @Bean @ConditionalOnMissingBean public RetryHandler retryHandler(IClientConfig config) { return new DefaultLoadBalancerRetryHandler(config); } } 复制代码
OpenFeign
整合 Ribbon
时,真正使用的 RetryHandler
是 RequestSpecificRetryHandler
。前面我们分析 OpenFeign
整合 Ribbon
源码时提到一个启到桥接作用的类: FeignLoadBalancer
。
当 OpenFeign
整合 Ribbon
使用时, OpenFeigin
使用的 Client
是 LoadBalancerFeignClient
,由 LoadBalancerFeignClient
创建 FeignLoadBalancer
,并调用 FeignLoadBalancer
的 executeWithLoadBalancer
方法实现负载均衡调用。
executeWithLoadBalancer
方法实际是 FeignLoadBalancer
的父类 AbstractLoadBalancerAwareClient
提供的方法,其源码如下(有删减)。
public abstract class AbstractLoadBalancerAwareClient{ public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException { LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig); try { return command.submit({....}) .toBlocking() .single(); } catch (Exception e) { } } } 复制代码
executeWithLoadBalancer
方法中会创建一个 LoadBalancerCommand
,然后调用 LoadBalancerCommand
的 submit
方法提交请求, submit
方法源码如下(有删减):
public Observable<T> submit(final ServerOperation<T> operation) { // ....... // 获取重试次数 final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer(); final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer(); // Use the load balancer Observable<T> o = (server == null ? selectServer() : Observable.just(server)) .concatMap(new Func1<Server, Observable<T>>() { @Override public Observable<T> call(Server server) { //....... // 相同节点的重试 if (maxRetrysSame > 0) o = o.retry(retryPolicy(maxRetrysSame, true)); return o; } }); // 不同节点的重试 if (maxRetrysNext > 0 && server == null) o = o.retry(retryPolicy(maxRetrysNext, false)); return o.onErrorResumeNext(...); } 复制代码
submit
方法中调用 retryHandler
的 getMaxRetriesOnSameServer
方法和 getMaxRetriesOnNextServer
方法分别获取配置 maxRetrysSame
、 maxRetrysNext
。 maxRetrysSame
表示调用相同节点的重试次数,默认为 0
; maxRetrysNext
表示调用不同节点的重试次数,默认为 1
。
retryPolicy
方法返回的是一个包装 RetryHandler
重试决策者的 RxJava API
的对象,最终由该 RetryHandler
决定是否需要重试,如抛出的异常是否允许重试。而是否达到最大重试次数则是在 retryPolicy
返回的 Func2
中完成,这是 RxJava
的 API
, retryPolicy
方法的源码如下。
private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) { return new Func2<Integer, Throwable, Boolean>() { @Override public Boolean call(Integer tryCount, Throwable e) { if (e instanceof AbortExecutionException) { return false; } // 大于最大重试次数 if (tryCount > maxRetrys) { return false; } if (e.getCause() != null && e instanceof RuntimeException) { e = e.getCause(); } // 调用RetryHandler判断是否重试 return retryHandler.isRetriableException(e, same); } }; } 复制代码
那么这个 retryHandler
是怎么来的呢?
FeignLoadBalancer
的 executeWithLoadBalancer
方法中调用 buildLoadBalancerCommand
方法构造 LoadBalancerCommand
对象时创建的, buildLoadBalancerCommand
方法源码如下。
protected LoadBalancerCommand<T> buildLoadBalancerCommand(final S request, final IClientConfig config) { // 获取RetryHandler RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, config); // 使用Builder构造者模式构造LoadBalancerCommand LoadBalancerCommand.Builder<T> builder = LoadBalancerCommand.<T>builder() .withLoadBalancerContext(this) // 传入RetryHandler .withRetryHandler(handler) .withLoadBalancerURI(request.getUri()); return builder.build(); } 复制代码
从源码中可以看出, Ribbon
使用的 RetryHandler
是 RequestSpecificRetryHandler
。这里还用到了 Builder
构造者模式。
FeignLoadBalancer
的 getRequestSpecificRetryHandler
方法源码如下:
@Override public RequestSpecificRetryHandler getRequestSpecificRetryHandler( RibbonRequest request, IClientConfig requestConfig) { //..... if (!request.toRequest().httpMethod().name().equals("GET")) { // 调用this.getRetryHandler()方法获取一次RetryHandler return new RequestSpecificRetryHandler(true, false, this.getRetryHandler(), requestConfig); } else { // 调用this.getRetryHandler()方法获取一次RetryHandler return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(), requestConfig); } } 复制代码
RequestSpecificRetryHandler
的构造方法可以传入一个 RetryHandler
,这有点像类加载器 ClassLoader
实现的双亲委派模型。比如当 RequestSpecificRetryHandler
配置的重试次数为 0
时,则会获取父 RetryHandler
配置的重试次数。
this.getRetryHandler
方法获取到的又是哪个 RetryHandler
?(源码在 FeignLoadBalancer
的祖父类 LoadBalancerContext
中)
[FeignLoadBalancer的父类的父类LoadBalancerContext] public class LoadBalancerContext{ protected RetryHandler defaultRetryHandler = new DefaultLoadBalancerRetryHandler(); public final RetryHandler getRetryHandler() { return defaultRetryHandler; } } [FeignLoadBalancer] public class FeignLoadBalancer extends AbstractLoadBalancerAwareClient{ public FeignLoadBalancer(ILoadBalancer lb, IClientConfig clientConfig, ServerIntrospector serverIntrospector) { super(lb, clientConfig); // 使用DefaultLoadBalancerRetryHandler this.setRetryHandler(RetryHandler.DEFAULT); this.clientConfig = clientConfig; // IClientConfig,RibbonClientConfiguration配置类注入的 this.ribbon = RibbonProperties.from(clientConfig); RibbonProperties ribbon = this.ribbon; // 从IClientConfig中读取超时参数配置 this.connectTimeout = ribbon.getConnectTimeout(); this.readTimeout = ribbon.getReadTimeout(); this.serverIntrospector = serverIntrospector; } } 复制代码
从 FeignLoadBalancer
的构造方法中可以看出, RequestSpecificRetryHandler
的父 RetryHandler
是 DefaultLoadBalancerRetryHandler
。
RetryHandler
接口的定义如下图所示。
RetryHandler
接口方法说明:
isRetriableException方法
:该异常是否可重试; isCircuitTrippingException
方法:是否是 Circuit
熔断类型异常; getMaxRetriesOnSameServer
方法:调用同一节点的最大重试次数; getMaxRetriesOnNextServer
方法:调用不同节点的最大重试次数; Ribbon
的重试策略配置 FeignLoadBalancer
在创建 RequestSpecificRetryHandler
时传入了 IClientConfig
,这个 IClientConfig
是从哪里创建的我们稍会再分析。 RequestSpecificRetryHandler
在构造方法中从这个 IClientConfig
中获取调用同服务节点的最大重试次数和调用不同服务节点的最大重试次数,源码如下。
public class RequestSpecificRetryHandler implements RetryHandler { public RequestSpecificRetryHandler(boolean okToRetryOnConnectErrors, boolean okToRetryOnAllErrors, RetryHandler baseRetryHandler, @Nullable IClientConfig requestConfig) { // ..... // 从 IClientConfig中获取两种最大重试次数的配置 if (requestConfig != null) { if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetries)) { // 获取同节点调用最大重试次数 this.retrySameServer = (Integer)requestConfig.get(CommonClientConfigKey.MaxAutoRetries); } if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetriesNextServer)) { // 获取不同节点调用最大重试次数 this.retryNextServer = (Integer)requestConfig.get(CommonClientConfigKey.MaxAutoRetriesNextServer); } } } } 复制代码
requestConfig
是在 LoadBalancerFeignClient
创建 FeignLoadBalancer
时,从 SpringClientFactory
中获取的,也正是 RibbonClientConfiguration
自动配置类注入的。
public FeignLoadBalancer create(String clientName) { FeignLoadBalancer client = this.cache.get(clientName); if (client != null) { return client; } // this.factory就是SpringClientFactory IClientConfig config = this.factory.getClientConfig(clientName); ILoadBalancer lb = this.factory.getLoadBalancer(clientName); ServerIntrospector serverIntrospector = this.factory.getInstance(clientName,ServerIntrospector.class); // 创建FeignLoadBalancer client = this.loadBalancedRetryFactory != null ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,this.loadBalancedRetryFactory) : new FeignLoadBalancer(lb, config, serverIntrospector); // 缓存FeignLoadBalancer this.cache.put(clientName, client); return client; } 复制代码
IClientConfig
是在 RibbonClientConfiguration
中配置的,其源码如下:
public class RibbonClientConfiguration { // 默认连接超时 public static final int DEFAULT_CONNECT_TIMEOUT = 1000; // 默认读超时 public static final int DEFAULT_READ_TIMEOUT = 1000; // 自动注入,${ribbon.client.name} @RibbonClientName private String name; // 注册IClientConfig实例,使用DefaultClientConfigImpl @Bean @ConditionalOnMissingBean public IClientConfig ribbonClientConfig() { DefaultClientConfigImpl config = new DefaultClientConfigImpl(); config.loadProperties(this.name); // 配置连接超时 config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT); // 配置读超时 config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT); config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD); return config; } } 复制代码
那么我们要怎么修改配置呢?
如何在 application
配置文件中配置 Ribbon
的重试次数等参数。
我们可以在 RibbonClientConfiguration
这个配置类的 ribbonClientConfig
方法下断点调试,如下图所示。
从图中可以看出,配置参数 key
的格式为:
<服务提供者的名称(serverId)>:<ribbon>:<参数名>=<value> 复制代码
假设我们针对服务提供者 sck-demo-provider
配置最大同节点重试次数为 10
,配置最大不同节点重试次数为 12
,配置连接超时为 15
秒,那么我们需要在 application-[环境].yaml
配置文件中添加如下配置。
sck-demo-provider: ribbon: MaxAutoRetries: 10 MaxAutoRetriesNextServer: 12 ConnectTimeout: 15000 复制代码
其中 MaxAutoRetries
和 MaxAutoRetriesNextServer
都能生效,但是 ConnectTimeout
配置是不生效的,原因是在 RibbonClientConfiguration
中创建 DefaultClientConfigImpl
时,先调用 loadProperties
方法(传入的 name
参数就是服务名称)从配置文件获取配置,再调用 set
方法覆盖了三个配置:连接超时配置、读超时配置、是否开启 gzip
压缩配置。所以这种方式配置连接超是不生效的。
代码配置就是我们手动注册 IClientConfig
,而不使用 RibbonClientConfiguration
自动注册的。 RibbonClientConfiguration
自动注册 IClientConfig
的方法上添加了 @ConditionalOnMissingBean
条件注解,正因为如此,我们才可以自己注册 IClientConfig
。
但要注意一点, RibbonClientConfiguration
是在 Ribbon
为每个 Client
创建的 ApplicationContext
中生效的,所以我们需要创建一个配置类( Configuration
),并将其注册到 SpringClientFactory
。这样,在 SpringClientFactory
为 Client
创建 ApplicationContext
时,就会将配置类注册到 ApplicationContext
,向 SpringClientFactory
注册的配置类也就成了创建的 ApplicationContext
的配置类。
@Configuration public class RibbonConfiguration implements InitializingBean { @Resource private SpringClientFactory springClientFactory; @Override public void afterPropertiesSet() throws Exception { List<RibbonClientSpecification> cfgs = new ArrayList<>(); RibbonClientSpecification configuration = new RibbonClientSpecification(); // 针对哪个服务提供者配置 configuration.setName(ProviderConstant.SERVICE_NAME); // 注册的配置类 configuration.setConfiguration(new Class[]{RibbonClientCfg.class}); cfgs.add(configuration); springClientFactory.setConfigurations(cfgs); } // 指定在RibbonClientConfiguration之后生效 @AutoConfigureBefore(RibbonClientConfiguration.class) public static class RibbonClientCfg { @Bean public IClientConfig ribbonClientConfig() { DefaultClientConfigImpl config = new DefaultClientConfigImpl(); config.setClientName("随便填,不影响,用不到"); config.set(CommonClientConfigKey.MaxAutoRetries, 1); config.setProperty(CommonClientConfigKey.MaxAutoRetriesNextServer, 3); config.set(CommonClientConfigKey.ConnectTimeout, 15000); config.set(CommonClientConfigKey.ReadTimeout, 15000); return config; } } } 复制代码
因为 Ribbon
是在第一次调用接口时才会创建 ApplicationContext
,所以我们在应用程序的 Spring
容器初始化阶段获取 SpringClientFactory
并为其添加自定义配置类能够生效。
RibbonClientCfg
声明在 RibbonClientConfiguration
之前生效,这样 RibbonClientConfiguration
就不会向容器中注册 IClientConfig
了。
RetryHandler
? OpenFeign
整合 Ribbon
使用时,默认使用的是 FeignLoadBalancer
的 getRequestSpecificRetryHandler
方法创建的 RequestSpecificRetryHandler
,笔者也看了一圈源码,实在找不到怎么替换 RetryHandler
,可能 OpenFeign
就是不想给我们替换吧。这种情况我们只能另寻辟径了。
既然使用的是 FeignLoadBalancer
的 getRequestSpecificRetryHandler
方法返回的 RetryHandler
,那么我们是不是可以继承 FeignLoadBalancer
并重写 getRequestSpecificRetryHandler
方法来替换 RetryHandler
呢?答案是可以的。
自定义的 FeignLoadBalancer
代码如下:
/** * 自定义FeignLoadBalancer,替换默认的RequestSpecificRetryHandler */ public static class MyFeignLoadBalancer extends FeignLoadBalancer { public MyFeignLoadBalancer(ILoadBalancer lb, IClientConfig clientConfig, ServerIntrospector serverIntrospector) { super(lb, clientConfig, serverIntrospector); } @Override public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonRequest request, IClientConfig requestConfig) { // 返回自定义的RequestSpecificRetryHandler // 参数一:是否连接异常重试时重试 // 参数二:是否所有异常都重试 return new RequestSpecificRetryHandler(false, false, getRetryHandler(), requestConfig) { /** * @param e 抛出的异常 * @param sameServer 是否同节点服务的重试 * @return */ @Override public boolean isRetriableException(Throwable e, boolean sameServer) { if (e instanceof ClientException) { // 连接异常重试 if (((ClientException) e).getErrorType() == ClientException.ErrorType.CONNECT_EXCEPTION) { return true; } // 连接超时重试 if (((ClientException) e).getErrorType() == ClientException.ErrorType.SOCKET_TIMEOUT_EXCEPTION) { return true; } // 读超时重试,读超时重试只允许不同服务节点的重试 // 所以同节点的重试不支持,读超时了就不要重新请求同一个节点了。 if (((ClientException) e).getErrorType() == ClientException.ErrorType.READ_TIMEOUT_EXCEPTION) { return !sameServer; } // 服务端异常 // 服务端异常切换新节点重试 if (((ClientException) e).getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) { return !sameServer; } } // 连接异常时重试 return isConnectionException(e); } }; } } 复制代码
由于 FeignLoadBalancer
是在 OpenFeign
的 LoadBalancerFeignClient
中调用一个 CachingSpringLoadBalancerFactory
创建的,所以我们还需要替换 OpenFeign
的 FeignRibbonClientAutoConfiguration
配置类注册的 CachingSpringLoadBalancerFactory
,并且重写 CachingSpringLoadBalancerFactory
的 create
方法,代码如下。
@Configuration public class RibbonConfiguration { /** * 使用自定义FeignLoadBalancer缓存工厂 * * @return */ @Bean public CachingSpringLoadBalancerFactory cachingSpringLoadBalancerFactory() { return new CachingSpringLoadBalancerFactory(springClientFactory) { private volatile Map<String, FeignLoadBalancer> cache = new ConcurrentReferenceHashMap<>(); @Override public FeignLoadBalancer create(String clientName) { FeignLoadBalancer client = this.cache.get(clientName); if (client != null) { return client; } IClientConfig config = this.factory.getClientConfig(clientName); ILoadBalancer lb = this.factory.getLoadBalancer(clientName); ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class); // 使用自定义的FeignLoadBalancer client = new MyFeignLoadBalancer(lb, config, serverIntrospector); this.cache.put(clientName, client); return client; } }; } } 复制代码