转载

springcloud-ribbon源码

不要问我阅读spring源码有什么用,问就是没有用,只是让我自己使用spring的过程中自信点!

相关文章

spring-相关文章

不是很完整,后续会补充

说明

1. ribbon是使用的RestTemplate,从表象看RestTemplate实现了负载均衡
2. RestTemplate之所有能实现负载均衡是因为RestTemplate添加了拦截器
复制代码

使用

@Configuration
public class AppConfig {
	
	 @Bean
	 //这个注解必须加,不加的话就是单纯的使用restTemplate
	 @LoadBalanced
	 public RestTemplate restTemplate() {
		 return new RestTemplate();
	 }
	 
}

@RequestMapping("test01")
@RestController
public class TestController01 {
	@Autowired
	private RestTemplate restTemplate;
	
	@RequestMapping("test01")
	public  String test01() {
		//使用restTemplate 可以达到负载均衡的目的 service-ribbon-test01 为服务名
		return restTemplate.getForObject("http://service-ribbon-test01/test01/test01", String.class);
	}

}

复制代码

源码

根据LoadBalancerInterceptor获取ClientHttpRequestFactory

@Override
@Nullable
public <T> T getForObject(String url, Class<T> responseType, Object... uriVariables) throws RestClientException {
	RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
	HttpMessageConverterExtractor<T> responseExtractor =
			new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);
	return execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables);
}


@Override
@Nullable
public <T> T execute(String url, HttpMethod method, @Nullable RequestCallback requestCallback,
		@Nullable ResponseExtractor<T> responseExtractor, Object... uriVariables) throws RestClientException {

	URI expanded = getUriTemplateHandler().expand(url, uriVariables);
	return doExecute(expanded, method, requestCallback, responseExtractor);
}


@Nullable
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
		@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {

	Assert.notNull(url, "URI is required");
	Assert.notNull(method, "HttpMethod is required");
	ClientHttpResponse response = null;
	try {
	    //获取ClientHttpRequest 判断依据就是看看是否存在拦截器
		ClientHttpRequest request = createRequest(url, method);
		if (requestCallback != null) {
			requestCallback.doWithRequest(request);
		}
		response = request.execute();
		handleResponse(url, method, response);
		return (responseExtractor != null ? responseExtractor.extractData(response) : null);
	}
	
}


protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
        //这里getRequestFactory 先获取工厂
	ClientHttpRequest request = getRequestFactory().createRequest(url, method);
	initialize(request);
	if (logger.isDebugEnabled()) {
		logger.debug("HTTP " + method.name() + " " + url);
	}
	return request;
}


@Override
public ClientHttpRequestFactory getRequestFactory() {
        //获取拦截器  这个拦截器就是我们上面提到过的
	List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
	if (!CollectionUtils.isEmpty(interceptors)) {
		ClientHttpRequestFactory factory = this.interceptingRequestFactory;
		if (factory == null) {
		    //创建ClientHttpRequestFactory , 并且把拦截器放入工厂中
			factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
			this.interceptingRequestFactory = factory;
		}
		return factory;
	}
	else {
	    //假如没有拦截器就是用默认的工厂
		return super.getRequestFactory();
	}
}

//创建ClientHttpRequest
@Override
protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
        //我们关注的是工厂的拦截器(就是上个方法的拦截器),放入了ClientHttpRequest
	return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
}

复制代码

LoadBalancerInterceptor的添加过程

//org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration
//LoadBalancerAutoConfiguration是个配置类
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
		final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
	//restTemplateCustomizers是个函数是接口 
	//下面方法下看下 restTemplateCustomizers 是在什么时候创建的,返回的又是个什么东西
	return () -> restTemplateCustomizers.ifAvailable(customizers -> {
		for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
			for (RestTemplateCustomizer customizer : customizers) {
			    //循环我们注册的restTemplate 调用customizer.customize
			    //假如这段代码的调用逻辑整不明白,建议先学习下 函数式接口lambda
				customizer.customize(restTemplate);
			}
		}
	});
}


//org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration.LoadBalancerInterceptorConfig.restTemplateCustomizer(LoadBalancerInterceptor)
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
		final LoadBalancerInterceptor loadBalancerInterceptor) {
	return restTemplate -> {
		List<ClientHttpRequestInterceptor> list = new ArrayList<>(
				restTemplate.getInterceptors());
		list.add(loadBalancerInterceptor);
		//看这里,,,把拦截器放入 restTemplate中
		restTemplate.setInterceptors(list);
	};
}

//看了上面的两个方法,大致能发现一个重要的问题吧  
//那就是LoadBalancerAutoConfiguration.this.restTemplates非常重要
//里面有值 那就添加拦截器,然后restTemplate就能实现负载均衡,反之不能那我们就来看看 

//org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();

//restTemplates初始化为一个空的集合
//@Autowired(required = false)使用了依赖注入
//@LoadBalanced 加这个注解
//假如不加@LoadBalanced注解,restTemplates只要springioc容器中存在restTemplate就会被放进去
//就是因为加了@LoadBalanced注解,所以springioc容器依赖注入的时候判断下
//你的restTemplates的创建是否使用了了@LoadBalanced
//也这是为什么我们创建restTemplate的时候要使用@LoadBalanced的原因,@LoadBalanced就像一个开关
//依赖注入请移步另一篇文章

复制代码

处理服务名为url(ribbon的主要内容)

@Nullable
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
		@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {

	Assert.notNull(url, "URI is required");
	Assert.notNull(method, "HttpMethod is required");
	ClientHttpResponse response = null;
	try {
	        //在这里根据 是否存在拦截器获取不同的工厂,创建不同的ClientHttpRequest
		ClientHttpRequest request = createRequest(url, method);
		if (requestCallback != null) {
			requestCallback.doWithRequest(request);
		}
		//执行
		response = request.execute();
		handleResponse(url, method, response);
		return (responseExtractor != null ? responseExtractor.extractData(response) : null);
	}
	
}


@Override
public final ClientHttpResponse execute() throws IOException {
    //判断时是否执行
	assertNotExecuted();
	//执行
	ClientHttpResponse result = executeInternal(this.headers);
	//设置为已执行
	this.executed = true;
	return result;
}


@Override
protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
	byte[] bytes = this.bufferedOutput.toByteArray();
	if (headers.getContentLength() < 0) {
		headers.setContentLength(bytes.length);
	}
	//这里
	ClientHttpResponse result = executeInternal(headers, bytes);
	this.bufferedOutput = new ByteArrayOutputStream(0);
	return result;
}

@Override
protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
	InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
	//执行
	return requestExecution.execute(this, bufferedOutput);
}


@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
	if (this.iterator.hasNext()) {
	        //假如存在拦截器走这里
		ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
		return nextInterceptor.intercept(request, body, this);
	}
	else {
	        //不存在,走这里
		HttpMethod method = request.getMethod();
		Assert.state(method != null, "No standard HTTP method");
		ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
		request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
		if (body.length > 0) {
			if (delegate instanceof StreamingHttpOutputMessage) {
				StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
				streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
			}
			else {
				StreamUtils.copy(body, delegate.getBody());
			}
		}
		return delegate.execute();
	}
}



@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
		final ClientHttpRequestExecution execution) throws IOException {
	final URI originalUri = request.getURI();
	String serviceName = originalUri.getHost();
	Assert.state(serviceName != null,
			"Request URI does not contain a valid hostname: " + originalUri);
	//如果说上面的方法都是spring提供的公用方法,下面的就是ribbon组件的了
	//假如以后存在一个别的组件和ribbon的功能类似,整合的过程中,也是可以从这里开始提供具体实现的
	return this.loadBalancer.execute(serviceName,
			this.requestFactory.createRequest(request, body, execution));
}


//org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient
//RibbonLoadBalancerClient是啥时候注册的这里就不提了 ,  自己去 看吧
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
			throws IOException {
			
	//下面的两个方法都挺重要 我们一个一个看
	//获取loadBalancer  
	//loadBalancer中包含服务列表
	ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
	//获取服务,里面包含负载均衡的策略
	Server server = getServer(loadBalancer, hint);
	if (server == null) {
		throw new IllegalStateException("No instances available for " + serviceId);
	}
	RibbonServer ribbonServer = new RibbonServer(serviceId, server,
			isSecure(server, serviceId),
			serverIntrospector(serviceId).getMetadata(server));
    //执行
	return execute(serviceId, ribbonServer, request);
}

复制代码

getLoadBalancer

protected ILoadBalancer getLoadBalancer(String serviceId) {
	return this.clientFactory.getLoadBalancer(serviceId);
}
public ILoadBalancer getLoadBalancer(String name) {
	return getInstance(name, ILoadBalancer.class);
}

@Override
public <C> C getInstance(String name, Class<C> type) {
    //这个方法里面其实就是根据传入的type去springioc中获取bena   ILoadBalancer
    //ILoadBalancer  实现类为ZoneAwareLoadBalancer
    //有一个属性 包含了注册中心所有注册的服务详情
    //现在看下ZoneAwareLoadBalancer在什么地方创建干了什么
	C instance = super.getInstance(name, type);
	if (instance != null) {
		return instance;
	}
	IClientConfig config = getInstance(name, IClientConfig.class);
	return instantiateWithConfig(getContext(name), type, config);
}

复制代码

ZoneAwareLoadBalancer

//创建
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
		ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
		IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
	if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
		return this.propertiesFactory.get(ILoadBalancer.class, config, name);
	}
	
	//rule 为负载均衡策略
	//serverList 为服务列表 只要实现ServerList即可 , nacos ,, eureka 
	//serverListUpdater 为 跑定时 任务的。
	//因为我这这边使用的是nacos 所以 类型为 : NacosServerList
	//具体 NacosServerList 是啥时候创建的 这么来的,不看了,等会面说nacos源码的时候在看看
	//具体的 构造方法不看了
	//还有定时任务刷新服务列表的。。。很多
	return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
			serverListFilter, serverListUpdater);
}


复制代码

getServer

//ILoadBalancer 就是 ZoneAwareLoadBalancer
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
	if (loadBalancer == null) {
		return null;
	}
	// Use 'default' on a null hint, or just pass it on?
	return loadBalancer.chooseServer(hint != null ? hint : "default");
}

public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            //根据你传递的不同的策略规则选择不同的服务
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}
//后续执行http请求
//结束
复制代码

总结

1. 根据是否存在拦截器获取不同的request工厂,获取不同的request
2. 拦截器是保存在restTemplate中的
3. 拦截器是一直存在的,但是要看restTemplate是否被@LoadBalanced注释,只有注释了,才会把拦截器放入这个restTemplate
4. 获取服务列表
5. 根据rule策略选择具体的服务
6. http请求
复制代码
原文  https://juejin.im/post/5f0e5f87f265da22f24e7538
正文到此结束
Loading...