上篇文章我们详细的介绍了RestTemplate发送请求的问题,熟悉Spring的小伙伴可能会发现:RestTemplate不就是Spring提供的一个发送请求的工具吗?它什么时候具有了实现客户端负载均衡的功能的?本文我们就来聊一聊RestTemplate的逆袭之路,看它如何从一个普通的请求发送工具变成了具有客户端负载均衡功能的请求发送工具。
本文是Spring Cloud系列的第七篇文章,了解前六篇文章内容有助于更好的理解本文:
我们在 Spring Cloud中服务的发现与消费 一文中首先使用了RestTemplate并且开启了客户端负载均衡功能,当时我们说开启负载均衡很简单,只需要在RestTemplate的bean上再添加一个@LoadBalanced注解即可,所以本文我们就从这个注解开始我们的分析吧。
首先我们来看看@LoadBalanced注解的源码:
/** * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient * @author Spencer Gibb */ @Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD }) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Qualifier public @interface LoadBalanced { }
我们看它的注释说:这个注解是用来给RestTemplate做标记,以使用LoadBalancerClient来配置它。那我们来看看LoadBalancerClient是什么:
public interface ServiceInstanceChooser { ServiceInstance choose(String serviceId); } public interface LoadBalancerClient extends ServiceInstanceChooser { <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException; <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException; URI reconstructURI(ServiceInstance instance, URI original); }
LoadBalancerClient是一个接口,该接口中有四个方法,我们来大概看一下这几个方法的作用:
http://HELLO-SERVICE/hello
)而不是具体的服务地址,在reconstructURI方法中,第一个参数ServiceInstance实例是一个带有host和port的具体服务实例,第二个参数URI则是使用逻辑服务名定义为host和port的URI,而返回的URI则是通过ServiceInstance的服务实例详情拼接出的具体的host:port形式的请求地址。一言以蔽之,就是把类似于 http://HELLO-SERVICE/hello
这种地址转为类似于 http://195.124.207.128/hello
地址(IP地址也可能是域名)。 OK,找到了LoadBalancerClient还不够,那么具体的配置是在哪里执行的呢?我们在LoadBalancerClient的包下面发现了一个类叫做LoadBalancerAutoConfiguration,看名字有点像是客户端负载均衡服务器的自动化配置类,我们来看看这个类的源码:
@Configuration @ConditionalOnClass(RestTemplate.class) @ConditionalOnBean(LoadBalancerClient.class) @EnableConfigurationProperties(LoadBalancerRetryProperties.class) public class LoadBalancerAutoConfiguration { @LoadBalanced @Autowired(required = false) private List<RestTemplate> restTemplates = Collections.emptyList(); @Bean public SmartInitializingSingleton loadBalancedRestTemplateInitializer( final List<RestTemplateCustomizer> customizers) { return new SmartInitializingSingleton() { @Override public void afterSingletonsInstantiated() { for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) { for (RestTemplateCustomizer customizer : customizers) { customizer.customize(restTemplate); } } } }; } @Autowired(required = false) private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList(); @Bean @ConditionalOnMissingBean public LoadBalancerRequestFactory loadBalancerRequestFactory( LoadBalancerClient loadBalancerClient) { return new LoadBalancerRequestFactory(loadBalancerClient, transformers); } @Configuration @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate") static class LoadBalancerInterceptorConfig { @Bean public LoadBalancerInterceptor ribbonInterceptor( LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) { return new LoadBalancerInterceptor(loadBalancerClient, requestFactory); } @Bean @ConditionalOnMissingBean public RestTemplateCustomizer restTemplateCustomizer( final LoadBalancerInterceptor loadBalancerInterceptor) { return new RestTemplateCustomizer() { @Override public void customize(RestTemplate restTemplate) { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); } }; } } @Configuration @ConditionalOnClass(RetryTemplate.class) public static class RetryAutoConfiguration { @Bean public RetryTemplate retryTemplate() { RetryTemplate template = new RetryTemplate(); template.setThrowLastExceptionOnExhausted(true); return template; } @Bean @ConditionalOnMissingBean public LoadBalancedRetryPolicyFactory loadBalancedRetryPolicyFactory() { return new LoadBalancedRetryPolicyFactory.NeverRetryFactory(); } } @Configuration @ConditionalOnClass(RetryTemplate.class) public static class RetryInterceptorAutoConfiguration { @Bean @ConditionalOnMissingBean public RetryLoadBalancerInterceptor ribbonInterceptor( LoadBalancerClient loadBalancerClient, LoadBalancerRetryProperties properties, LoadBalancedRetryPolicyFactory lbRetryPolicyFactory, LoadBalancerRequestFactory requestFactory) { return new RetryLoadBalancerInterceptor(loadBalancerClient, properties, lbRetryPolicyFactory, requestFactory); } @Bean @ConditionalOnMissingBean public RestTemplateCustomizer restTemplateCustomizer( final RetryLoadBalancerInterceptor loadBalancerInterceptor) { return new RestTemplateCustomizer() { @Override public void customize(RestTemplate restTemplate) { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); } }; } } }
这个类的源码比较长,我们就来说一下这里的核心功能:
小伙伴们应该也发现了,这里的核心其实就是一个拦截器,就是这个拦截器让一个普通的RestTemplate逆袭成为了一个具有负载均衡功能的请求器。那我们接下来就来看看这个拦截器:
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor { private LoadBalancerClient loadBalancer; private LoadBalancerRequestFactory requestFactory; public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) { this.loadBalancer = loadBalancer; this.requestFactory = requestFactory; } public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) { // for backwards compatibility this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer)); } @Override public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { final URI originalUri = request.getURI(); String serviceName = originalUri.getHost(); Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri); return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution)); } }
拦截器中注入了LoadBalancerClient的实现,当一个被@LoadBalanced注解修饰的RestTemplate对象向外发起HTTP请求时,会被LoadBalancerInterceptor类的intercept方法拦截,在这个方法中直接通过getHost方法就可以获取到服务名(因为我们在使用RestTemplate调用服务的时候,使用的是服务名而不是域名,所以这里可以通过getHost直接拿到服务名然后去调用execute方法发起请求)。
OK,说到这里我们的LoadBalancerClient还只是一个接口,我们要去看看这个接口的实现是什么样的,还好,这个接口只有一个实现类,我们来看看:
public class RibbonLoadBalancerClient implements LoadBalancerClient { @Override public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { ILoadBalancer loadBalancer = getLoadBalancer(serviceId); Server server = getServer(loadBalancer); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); return execute(serviceId, ribbonServer, request); } @Override public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException { Server server = null; if(serviceInstance instanceof RibbonServer) { server = ((RibbonServer)serviceInstance).getServer(); } if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonLoadBalancerContext context = this.clientFactory .getLoadBalancerContext(serviceId); RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server); try { T returnVal = request.apply(serviceInstance); statsRecorder.recordStats(returnVal); return returnVal; } // catch IOException and rethrow so RestTemplate behaves correctly catch (IOException ex) { statsRecorder.recordStats(ex); throw ex; } catch (Exception ex) { statsRecorder.recordStats(ex); ReflectionUtils.rethrowRuntimeException(ex); } return null; } protected Server getServer(ILoadBalancer loadBalancer) { if (loadBalancer == null) { return null; } return loadBalancer.chooseServer("default"); // TODO: better handling of key } protected ILoadBalancer getLoadBalancer(String serviceId) { return this.clientFactory.getLoadBalancer(serviceId); } }
当然这个类的源码很长,我这里只列出了一部分,在execute方法中,首先根据serviceId获取一个ILoadBalancer,然后调用getServer方法去获取一个服务实例,但是在getServer方法中,我们看到并没有调用LoadBalancerClient中的choose方法,而是调用了另一个叫做ILoadBalancer的中定义的chooseServer方法。那这个接口又是什么呢?我们来看看:
public interface ILoadBalancer { public void addServers(List<Server> newServers); public Server chooseServer(Object key); public void markServerDown(Server server); public List<Server> getReachableServers(); public List<Server> getAllServers(); }
我来大概说一说这几个方法:
1. addServers表示向负载均衡器中维护的实例列表增加服务实例
2. chooseServer表示通过某种策略,从负载均衡服务器中挑选出一个具体的服务实例
3. markServerDown表示用来通知和标识负载均衡器中某个具体实例已经停止服务,否则负载均衡器在下一次获取服务实例清单前都会认为这个服务实例是正常工作的
4. getReachableServers表示获取当前正常工作的服务实例列表
5. getAllServers表示获取所有的服务实例列表,包括正常的服务和停止工作的服务
那么这里的几个接口都涉及到一个Server对象,这里的Server对象就是一个传统的服务端节点,这个对象中存储了服务端节点的一些元数据信息,包括host,port以及其他一些部署信息。通过下图我们可以一窥该接口的实现类:
那么在这些实现类中,BaseLoadBalancer类实现了基础的负载均衡,而DynamicServerListLoadBalancer和ZoneAwareLoadBalancer则在负载均衡的策略上做了一些功能的扩展。那么在和Ribbon整合的时候,Spring Cloud默认采用了哪个具体的实现呢?我们可以从RibbonClientConfiguration类中一窥究竟(这个类很长,我们这里只看我们关心的):
@Bean @ConditionalOnMissingBean public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) { if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) { return this.propertiesFactory.get(ILoadBalancer.class, config, name); } return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList, serverListFilter, serverListUpdater); }
OK,我们在这里看到系统默认采用了ZoneAwareLoadBalancer负载均衡器。此时我们需要重新回到RibbonLoadBalancerClient类中继续看我们的execute方法的执行情况,在execute方法中,当获取到一个Server对象之后,将之包装成一个RibbonServer对象(从包装的过程我们可以发现,RibbonServer对象中保存了Server的所有信息,同时还保存了服务名serviceId、是否需要HTTPS等其他信息),然后再调用另一个重载的execute方法,在另一个重载的execute方法中最终调用到了LoadBalancerRequest中的apply方法,该方法向一个具体的服务实例发送请求,从而实现了从 http://服务名/hello
到 http://域名/hello
的转换。apply方法接收了一个参数叫做ServiceInstance,这个实际上就是RibbonServer传进来的那个实例,我们查看RibbonServer,发现它其实就是ServiceInstance的一个子类,而ServiceInstance接口对象是对服务实例的抽象定义,ServiceInstance接口中暴露了服务治理体系中每个服务实例需要提供的一些基本信息,比如serviceId、host、port等,具体定义如下:
public interface ServiceInstance { String getServiceId(); String getHost(); int getPort(); boolean isSecure(); URI getUri(); Map<String, String> getMetadata(); }
RibbonServer是ServiceInstance的一个子类,具体实现差不多,这里我就不贴出源码了。
这时候我们发现apply方法是LoadBalancerRequest接口中的一个方法,且LoadBalancerRequest接口没有实现类,那么apply方法的实现是在哪里实现的呢?此时我们发现LoadBalancerRequest中的apply方法在执行的时候,这个request是从LoadBalancerInterceptor拦截器里边传来的,我们再回到LoadBalancerInterceptor的intercept方法中,在这个方法中最终通过 requestFactory.createRequest(request, body, execution)
来创建一个LoadBalancerRequest,在这个方法中,我们找到了apply的实现:
public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) { return new LoadBalancerRequest<ClientHttpResponse>() { @Override public ClientHttpResponse apply(final ServiceInstance instance) throws Exception { HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer); if (transformers != null) { for (LoadBalancerRequestTransformer transformer : transformers) { serviceRequest = transformer.transformRequest(serviceRequest, instance); } } return execution.execute(serviceRequest, body); } }; }
我们看到,在apply的实现中,重新创建了一个ServiceRequestWrapper,这个ServiceRequestWrapper实际上就是HttpRequestWrapper的一个子类,ServiceRequestWrapper重写了HttpRequestWrapper的getURI()方法,重写的URI实际上就是通过调用LoadBalancerClient接口的reconstructURI函数来重新构建一个URI进行访问,如下:
public class ServiceRequestWrapper extends HttpRequestWrapper { private final ServiceInstance instance; private final LoadBalancerClient loadBalancer; public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance, LoadBalancerClient loadBalancer) { super(request); this.instance = instance; this.loadBalancer = loadBalancer; } @Override public URI getURI() { URI uri = this.loadBalancer.reconstructURI( this.instance, getRequest().getURI()); return uri; } }
此时,我们再回到RibbonLoadBalancerClient类的reconstructURI方法中,来详细的看看这里的重构过程:
@Override public URI reconstructURI(ServiceInstance instance, URI original) { Assert.notNull(instance, "instance can not be null"); String serviceId = instance.getServiceId(); RibbonLoadBalancerContext context = this.clientFactory .getLoadBalancerContext(serviceId); Server server = new Server(instance.getHost(), instance.getPort()); IClientConfig clientConfig = clientFactory.getClientConfig(serviceId); ServerIntrospector serverIntrospector = serverIntrospector(serviceId); URI uri = RibbonUtils.updateToHttpsIfNeeded(original, clientConfig, serverIntrospector, server); return context.reconstructURIWithServer(server, uri); }
从reconstructURI函数中我们可以看到,首先获取到了一个serviceId,然后根据这个id获取到RibbonLoadBalancerContext对象(RibbonLoadBalancerContext类用来存储一些被负载均衡器使用的上下文内容和API操作),然后这里会根据ServiceInstance的信息来构造一个具体的服务实例信息的Server对象,最后再调用reconstructURIWithServer方法来构建服务实例的URI。好,我们再来看一看reconstructURIWithServer方法:
public URI reconstructURIWithServer(Server server, URI original) { String host = server.getHost(); int port = server .getPort(); if (host.equals(original.getHost()) && port == original.getPort()) { return original; } String scheme = original.getScheme(); if (scheme == null) { scheme = deriveSchemeAndPortFromPartialUri(original).first(); } try { StringBuilder sb = new StringBuilder(); sb.append(scheme).append("://"); if (!Strings.isNullOrEmpty(original.getRawUserInfo())) { sb.append(original.getRawUserInfo()).append("@"); } sb.append(host); if (port >= 0) { sb.append(":").append(port); } sb.append(original.getRawPath()); if (!Strings.isNullOrEmpty(original.getRawQuery())) { sb.append("?").append(original.getRawQuery()); } if (!Strings.isNullOrEmpty(original.getRawFragment())) { sb.append("#").append(original.getRawFragment()); } URI newURI = new URI(sb.toString()); return newURI; } catch (URISyntaxException e) { throw new RuntimeException(e); } }
reconstructURIWithServer函数的逻辑看起来很好理解,首先它从Server对象中获取host和port信息,然后根据以服务名为host的URI对象original中获取其他请求信息,将这两者的内容进行拼接整合,形成最终要访问的服务实例地址,至此,我们就拿到了一个组装之后的URI。
我们再回到LoadBalancerRequest类的createRequest方法,这里调用了execution.execute(serviceRequest, body)来创建了一个ClientHttpResponse对象,这里调用了ClientHttpRequestExecution接口中的execute方法,ClientHttpRequestExecution接口只有一个实现类,那就是InterceptingRequestExecution,在InterceptingRequestExecution中我们找到了execute方法的实现,如下:
@Override public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException { if (this.iterator.hasNext()) { ClientHttpRequestInterceptor nextInterceptor = this.iterator.next(); return nextInterceptor.intercept(request, body, this); } else { ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), request.getMethod()); for (Map.Entry<String, List<String>> entry : request.getHeaders().entrySet()) { List<String> values = entry.getValue(); for (String value : values) { delegate.getHeaders().add(entry.getKey(), value); } } if (body.length > 0) { StreamUtils.copy(body, delegate.getBody()); } return delegate.execute(); } }
小伙伴们看到,这里在创建ClientHttpRequest对象的时候,调用了request的getURI()方法,此时的getURI()已经是被重写过的URI了。
OK,至此,RestTemplate从一个简单的服务请求控件变成了具有客户端负载均衡功能的请求控件,小伙伴们也大概理清了Spring Cloud Ribbon中实现客户端负载均衡的基本套路了。简而言之,就是RestTemplate发起一个请求,这个请求被LoadBalancerInterceptor给拦截了,拦截后将请求的地址中的服务逻辑名转为具体的服务地址,然后继续执行请求,就是这么一个过程。
这就是RestTemplate的逆袭之路,有问题欢迎留言讨论。
更多JavaEE资料请关注公众号:
以上。。