注:文章中使用的 dubbo 源码版本为 2.5.4
一、服务引用的目的
二、关键概念及关系
三、服务引用流程详解
四、整体流程图总结
五、后续系列文章预告
“服务消费者”向注册中心订阅“服务提供者”提供的服务地址,并生成服务接口的实际代理对象。
服务引用Bean和引用配置类,服务引用信息的承载体。
协议抽象接口 Protocol 的实现。其 <T> Invoker<T> refer(Class<T> type, URL url) 方法完成了服务引用的完整功能。
同时具有目录服务 Directory 和监听器 NotifyListener 两个接口的功能,并组合了注册器Registry。
subscribe(URL url) 方法完成了在注册中心对服务提供者地址变更的监听功能; NotifyListener 接口的 notify(List<URL> urls) 方法执行监听回调,监听服务提供者地址变更并创建对应的Invoker; 基于javassist的动态代理工厂,其内部使用dubbo自己的动态代理机制 Proxy 完成动态代理类的创建工作。
2.5 关系图
点击这里查看大图
引用配置及初始化
引用配置:
服务引用方在工程中会有如上图中的Spring配置。
在容器启动的时候会解析schema元素 dubbo:reference/ 转换成dubbo内部数据结构ReferenceBean。
ReferenceBean就是服务引用,它继承了dubbo的引用配置类ReferenceConfig,并实现了Spring的FactoryBean和InitializingBean接口。
public Object getObject() throws Exception {
return get();
}
ReferenceBean 的 getObject 方法执行其get()方法,最终会执行到 ReferenceConfig 的 init() 方法,在该方法中完成代理对象的生成过程。
private void init() {
/*...省略部分代码...*/
//step1 根据配置生成Map
Map<String, String> map = new HashMap<String, String>();
Map<Object, Object> attributes = new HashMap<Object, Object>();
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
/*...省略部分代码...*/
//step2 调用createProxy生成代理类ref
ref = createProxy(map);
}
如上,init()中主要干的事情有两件。一,根据配置生成map;二,调用createProxy生成代理对象ref。
下图展示了生成的map的信息:
createProxy() 方法主要做了如下三件事:
- 1)加载配置中心拼装成urls;
- 2)遍历urls,调用refProtocol创建远程的动态代理Invoker;
- 3)调用proxyFactory创建服务代理。
private T createProxy(Map<String, String> map) {
/*...省略部分代码...*/
//step1 加载配置的所有注册中心,拼装成urls
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
/*...省略部分代码...*/
//step2 遍历urls,调用refProtocol.refer创建远程的动态代理Invoker
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最后一个registry url
}
}
if (registryURL != null) { // 有 注册中心协议的URL
// 对有注册中心的Cluster 只用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // 不是 注册中心的URL
invoker = cluster.join(new StaticDirectory(invokers));
}
/*...省略部分代码...*/
// step3 调用proxyFactory创建服务代理
return (T) proxyFactory.getProxy(invoker);
refProtocol 为dubbo通过 ExtensionLoader 动态注入的 RegistryProtocol 实例。通过其refer方法创建Invoker。
具体进行的工作分为如下几步:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//step1 从url的registryKey获取注册中心类型:zookeeper
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
//step2 从RegistryFactory获取注册器
Registry registry = registryFactory.getRegistry(url);
//...省略部分代码...
//doRefer
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//step3 构建RegistryDirectory,可以把它理解为注册资源,其中包含了消费者/服务/路由等相关信息
//其同时也是回调监听器
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
//step4 向注册中心注册服务消费者
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
//step5 从注册中心订阅服务提供者(即引用的服务)
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
//step6
return cluster.join(directory);
}
先按照上图对RegistryDirectory进行说明:
所以,执行到"step5 从注册中心订阅服务提供者",调用RegistryDirectory的subscribe(URL url)方法时,完成了对引用url的订阅并同时出发监听流程:
public synchronized void notify(List<URL> urls) {
//step1 根据类别将urls分类
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
//...省略部分代码...
//providers
refreshInvoker(invokerUrls);
}
private void refreshInvoker(List<URL> invokerUrls) {
//...省略部分代码...
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// 将URL列表转成Invoker列表
//...省略部分代码...
//step3 最终刷新urlInvokerMap和methodInvokerMap
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
}
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
//...省略部分代码...
//step2 protocol.refer根据url创建远程代理Invoker
invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
}
执行到 return cluster.join(directory) ,即通过 FailFastCluster 获取并返回远程代理Invoker:
public class FailfastCluster implements Cluster {
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailfastClusterInvoker<T>(directory);
}
}
其实就是new了一个 FailfastClusterInvoker 并返回,构造方法参数为3.3步骤中的RegistryDirectory。
当 invoke(Invocation invocation) 方法被调用时:
public Result invoke(final Invocation invocation) throws RpcException {
LoadBalance loadbalance;
//step1 根据服务引用的url从RegistryDirectory中获取对应远程DubboInvocation
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && invokers.size() > 0) {
//step2 动态创建负载均衡算法
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
//step3 根据负载均衡算法从Invoker列表中选出一个invoker执行
return doInvoke(invocation, invokers, loadbalance);
}
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;
}
最终回到 ReferenceConfig 中init的最后一步 return (T) proxyFactory.getProxy(invoker); ,通过 JavassistProxyFactory.getProxy(invoker) 将Invoker转换成代理对象ref:
InvokerInvocationHandler
public class JavassistProxyFactory extends AbstractProxyFactory {
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
}
执行完以上步骤后,终于生成了dubbo远程动态代理对象T ref,我们可以直接使用该对象来完成RPC调用了。
点击这里查看大图
整体看下来,服务引用主要分为以下几大步骤:
该部分以Spring配置及 ReferenceBean 为入口,主要在 ReferenceConfig 中进行。
ReferenceConfig 依赖 RegistryProtocol 完成了 "服务引用者注册"、"服务提供者订阅"和"Invoker创建" 的工作; ReferenceConfig 依赖 JavassistProxyFactory 完成了 "代理对象生成" 的工作; 该部分主要由 RegistryDirectory 和 FailfastCluster 实现。
ReferenceConfig 调用 RegistryDirectory 的 subscribe 方法,触发了对服务提供者url的订阅及监听,在监听过程中 RegistryDirectory 借助 DubboProtocol 完成了Invoker的创建工作,并保存了服务引用url和Invoker的关系; ReferenceConfig 调用 FailfastCluster 的join方法,完成了对Invoker对象的获取; 该部分主要由 JavassistProxyFactory 完成。
ReferenceConfig 调用 JavassistProxyFactory 的getProxy方法为入口,传入Invoker; InvokerInvocationHandler ,并使用dubbo自己的动态代理工具Proxy最终生成代理对象T ref;
黑线表示方法调用
虚线表示对象引用
红线表示请求调用过程