转载

Dubbo中暴露服务的过程解析

作者:大程熙

地址:cxis.me/2017/02/19/…

示例项目: github.com/souyunku/sp…

Dubbo会在Spring实例化完bean之后,在刷新容器最后一步发布ContextRefreshEvent事件的时候,通知实现了ApplicationListener的ServiceBean类进行回调onApplicationEvent 事件方法,dubbo会在这个方法中调用ServiceBean父类ServiceConfig的export方法,而该方法真正实现了服务的(异步或者非异步)发布。

加载dubbo配置

Spring容器在启动的时候,会读取到Spring默认的一些schema以及Dubbo自定义的schema,每个schema都会对应一个自己的NamespaceHandler,NamespaceHandler里面通过BeanDefinitionParser来解析配置信息并转化为需要加载的bean对象。

遇到dubbo名称空间 ,首先会调用DubboNamespaceHandler类的 init方法 进行初始化操作。

根据命名空间去获取具体的处理器NamespaceHandler。那具体的处理器是在哪定义的呢,在” META-INF/spring.handlers ”文件中,Spring在会自动加载该文件中所有内容。

META-INF/spring.handlers

http/://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler

META-INF/spring.schemas

http/://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd

根据不同的XML节点,会委托NamespaceHandlerSupport 类找出合适的BeanDefinitionParser,其中Dubbo所有的标签都使用 DubboBeanDefinitionParser进行解析,基于一对一属性映射,将XML标签解析为Bean对象。

DubboNamespaceHandler.java 类代码如下

package com.alibaba.dubbo.config.spring.schema;

import org.springframework.beans.factory.xml.NamespaceHandlerSupport;

public class DubboNamespaceHandler extends NamespaceHandlerSupport {

    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    public void init() {
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
    }

}

由于DubboBeanDefinitionParser 类中 parse转换的过程代码还是比较复杂,只抽离出来bean的注册这一块的代码如下

DubboBeanDefinitionParser.java 类代码如下

package com.alibaba.dubbo.config.spring.schema;

public class DubboBeanDefinitionParser implements BeanDefinitionParser {

	@SuppressWarnings("unchecked")
	private static BeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean required) {
	
	RootBeanDefinition beanDefinition = new RootBeanDefinition();
        beanDefinition.setBeanClass(beanClass);
        beanDefinition.setLazyInit(false);
        String id = element.getAttribute("id");
        //省略......
         if(id != null && id.length() > 0) {
            if(parserContext.getRegistry().containsBeanDefinition(id)) {
                throw new IllegalStateException("Duplicate spring bean id " + id);
            }
            //registerBeanDefinition 注册Bean的定义
            //具体的id如下 applicationProvider.xml解析后的显示 id,
            //如id="dubbo_provider"  beanDefinition = "ApplicationConfig"
            parserContext.getRegistry().registerBeanDefinition(id, beanDefinition);
            beanDefinition.getPropertyValues().addPropertyValue("id", id);
        }
	 }	
}

通过DubboBeanDefinitionParser 类的 parse方法会将class信息封装成BeanDefinition,然后将BeanDefinition再放进DefaultListableBeanFactory的beanDefinitionMap中。

最后通过Spring bean 的加载机制进行加载。

服务暴露过程

Dubbo会在Spring实例化完bean之后,在刷新容器最后一步发布ContextRefreshEvent事件的时候,通知实现了ApplicationListener的ServiceBean类进行回调onApplicationEvent 事件方法,dubbo会在这个方法中调用ServiceBean父类ServiceConfig的export方法,而该方法真正实现了服务的(异步或者非异步)发布。

服务暴露入口

由服务配置类 ServiceConfig 进行初始化工作及服务暴露入口,首先进去执行该类的export()方法。

ServiceConfig.java 类的 export 方法

export的步骤简介

  1. 首先会检查各种配置信息,填充各种属性,总之就是保证我在开始暴露服务之前,所有的东西都准备好了,并且是正确的。
  2. 加载所有的注册中心,因为我们暴露服务需要注册到注册中心中去。
  3. 根据配置的所有协议和注册中心url分别进行导出。
  4. 进行导出的时候,又是一波属性的获取设置检查等操作。
  5. 如果配置的不是remote,则做本地导出。
  6. 如果配置的不是local,则暴露为远程服务。
  7. 不管是本地还是远程服务暴露,首先都会获取Invoker。
  8. 获取完Invoker之后,转换成对外的Exporter,缓存起来。

export方法先判断是否需要延迟暴露(这里我们使用的是不延迟暴露),然后执行doExport方法。

doExport方法先执行一系列的检查方法,然后调用doExportUrls方法。检查方法会检测dubbo的配置是否在Spring配置文件中声明,没有的话读取properties文件初始化。

doExportUrls方法先调用loadRegistries获取所有的注册中心url,然后遍历调用doExportUrlsFor1Protocol方法。对于在标签中指定了registry属性的Bean,会在加载BeanDefinition的时候就加载了注册中心。

ServiceConfig.java 类的 export 方法

package com.alibaba.dubbo.config;

public class ServiceConfig<T> extends AbstractServiceConfig {

public synchronized void export() {
	if (provider != null) {
		if (export == null) {
			export = provider.getExport();
		}
		if (delay == null) {
			delay = provider.getDelay();
		}
	}
	if (export != null && !export) {
		return;
	}

	if (delay != null && delay > 0) {
		delayExportExecutor.schedule(new Runnable() {
			public void run() {
				doExport();
			}
		}, delay, TimeUnit.MILLISECONDS);
	} else {
		doExport();
	}
}

可以看出发布发布是支持延迟暴露发布服务的,这样可以用于当我们发布的服务非常多,影响到应用启动的问题,前提是应用允许服务发布的延迟特性。

接下来就进入到 ServiceConfig.java 类的 doExport() 方法。

检查DUBBO配置的合法性

ServiceConfig.java 类的 doExport方法。检查DUBBO配置的合法性,并调用doExportUrls 方法。

package com.alibaba.dubbo.config;

public class ServiceConfig<T> extends AbstractServiceConfig {

protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("Already unexported!");
        }
        if (exported) {
            return;
        }
        exported = true;
        if (interfaceName == null || interfaceName.length() == 0) {
            throw new IllegalStateException("<dubbo:service interface=/"/" /> interface not allow null!");
        }
        checkDefault();
        if (provider != null) {
            if (application == null) {
                application = provider.getApplication();
            }
            if (module == null) {
                module = provider.getModule();
            }
            if (registries == null) {
                registries = provider.getRegistries();
            }
            if (monitor == null) {
                monitor = provider.getMonitor();
            }
            if (protocols == null) {
                protocols = provider.getProtocols();
            }
        }
        if (module != null) {
            if (registries == null) {
                registries = module.getRegistries();
            }
            if (monitor == null) {
                monitor = module.getMonitor();
            }
        }
        if (application != null) {
            if (registries == null) {
                registries = application.getRegistries();
            }
            if (monitor == null) {
                monitor = application.getMonitor();
            }
        }
        if (ref instanceof GenericService) {
            interfaceClass = GenericService.class;
            if (StringUtils.isEmpty(generic)) {
                generic = Boolean.TRUE.toString();
            }
        } else {
            try {
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                        .getContextClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            checkInterfaceAndMethods(interfaceClass, methods);
            checkRef();
            generic = Boolean.FALSE.toString();
        }
        if (local != null) {
            if ("true".equals(local)) {
                local = interfaceName + "Local";
            }
            Class<?> localClass;
            try {
                localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            if (!interfaceClass.isAssignableFrom(localClass)) {
                throw new IllegalStateException("The local implemention class " + localClass.getName() + " not implement interface " + interfaceName);
            }
        }
        if (stub != null) {
            if ("true".equals(stub)) {
                stub = interfaceName + "Stub";
            }
            Class<?> stubClass;
            try {
                stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            if (!interfaceClass.isAssignableFrom(stubClass)) {
                throw new IllegalStateException("The stub implemention class " + stubClass.getName() + " not implement interface " + interfaceName);
            }
        }
        checkApplication();
        checkRegistry();
        checkProtocol();
        appendProperties(this);
        checkStubAndMock(interfaceClass);
        if (path == null || path.length() == 0) {
            path = interfaceName;
        }
        doExportUrls();
    }
}

我们可以看出该方法的实现的逻辑包含了根据配置的优先级将 ProviderConfig,ModuleConfig,MonitorConfig,ApplicaitonConfig 等一些配置信息进行组装和合并。还有一些逻辑是检查配置信息的合法性。最后又调用了doExportUrls方法。

服务多协议暴露过程

ServiceConfig.java 类的 doExportUrls() 方法

package com.alibaba.dubbo.config;

public class ServiceConfig<T> extends AbstractServiceConfig {

	@SuppressWarnings({"unchecked", "rawtypes"})
	private void doExportUrls() {
		List<URL> registryURLs = loadRegistries(true);
		for (ProtocolConfig protocolConfig : protocols) {
			doExportUrlsFor1Protocol(protocolConfig, registryURLs);
		}
	}
}

该方法第一步是加载注册中心列表

loadRegistries(true); 加载注册中心列表响应示例

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-provider&dubbo=2.5.6&file=/data/dubbo/cache/dubbo-provider&pid=21448®istry=zookeeper×tamp=1524134852031

第二部是将服务发布到多种协议的url上,并且携带注册中心列表的参数,从这里我们可以看出dubbo是支持同时将一个服务发布成为多种协议的,这个需求也是很正常的,客户端也需要支持多协议,根据不同的场景选择合适的协议。

ServiceConfig.java 类的 doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) 方法。

package com.alibaba.dubbo.config;

public class ServiceConfig<T> extends AbstractServiceConfig {

	private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
			String name = protocolConfig.getName();
			if (name == null || name.length() == 0) {
				name = "dubbo";
			}

			String host = protocolConfig.getHost();
			//host = 10.4.81.95
			if (provider != null && (host == null || host.length() == 0)) {
				host = provider.getHost();
			}
			boolean anyhost = false;
			if (NetUtils.isInvalidLocalHost(host)) {
				anyhost = true;
				try {
					host = InetAddress.getLocalHost().getHostAddress();
				} catch (UnknownHostException e) {
					logger.warn(e.getMessage(), e);
				}
				if (NetUtils.isInvalidLocalHost(host)) {
					if (registryURLs != null && registryURLs.size() > 0) {
						for (URL registryURL : registryURLs) {
							try {
								Socket socket = new Socket();
								try {
									SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
									socket.connect(addr, 1000);
									host = socket.getLocalAddress().getHostAddress();
									break;
								} finally {
									try {
										socket.close();
									} catch (Throwable e) {
									}
								}
							} catch (Exception e) {
								logger.warn(e.getMessage(), e);
							}
						}
					}
					if (NetUtils.isInvalidLocalHost(host)) {
						host = NetUtils.getLocalHost();
					}
				}
			}

			Integer port = protocolConfig.getPort();
			if (provider != null && (port == null || port == 0)) {
				port = provider.getPort();
			}
			final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
			if (port == null || port == 0) {
				port = defaultPort;
			}
			if (port == null || port <= 0) {
				port = getRandomPort(name);
				if (port == null || port < 0) {
					port = NetUtils.getAvailablePort(defaultPort);
					putRandomPort(name, port);
				}
				logger.warn("Use random available port(" + port + ") for protocol " + name);
			}

			Map<String, String> map = new HashMap<String, String>();
			if (anyhost) {
				map.put(Constants.ANYHOST_KEY, "true");
			}
			map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
			map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
			map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
			if (ConfigUtils.getPid() > 0) {
				map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
			}
			appendParameters(map, application);
			appendParameters(map, module);
			appendParameters(map, provider, Constants.DEFAULT_KEY);
			appendParameters(map, protocolConfig);
			appendParameters(map, this);
			if (methods != null && methods.size() > 0) {
				for (MethodConfig method : methods) {
					appendParameters(map, method, method.getName());
					String retryKey = method.getName() + ".retry";
					if (map.containsKey(retryKey)) {
						String retryValue = map.remove(retryKey);
						if ("false".equals(retryValue)) {
							map.put(method.getName() + ".retries", "0");
						}
					}
					List<ArgumentConfig> arguments = method.getArguments();
					if (arguments != null && arguments.size() > 0) {
						for (ArgumentConfig argument : arguments) {
							//类型自动转换.
							if (argument.getType() != null && argument.getType().length() > 0) {
								Method[] methods = interfaceClass.getMethods();
								//遍历所有方法
								if (methods != null && methods.length > 0) {
									for (int i = 0; i < methods.length; i++) {
										String methodName = methods[i].getName();
										//匹配方法名称,获取方法签名.
										if (methodName.equals(method.getName())) {
											Class<?>[] argtypes = methods[i].getParameterTypes();
											//一个方法中单个callback
											if (argument.getIndex() != -1) {
												if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
													appendParameters(map, argument, method.getName() + "." + argument.getIndex());
												} else {
													throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
												}
											} else {
												//一个方法中多个callback
												for (int j = 0; j < argtypes.length; j++) {
													Class<?> argclazz = argtypes[j];
													if (argclazz.getName().equals(argument.getType())) {
														appendParameters(map, argument, method.getName() + "." + j);
														if (argument.getIndex() != -1 && argument.getIndex() != j) {
															throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
														}
													}
												}
											}
										}
									}
								}
							} else if (argument.getIndex() != -1) {
								appendParameters(map, argument, method.getName() + "." + argument.getIndex());
							} else {
								throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
							}

						}
					}
				} // end of methods for
			}

			if (ProtocolUtils.isGeneric(generic)) {
				map.put("generic", generic);
				map.put("methods", Constants.ANY_VALUE);
			} else {
				String revision = Version.getVersion(interfaceClass, version);
				if (revision != null && revision.length() > 0) {
					map.put("revision", revision);
				}

				String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
				if (methods.length == 0) {
					logger.warn("NO method found in service interface " + interfaceClass.getName());
					map.put("methods", Constants.ANY_VALUE);
				} else {
					map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
				}
			}
			if (!ConfigUtils.isEmpty(token)) {
				if (ConfigUtils.isDefault(token)) {
					map.put("token", UUID.randomUUID().toString());
				} else {
					map.put("token", token);
				}
			}
			if ("injvm".equals(protocolConfig.getName())) {
				protocolConfig.setRegister(false);
				map.put("notify", "false");
			}
			// 服务发布 map 中的一些关键配置信息
			/*
			map = {HashMap@6276}  size = 17
			 0 = {HashMap$Node@6516} "side" -> "provider"
			 1 = {HashMap$Node@6517} "default.version" -> "1.0"
			 2 = {HashMap$Node@6518} "methods" -> "sayHello"
			 3 = {HashMap$Node@6519} "dubbo" -> "2.5.6"
			 4 = {HashMap$Node@6520} "threads" -> "500"
			 5 = {HashMap$Node@6521} "pid" -> "21448"
			 6 = {HashMap$Node@6522} "interface" -> "io.ymq.dubbo.api.DemoService"
			 7 = {HashMap$Node@6523} "threadpool" -> "fixed"
			 8 = {HashMap$Node@6524} "generic" -> "false"
			 9 = {HashMap$Node@6525} "default.retries" -> "0"
			 10 = {HashMap$Node@6526} "delay" -> "-1"
			 11 = {HashMap$Node@6527} "application" -> "dubbo-provider"
			 12 = {HashMap$Node@6528} "default.connections" -> "5"
			 13 = {HashMap$Node@6529} "default.delay" -> "-1"
			 14 = {HashMap$Node@6530} "default.timeout" -> "10000"
			 15 = {HashMap$Node@6531} "anyhost" -> "true"
			 16 = {HashMap$Node@6532} "timestamp" -> "1524135271940"
			*/
			// 导出服务
			String contextPath = protocolConfig.getContextpath();
			if ((contextPath == null || contextPath.length() == 0) && provider != null) {
				contextPath = provider.getContextpath();
			}
			URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
			// url
			/*
			dubbo://10.4.81.95:20880/io.ymq.dubbo.api.DemoService?
			anyhost=true&
			application=dubboprovider&
			default.connections=5&
			default.delay=-1&
			default.retries=0&
			default.timeout=10000&
			default.version=1.0&
			delay=-1&
			dubbo=2.5.6&
			generic=false&
			interface=io.ymq.dubbo.api.DemoService&
			methods=sayHello&
			pid=21448&
			side=provider&
			threadpool=fixed&
			threads=500&
			timestamp=1524135271940
			*/
			
			if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
					.hasExtension(url.getProtocol())) {
				url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
						.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
			}

			String scope = url.getParameter(Constants.SCOPE_KEY);
			//配置为none不暴露
			if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

				//配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
				if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
					exportLocal(url);
				}
				//如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
				if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
					if (logger.isInfoEnabled()) {
						logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
					}
					if (registryURLs != null && registryURLs.size() > 0
							&& url.getParameter("register", true)) {
						for (URL registryURL : registryURLs) {
							url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
							URL monitorUrl = loadMonitor(registryURL);
							if (monitorUrl != null) {
								url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
							}
							if (logger.isInfoEnabled()) {
								logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
							}
							Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

							Exporter<?> exporter = protocol.export(invoker);
							exporters.add(exporter);
						}
					} else {
						Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

						Exporter<?> exporter = protocol.export(invoker);
						exporters.add(exporter);
					}
				}
			}
			this.urls.add(url);
		}
}

拼装dubbo服务URL

该方法的逻辑是先根据服务配置、协议配置、发布服务的服务器信息、方法列表、dubbo版本等等信息组装成一个发布的URL对象。

主要根据之前map里的数据组装成URL。

例如

dubbo://10.4.81.95:20880/io.ymq.dubbo.api.DemoService?
anyhost=true&
application=dubbo-provider&
default.connections=5&
default.delay=-1&
default.retries=0&
default.timeout=10000&
default.version=1.0&
delay=-1&
dubbo=2.5.6&
generic=false&
interface=io.ymq.dubbo.api.DemoService&
methods=sayHello&
pid=21448&
side=provider&
threadpool=fixed&
threads=500&
timestamp=1524135271940

本地暴露和远程暴露

  1. 如果服务配置的scope是发布范围,配置为none不暴露服务,则会停止发布操作;
  2. 如果配置不是remote的情况下先做本地暴露,则调用本地暴露exportLocal方法;
  3. 如果配置不是local则暴露为远程服务,则注册服务registryProcotol;
//配置为none不暴露
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

	//配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
	if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
		exportLocal(url);
	}
	//如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
	if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
		省略更多
	}
}

本地暴露服务

@SuppressWarnings({"unchecked", "rawtypes"})
private void exportLocal(URL url) {
	// 入参 URL =
	/*
	dubbo://10.4.81.95:20880/io.ymq.dubbo.api.DemoService?
	anyhost=true&
	application=dubbo-provider&
	default.connections=5&
	default.delay=-1&
	default.retries=0&
	default.timeout=10000&
	default.version=1.0&
	delay=-1&
	dubbo=2.5.6&
	generic=false&
	interface=io.ymq.dubbo.api.DemoService&
	methods=sayHello&
	pid=11104&
	scope=local&
	side=provider&
	threadpool=fixed&
	threads=500&
	timestamp=1524193840984
	*/
	if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
	
		URL local = URL.valueOf(url.toFullString())
				.setProtocol(Constants.LOCAL_PROTOCOL)
				.setHost(NetUtils.LOCALHOST)
				.setPort(0);
			
	//这时候转成本地暴露的协议 url:			
	/* 
	injvm://127.0.0.1/io.ymq.dubbo.api.DemoService?
	anyhost=true&
	application=dubbo-provider&
	default.connections=5&
	default.delay=-1&
	default.retries=0&
	default.timeout=10000&
	default.version=1.0&
	delay=-1&
	dubbo=2.5.6&
	generic=false&
	interface=io.ymq.dubbo.api.DemoService&
	methods=sayHello&
	pid=11104&
	scope=local&
	side=provider&
	threadpool=fixed&
	threads=500&
	timestamp=1524193840984
	*/
	
	//首先还是先获得Invoker
	//然后导出成Exporter,并缓存
	//这里的proxyFactory实际是JavassistProxyFactory
	//有关详细的获得Invoke以及exporter会在下面的流程解析,在本地暴露这个流程就不再说明。
	
		Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
		exporters.add(exporter);
		logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
	}
}
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private final List<Exporter<?>> exporters = new ArrayList<Exporter<?>>();

ServiceConfig.exportLocal(URL url) 方法中对url进行本地暴露,首先将URL的协议更改为了“injvm”、IP更改为了本地端口,端口变更为0。

主要有代理工厂创建Invoker代理、Protocol暴露Invoker从而生成Exporter两个处理逻辑。

  1. 获得代理工厂创建的Invoker代理,调用getInvoker方法,根据URL中的proxy参数选择具体的代理工厂,默认选择JavassistProxyFactory代理工厂进行处理。
  2. 调用protocl.export()方法完成,在protocol对象的export(Invoker)方法中创建的Exporter对象存入ServiceConfig对象的List<Exporter<?>> exporters 属性中,至此本地暴露过程完成。

暴露为远程服务

接下来是暴露为远程服务,跟本地暴露的流程一样还是先获取Invoker,然后导出成Exporter。

private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

private final List<Exporter<?>> exporters = new ArrayList<Exporter<?>>();


//如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
	if (logger.isInfoEnabled()) {
		logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
	}
	if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) {
		for (URL registryURL : registryURLs) {
			url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
			URL monitorUrl = loadMonitor(registryURL);
			if (monitorUrl != null) {
				url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
			}
			if (logger.isInfoEnabled()) {
				logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
			}

			// url 参数
			/**
			dubbo://10.4.81.95:20880/io.ymq.dubbo.api.DemoService?
			anyhost=true&
			application=dubbo-provider&
			default.connections=5&
			default.delay=-1&
			default.retries=0&
			default.timeout=10000&
			default.version=1.0&
			delay=-1&
			dubbo=2.5.6&
			generic=false&
			interface=io.ymq.dubbo.api.DemoService&methods=sayHello&
			monitor=dubbo%3A%2F%2F127.0.0.1%3A2181%2Fcom.alibaba.dubbo.registry.RegistryService%3Fapplication%3Ddubbo-provider%26dubbo%3D2.5.6%26file%3D%2Fdata%2Fdubbo%2Fcache%2Fdubbo-provider%26pid%3D26440%26protocol%3Dregistry%26refer%3Ddubbo%253D2.5.6%2526interface%253Dcom.alibaba.dubbo.monitor.MonitorService%2526pid%253D26440%2526timestamp%253D1524212561737%26registry%3Dzookeeper%26timestamp%3D1524212160841&
			pid=26440&
			side=provider&
			threadpool=fixed&
			threads=500&
			timestamp=1524212161760
			
			*/
			
			//根据服务具体实现,实现接口,以及registryUrl通过ProxyFactory将DemoService封装成一个本地执行的Invoker代理
			//invoker是对具体实现的一种代理。
			//这里proxyFactory是上面列出的生成的代码
			
			Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

			//使用Protocol将invoker导出成一个Exporter
			//暴露封装服务invoker
			//调用Protocol生成的适配类的export方法
			//这里的protocol是上面列出的生成的代码
 
			Exporter<?> exporter = protocol.export(invoker);
			exporters.add(exporter);
		}
	} else {
		Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

		Exporter<?> exporter = protocol.export(invoker);
		exporters.add(exporter);
	}
}

关于Invoker,Exporter等的解释参见最下面的内容。

暴露远程服务时的获取Invoker过程

服务实现类转换成Invoker,大概的步骤是:

  1. 根据上面生成的proxyFactory方法调用具体的ProxyFactory实现类的getInvoker方法获取Invoker。
  2. getInvoker的过程是,首先对实现类做一个包装,生成一个包装后的类。
  3. 然后新创建一个Invoker实例,这个Invoker中包含着生成的Wrapper类,Wrapper类中有具体的实现类。
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
package com.alibaba.dubbo.rpc.proxy.wrapper;

public class StubProxyFactoryWrapper implements ProxyFactory {

    private final ProxyFactory proxyFactory;

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
        return proxyFactory.getInvoker(proxy, type, url);
    }
}

这行代码中包含服务实现类转换成Invoker的过程,其中proxyFactory是上面列出的动态生成的代码,其中getInvoker的代码为(做了精简)。

JavassistProxyFactory.java 类的 getInvoker(T proxy, Class<T> type, URL url) 方法。

package com.alibaba.dubbo.rpc.proxy.javassist;

public class JavassistProxyFactory extends AbstractProxyFactory {

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
	
		// TODO Wrapper类不能正确处理带$的类名
		//第一步封装一个包装类(wrapper)
		//该类是手动生成的
		//如果类是以$开头,就使用接口类型获取,其他的使用实现类获取

        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
		
		//返回一个Invoker实例,doInvoke方法中直接返回上面wrapper的invokeMethod
		//关于生成的wrapper,请看下面列出的生成的代码,其中invokeMethod方法中就有实现类对实际方法的调用
	
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

Wrapper.java 类的 getWrapper(Class<?> c) 方法。

生成包装类(wrapper)的过程,首先看getWrapper方法

package com.alibaba.dubbo.common.bytecode;

public abstract class Wrapper {

	//缓存包装类
    private static final Map<Class<?>, Wrapper> WRAPPER_MAP = new ConcurrentHashMap<Class<?>, Wrapper>(); //class wrapper map
	
	public static Wrapper getWrapper(Class<?> c) {
		while( ClassGenerator.isDynamicClass(c) ) // can not wrapper on dynamic class.
			c = c.getSuperclass();
		//Object类型的
		if( c == Object.class )
			return OBJECT_WRAPPER;
		//先去Wrapper缓存中查找 
		// c 等于 class io.ymq.dubbo.provider.service.DemoServiceImpl
		Wrapper ret = WRAPPER_MAP.get(c);
		if( ret == null ) {
			//缓存中不存在,生成Wrapper类,放到缓存
			ret = makeWrapper(c);
			WRAPPER_MAP.put(c,ret);
		}
		return ret;
	}
}

生成完Wrapper以后,返回一个AbstractProxyInvoker实例。至此生成Invoker的步骤就完成了。 可以看到Invoker执行方法的时候,会调用Wrapper的invokeMethod, 这个方法中会有真实的实现类调用真实方法的代码

JavassistProxyFactory.java 类的 getInvoker(T proxy, Class<T> type, URL url) 方法。

package com.alibaba.dubbo.rpc.proxy.javassist;

public class JavassistProxyFactory extends AbstractProxyFactory {

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
	
		// TODO Wrapper类不能正确处理带$的类名
		//第一步封装一个包装类(wrapper)
		//该类是手动生成的
		//如果类是以$开头,就使用接口类型获取,其他的使用实现类获取

        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
		
		//返回一个Invoker实例,doInvoke方法中直接返回上面wrapper的invokeMethod
		//关于生成的wrapper,请看下面列出的生成的代码,其中invokeMethod方法中就有实现类对实际方法的调用
	
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

暴露远程服务时导出Invoker为Exporter

Invoker导出为Exporter分为两种情况,第一种是Registry类型的Invoker,第二种是其他协议类型的Invoker,分开解析。

代码入口:

Exporter<?> exporter = protocol.export(invoker);

Registry类型的Invoker处理过程

大概的步骤是:

  1. 经过两个不用做任何处理的Wrapper类,然后到达RegistryProtocol中。
  2. 通过具体的协议导出Invoker为Exporter。
  3. 注册服务到注册中心。
  4. 订阅注册中心的服务。
  5. 生成一个新的Exporter实例,将上面的Exporter进行引入,然后返回。

rotocol是上面列出的动态生成的代码,会先调用ProtocolListenerWrapper,这个Wrapper负责初始化暴露和引用服务的监听器。对于Registry类型的不做处理,代码如下:

ProtocolFilterWrapper.java 类的 export(Invoker<T> invoker) 方法

package com.alibaba.dubbo.rpc.protocol;

public class ProtocolFilterWrapper implements Protocol {
	
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
		
		// REGISTRY_PROTOCOL = "registry";
		// 协议为 registry类型的Invoker,不需要做处理
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
		
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }
}

消费者端和提供者端形成调用链都调用了同一段代码:buildInvokerChain() 通过last->next的方式统一添加,然后再新生成的Invoker中调用next然后最后调用last就是我们真正执行的Invoker,调用链分类(默认Filter), 代码如下:

ProtocolFilterWrapper.java 类的 buildInvokerChain(final Invoker<T> invoker, String key, String group) 方法。

package com.alibaba.dubbo.rpc.protocol;

public class ProtocolFilterWrapper implements Protocol {

	private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
		Invoker<T> last = invoker;
		 //只获取满足条件的Filter
		List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
		if (filters.size() > 0) {
		
			// filter从最后一个开始依次封装,最终形成一个链,调用顺序为filters的顺序    
			for (int i = filters.size() - 1; i >= 0; i--) {
				final Filter filter = filters.get(i);
				final Invoker<T> next = last;
				last = new Invoker<T>() {

					public Class<T> getInterface() {
						return invoker.getInterface();
					}

					public URL getUrl() {
						return invoker.getUrl();
					}

					public boolean isAvailable() {
						return invoker.isAvailable();
					}

					public Result invoke(Invocation invocation) throws RpcException {
						return filter.invoke(next, invocation);
					}

					public void destroy() {
						invoker.destroy();
					}

					@Override
					public String toString() {
						return invoker.toString();
					}
				};
			}
		}
		return last;
	}
}

RegistryProtocol.java 类的 export(final Invoker<T> originInvoker) 方法:

这里我们先解析的是Registry类型的Invoker,接着就会调用RegistryProtocol的export方法,RegistryProtocol负责注册服务到注册中心和向注册中心订阅服务。代码如下:

package com.alibaba.dubbo.registry.integration;

public class RegistryProtocol implements Protocol {

	public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
		//export invoker
		//这里就交给了具体的协议去暴露服务(先不解析,留在后面,可以先去后面看下导出过程)
		final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
		
		//registry provider
		//根据invoker中的url获取Registry实例
		//并且连接到注册中心
		//此时提供者作为消费者引用注册中心核心服务RegistryService
		final Registry registry = getRegistry(originInvoker);
		
		//注册到注册中心的URL
		final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
		
		//调用远端注册中心的register方法进行服务注册
		//若有消费者订阅此服务,则推送消息让消费者引用此服务。
		//注册中心缓存了所有提供者注册的服务以供消费者发现。
		registry.register(registedProviderUrl);
		
		// 订阅override数据
		// FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
		final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
		final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
		overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
		registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
		
		//保证每次export都返回一个新的exporter实例
		//返回暴露后的Exporter给上层ServiceConfig进行缓存,便于后期撤销暴露。
		return new Exporter<T>() {
			public Invoker<T> getInvoker() {
				return exporter.getInvoker();
			}

			public void unexport() {
				try {
					exporter.unexport();
				} catch (Throwable t) {
					logger.warn(t.getMessage(), t);
				}
				try {
					registry.unregister(registedProviderUrl);
				} catch (Throwable t) {
					logger.warn(t.getMessage(), t);
				}
				try {
					overrideListeners.remove(overrideSubscribeUrl);
					registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
				} catch (Throwable t) {
					logger.warn(t.getMessage(), t);
				}
			}
		};
	}
}

交给具体的协议去暴露服务

先不解析,留在后面,可以先去后面看下导出过程,然后再回来接着看注册到注册中心的过程。具体协议暴露服务主要是打开服务器和端口,进行监听。

注册到注册中心

具体的协议进行暴露并且返回了一个ExporterChangeableWrapper之后,接下来看下一步连接注册中心并注册到注册中心,代码是在RegistryProtocol的export方法:

RegistryProtocol.java 类的 export(final Invoker<T> originInvoker) 方法

package com.alibaba.dubbo.registry.integration;

public class RegistryProtocol implements Protocol {

	//省略很多。。。
	
	public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
			
		//此步已经分析完
		final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
		//得到具体的注册中心,连接注册中心,此时提供者作为消费者引用注册中心核心服务RegistryService
		final Registry registry = getRegistry(originInvoker);
		//获取要注册到注册中心的url
		final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
		//调用远端注册中心的register方法进行服务注册
		//若有消费者订阅此服务,则推送消息让消费者引用此服务
		registry.register(registedProviderUrl);
		//提供者向注册中心订阅所有注册服务的覆盖配置
		registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
		//返回暴露后的Exporter给上层ServiceConfig进行缓存
		return new Exporter<T>() {。。。}
	}
}

RegistryProtocol.java 类的 getRegistry(final Invoker<?> originInvoker) 方法,会发现该方法会在 AbstractRegistryFactory.java 类中实现:

package com.alibaba.dubbo.registry.integration;

public class RegistryProtocol implements Protocol {

	 /**
     * 根据invoker的地址获取registry实例
     *
     * @param originInvoker
     * @return
     */
    private Registry getRegistry(final Invoker<?> originInvoker) {
		
		//获取invoker中的registryUrl
        URL registryUrl = originInvoker.getUrl();
		
		if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
			//获取registry的值,这里获得是zookeeper,默认值是dubbo
            String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
			
            registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
			
			// registryUrl
			
			/*
			 zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?
			 application=dubbo-provider&
			 dubbo=2.5.6&
			 export=dubbo://10.4.81.95:20880/io.ymq.dubbo.api.DemoService?
			 anyhost=true&
			 application=dubbo-provider&
			 default.connections=5&
			 default.delay=-1&default.retries=0&
			 default.timeout=10000&
			 default.version=1.0&
			 delay=-1&
			 dubbo=2.5.6&
			 generic=false&
			 interface=io.ymq.dubbo.api.DemoService&
			 methods=sayHello&
			
			 monitor=dubbo://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService%3F
			 application=dubbo-provider&
			 dubbo=2.5.6&
			 file=/data/dubbo/cache/dubbo-provider&
			 pid=3D29100%26
			 protocol%3D
			 registry%26refer%3D
			 dubbo%253D2.5.6%2526
			 interface%253Dcom.alibaba.dubbo.monitor.MonitorService%2526
			 pid%253D29100%2526
			 timestamp%253D1524465704266%26
			 registry%3Dzookeeper%26
			 timestamp%3D1524465673571&
            
			 pid=29100&
			 side=provider&threadpool=fixed&threads=500&
			 timestamp=1524465674601&
			 file=/data/dubbo/cache/dubbo-provider&
			 pid=29100&
			 timestamp=1524465673571
			 */

        }
		
		//根据SPI机制获取具体的Registry实例,这里获取到的是ZookeeperRegistry
        return registryFactory.getRegistry(registryUrl);
    }
}

先看下 AbstractRegistryFactory.java 类的 getRegistry(URL url) 方法。

package com.alibaba.dubbo.registry.support;

public abstract class AbstractRegistryFactory implements RegistryFactory {

	public Registry getRegistry(URL url) {
		url = url.setPath(RegistryService.class.getName())
				 .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
				 .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
		// 入参url =	 
		// zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?
		// application=dubbo-provider&
		// dubbo=2.5.6&
		// file=/data/dubbo/cache/dubbo-provider&
		// interface=com.alibaba.dubbo.registry.RegistryService&
		// pid=30112&
		// timestamp=1524469125241

		//这里key为:
		// zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService
		String key = url.toServiceString();
		// 锁定注册中心获取过程,保证注册中心单一实例
		LOCK.lock();
		try {
			//先从缓存中获取Registry实例
			Registry registry = REGISTRIES.get(key);
			if (registry != null) {
				return registry;
			}
			
			
			//创建registry,会直接new一个ZookeeperRegistry返回
			
			//********具体创建实例是子类来实现的********
			registry = createRegistry(url);
			if (registry == null) {
				throw new IllegalStateException("Can not create registry " + url);
			}
		   // 创建后的registry 
		   
			// zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?
			// application=dubbo-provider&
			// dubbo=2.5.6&
			// file=/data/dubbo/cache/dubbo-provider&
			// interface=com.alibaba.dubbo.registry.RegistryService&
			// pid=30344&
			// timestamp=1524473152100
			
			//放到缓存中
			REGISTRIES.put(key, registry);
			return registry;
		} finally {
			// 释放锁
			LOCK.unlock();
		}
	}
}

ZookeeperRegistryFactory.java 类的 createRegistry(URL url) 方法

方法是在子类中实现的,这里的子类是ZookeeperRegistry.java。

首先需要经过AbstractRegistry 类的构造方法。AbstractRegistry类的构造器初始化完,接着调用FailbackRegistry类的构造器初始化,最后回到ZookeeperRegistry类的构造初始化。

package com.alibaba.dubbo.registry.zookeeper;

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    private ZookeeperTransporter zookeeperTransporter;

    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

}

ZookeeperRegistry.java 类的 ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) 构造方法

package com.alibaba.dubbo.registry.zookeeper;

public class ZookeeperRegistry extends FailbackRegistry {

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
	
	    super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addStateListener(new StateListener() {
            public void stateChanged(int state) {
                if (state == RECONNECTED) {
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
	}
}

FailbackRegistry.java 类的 AbstractRegistry 构造器初始化完,接着调用 FailbackRegistry 类的 FailbackRegistry(URL url) 构造器初始化:

package com.alibaba.dubbo.registry.support;

public abstract class FailbackRegistry extends AbstractRegistry {

	 public FailbackRegistry(URL url) {
        super(url);
        int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                // 检测并连接注册中心
                try {
                    retry();
                } catch (Throwable t) { // 防御性容错
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }
}

AbstractRegistry.java 类的 AbstractRegistry(URL url) 构造器初始化完,接着调用 FailbackRegistry 类的 FailbackRegistry(URL url) 构造器初始化:

package com.alibaba.dubbo.registry.support;

public abstract class AbstractRegistry implements Registry {

	public AbstractRegistry(URL url) {
		// 保存url
		// zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?
		// application=dubbo-provider&
		// dubbo=2.5.6&
		// file=/data/dubbo/cache/dubbo-provider&
		// interface=com.alibaba.dubbo.registry.RegistryService&
		// pid=28536&
		// timestamp=1524471115982

		setUrl(url);
		// 启动文件保存定时器
		
		syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
		String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache");
		// 保存的文件为:/data/dubbo/cache/dubbo-provider
		File file = null;
		if (ConfigUtils.isNotEmpty(filename)) {
			file = new File(filename);
			if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
				if (!file.getParentFile().mkdirs()) {
					throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
				}
			}
		}
		this.file = file;
		// 加载文件中的属性
		loadProperties();
		
		//通知订阅
		notify(url.getBackupUrls());
	}
	
}

AbstractRegistry.java 类的 notify(List<URL> urls) 方法:

package com.alibaba.dubbo.registry.support;

public abstract class AbstractRegistry implements Registry {

	protected void notify(List<URL> urls) {
		if(urls == null || urls.isEmpty()) return;
		//getSubscribed()方法获取订阅者列表
		//订阅者Entry里每个URL都对应着n个NotifyListener
		for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
			URL url = entry.getKey();

			if(! UrlUtils.isMatch(url, urls.get(0))) {
				continue;
			}

			Set<NotifyListener> listeners = entry.getValue();
			if (listeners != null) {
				for (NotifyListener listener : listeners) {
					try {
						//通知每个监听器
						notify(url, listener, filterEmpty(url, urls));
					} catch (Throwable t) {}
				}
			}
		}
	}
}

通知每个监听器

AbstractRegistry.java 类的 notify(URL url, NotifyListener listener, List<URL> urls) 方法:

package com.alibaba.dubbo.registry.support;

public abstract class AbstractRegistry implements Registry {

	protected void notify(URL url, NotifyListener listener, List<URL> urls) {
		
		// 省略。。。
		
		Map<String, List<URL>> result = new HashMap<String, List<URL>>();
		for (URL u : urls) {
			if (UrlUtils.isMatch(url, u)) {
			    //分类
				String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
				List<URL> categoryList = result.get(category);
				if (categoryList == null) {
					categoryList = new ArrayList<URL>();
					result.put(category, categoryList);
				}
				categoryList.add(u);
			}
		}
		if (result.size() == 0) {
			return;
		}
		Map<String, List<URL>> categoryNotified = notified.get(url);
		if (categoryNotified == null) {
			notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
			categoryNotified = notified.get(url);
		}
		for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
			String category = entry.getKey();
			List<URL> categoryList = entry.getValue();
			categoryNotified.put(category, categoryList);
			
			//保存到主目录下的.dubbo目录下
			saveProperties(url);
			//上面获取到的监听器进行通知
			listener.notify(categoryList);
		}
	}
}

FailbackRegistry.java 类的 AbstractRegistry 类的构造器初始化完,接着调用 FailbackRegistry 类的构造器初始化:

public abstract class FailbackRegistry extends AbstractRegistry {

	public FailbackRegistry(URL url) {
		super(url);
		//重试时间,默认5000ms
		int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
		//启动失败重试定时器
		this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
			public void run() {
				// 检测并连接注册中心
				try {
					//获取到注册失败的,然后尝试注册
					retry();
				} catch (Throwable t) { // 防御性容错
					logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
				}
			}
		}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
	}
}

最后回到 ZookeeperRegistry.java 类的构造初始化:

package com.alibaba.dubbo.registry.zookeeper;

public class ZookeeperRegistry extends FailbackRegistry {
		
	public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
		super(url);
		if (url.isAnyHost()) {
			throw new IllegalStateException("registry address == null");
		}
		//获得到注册中心中的分组,默认dubbo
		String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
		if (!group.startsWith(Constants.PATH_SEPARATOR)) {
			group = Constants.PATH_SEPARATOR + group;
		}
		
		//注册到注册中心的节点
		this.root = group;
		
		//使用zookeeperTansporter去连接
		//ZookeeperTransport这里是生成的自适应实现,默认使用ZkClientZookeeperTransporter
		//ZkClientZookeeperTransporter的connect去实例化一个ZkClient实例
		//并且订阅状态变化的监听器subscribeStateChanges
		//然后返回一个ZkClientZookeeperClient实例
		zkClient = zookeeperTransporter.connect(url);
		
		//ZkClientZookeeperClient添加状态改变监听器
		zkClient.addStateListener(new StateListener() {
			public void stateChanged(int state) {
				if (state == RECONNECTED) {
					try {
						recover();
					} catch (Exception e) {
						logger.error(e.getMessage(), e);
					}
				}
			}
		});
	}
}

注册到注册中心 部分重要代码

package com.alibaba.dubbo.registry.integration;

public class RegistryProtocol implements Protocol {

	public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
		//export invoker
		final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
		//registry provider
		final Registry registry = getRegistry(originInvoker);

		//取要注册到注册中心的url
		final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

		
		// URL 下面
		/*
		dubbo://10.4.81.95:20880/io.ymq.dubbo.api.DemoService?
		anyhost=true&
		application=dubbo-provider&
		default.connections=5&
		default.delay=-1&
		default.retries=0&
		default.timeout=10000&
		default.version=1.0&
		delay=-1&
		dubbo=2.5.6&
		generic=false&
		interface=io.ymq.dubbo.api.DemoService&
		methods=sayHello&
		pid=30344&
		side=provider&
		threadpool=fixed&
		threads=500&
		timestamp=1524473153090
		*/
	
	   // 省略很多。。。
		   
	   return new Exporter<T>() {
	   // 省略很多。。。
	   }
		 
	}
}

获取到了Registry,Registry实例中保存着连接到了zookeeper的zkClient实例之后,下一步获取要注册到注册中心的url(在RegistryProtocol中)。

取要注册到注册中心的url 的 RegistryProtocol.java 类的 getRegistedProviderUrl(final Invoker<?> originInvoker) 方法

package com.alibaba.dubbo.registry.integration;

public class RegistryProtocol implements Protocol {
	/**
	 * 返回注册到注册中心的URL,对URL参数进行一次过滤
	 *
	 * @param originInvoker
	 * @return
	 */
	private URL getRegistedProviderUrl(final Invoker<?> originInvoker) {
		URL providerUrl = getProviderUrl(originInvoker);
		//注册中心看到的地址
		final URL registedProviderUrl = providerUrl.removeParameters(getFilteredKeys(providerUrl)).removeParameter(Constants.MONITOR_KEY);
		return registedProviderUrl;
	}
}

然后调用 registry.register(registedProviderUrl) 注册到注册中心(在RegistryProtocol中)。register方法的实现在FailbackRegistry中:

FailbackRegistry.java 类的 register(URL url) 方法:

package com.alibaba.dubbo.registry.support;

public abstract class FailbackRegistry extends AbstractRegistry {

    @Override
    public void register(URL url) {
        if (destroyed.get()){
            return;
        }
        super.register(url);
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // 向服务器端发送注册请求
             (url);
        } catch (Exception e) {
            Throwable t = e;

            // 如果开启了启动时检测,则直接抛出异常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // 将失败的注册请求记录到失败列表,定时重试
            failedRegistered.add(url);
        }
    }
}

doRegister(url) ;在这里是 ZookeeperRegistry 中具体实现的,这里将会注册到注册中心:

ZookeeperRegistry.java 类的 doRegister(URL url) 方法

package com.alibaba.dubbo.registry.zookeeper;

public class ZookeeperRegistry extends FailbackRegistry {
	
	protected void doRegister(URL url) {
		
		try {
			
			//这里zkClient就是我们上面调用构造的时候生成的
			//ZkClientZookeeperClient
			//保存着连接到Zookeeper的zkClient实例
			//开始注册,也就是在Zookeeper中创建节点
			
			//这里toUrlPath获取到的path为:
			
		
			/**
			/dubbo
				/io.ymq.dubbo.api.DemoService
					/providers
						/dubbo://169.254.8.253:20880/io.ymq.dubbo.api.DemoService?
						anyhost=true&
						application=dubbo-provider&
						default.connections=5&
						default.delay=-1&
						default.retries=0&
						default.timeout=10000&
						default.version=1.0&
						delay=-1&
						dubbo=2.5.6&
						generic=false&
						interface=io.ymq.dubbo.api.DemoService&
						methods=sayHello&
						pid=16856&
						side=provider&
						threadpool=fixed&
						threads=500&
						timestamp=1525419864880
			**/
			//默认创建的节点是临时节点 
			zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
		} catch (Throwable e) {
			throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
		}
	}
}

经过这一步之后,Zookeeper中就有节点存在了,具体节点为:

/dubbo
	/io.ymq.dubbo.api.DemoService
		/providers
			/dubbo://169.254.8.253:20880/io.ymq.dubbo.api.DemoService?
			anyhost=true&
			application=dubbo-provider&
			default.connections=5&
			default.delay=-1&
			default.retries=0&
			default.timeout=10000&
			default.version=1.0&
			delay=-1&
			dubbo=2.5.6&
			generic=false&
			interface=io.ymq.dubbo.api.DemoService&
			methods=sayHello&
			pid=16856&
			side=provider&
			threadpool=fixed&
			threads=500&
			timestamp=1525419864880

订阅注册中心的服务

在注册到注册中心之后,registry会去订阅覆盖配置的服务,这一步之后就会在 /dubbo/io.ymq.dubbo.api.DemoService 节点下多一个 configurators 节点。(具体过程暂先不解析)。

返回新Exporter实例

最后返回Exporter新实例,返回到ServiceConfig中。服务的发布就算完成了。

交给具体的协议进行服务暴露

这里也就是非Registry类型的Invoker的导出过程。主要的步骤是将本地ip和20880端口打开,进行监听。最后包装成exporter返回。

RegistryProtocol.java 类的 doLocalExport(invoker) 方法

package com.alibaba.dubbo.registry.integration;

public class RegistryProtocol implements Protocol {

    //用于解决rmi重复暴露端口冲突的问题,已经暴露过的服务不再重新暴露
    //providerurl <--> exporter
    private final Map<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<String, ExporterChangeableWrapper<?>>();
	
	
    @SuppressWarnings("unchecked")
    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
	
		//原始的invoker中的url:
		
		/*
		registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?
		application=dubbo-provider&dubbo=2.5.6&
		export=dubbo://169.254.8.253:20880/io.ymq.dubbo.api.DemoService?
		anyhost=true&
		application=dubbo-provider&
		default.connections=5&
		default.delay=-1&
		default.retries=0&
		default.timeout=10000&
		default.version=1.0&
		delay=-1&
		dubbo=2.5.6&
		generic=false&
		interface=io.ymq.dubbo.api.DemoService&
		methods=sayHello&
		
		monitor=dubbo%3A%2F%2F127.0.0.1%3A2181%2F
		com.alibaba.dubbo.registry.RegistryService%3Fapplication%3D
		dubbo-provider%26dubbo%3D2.5.6%26file%3D%2Fdata%2Fdubbo%2F
		cache%2Fdubbo-provider%26pid%3D2972%26protocol%3D
		registry%26refer%3Ddubbo%253D2.5.6%2526interface%253D
		com.alibaba.dubbo.monitor.MonitorService%2526
		pid%253D2972%2526timestamp%253D1525421418972%26
		registry%3Dzookeeper%26timestamp%3D1525421395956&
		pid=2972&
		side=provider&
		threadpool=fixed&
		threads=500&
		timestamp=1525421396922&
		file=/data/dubbo/cache/dubbo-provider&pid=2972&
		registry=zookeeper&
		timestamp=1525421395956
	
		//从原始的invoker中得到的key:
		
		dubbo://169.254.8.253:20880/io.ymq.dubbo.api.DemoService?
		anyhost=true&
		application=dubbo-provider&
		default.connections=5&
		default.delay=-1&
		default.retries=0&
		default.timeout=10000&
		default.version=1.0&
		delay=-1&
		dubbo=2.5.6&
		generic=false&
		interface=io.ymq.dubbo.api.DemoService&
		methods=sayHello&
		
		monitor=dubbo://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?
		application=dubbo-provider&
		dubbo=2.5.6&
		file=/data/dubbo/cache/dubbo-provider&
		pid=4668&
		protocol=registry&
		refer=dubbo%3D2.5.6%26
		interface%3Dcom.alibaba.dubbo.monitor.MonitorService%26pid%3D4668%26
		timestamp%3D1525422343947&
		registry=zookeeper&
		timestamp=1525422334635&
		pid=4668&
		side=provider&
		threadpool=fixed&
		threads=500&
		timestamp=1525422335033
	
		*/
		
        String key = getCacheKey(originInvoker);
		
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {

					//得到一个Invoker代理,里面包含原来的Invoker
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
					
					//此处protocol还是最上面生成的代码,调用代码中的export方法,会根据协议名选择调用具体的实现类
					//这里我们需要调用DubboProtocol的export方法
					//这里的使用具体协议进行导出的invoker是个代理invoker
					//导出完之后,返回一个新的ExporterChangeableWrapper实例
					
					exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
					
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }
}

这里protocol.export(invokerDelegete)就要去具体的 DubboProtocol.java 中执行了, DubboProtocol.java 的外面包裹着 ProtocolFilterWrapper.java ,再外面还包裹着 ProtocolListenerWrapper.java 。会先经过 ProtocolListenerWrapper.java

  1. 会先经过 ProtocolListenerWrapper.java 类的 export(Invoker<T> invoker) 方法
package com.alibaba.dubbo.rpc.protocol;

public class ProtocolListenerWrapper implements Protocol {

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return new ListenerExporterWrapper<T>(protocol.export(invoker),
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
    }

}
  1. 再经过 ProtocolFilterWrapper.java 类的 export(Invoker<T> invoker) 方法
package com.alibaba.dubbo.rpc.protocol;
	
public class ProtocolFilterWrapper implements Protocol {
	
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
		
		// REGISTRY_PROTOCOL = "registry";
		// 协议为 registry类型的Invoker,不需要做处理
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
		
		 //其他具体协议类型的Invoker
		//先构建Filter链,然后再导出
	
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }
}

消费者端和提供者端形成调用链都调用了同一段代码:buildInvokerChain() 通过last->next的方式统一添加,然后再新生成的Invoker中调用next然后最后调用last就是我们真正执行的Invoker,调用链分类(默认Filter)

ProtocolFilterWrapper.java 类的 buildInvokerChain(final Invoker<T> invoker, String key, String group) 方法 生成新的 Invoker

package com.alibaba.dubbo.rpc.protocol;

public class ProtocolFilterWrapper implements Protocol {

	private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
		
		//我们要处理的那个Invoker作为处理链的最后一个
		Invoker<T> last = invoker;
		//根据key和group获取自动激活的Filter
		List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
		if (filters.size() > 0) {
		
			// 把所有的过滤器都挨个连接起来,最后一个是我们真正的Invoker,最终形成一个链,调用顺序为filters的顺序    
			for (int i = filters.size() - 1; i >= 0; i--) {
				final Filter filter = filters.get(i);
				final Invoker<T> next = last;
				last = new Invoker<T>() {

					public Class<T> getInterface() {
						return invoker.getInterface();
					}

					public URL getUrl() {
						return invoker.getUrl();
					}

					public boolean isAvailable() {
						return invoker.isAvailable();
					}

					public Result invoke(Invocation invocation) throws RpcException {
						return filter.invoke(next, invocation);
					}

					public void destroy() {
						invoker.destroy();
					}

					@Override
					public String toString() {
						return invoker.toString();
					}
				};
			}
		}
		return last;
	}
}

接着就到了 DubboProtocol.java 类的 export(Invoker<T> invoker) 方法,这里进行暴露服务:

package com.alibaba.dubbo.rpc.protocol.dubbo;

public class DubboProtocol extends AbstractProtocol {
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
	
        URL url = invoker.getUrl();
		
		/*
		入参: url
		
		dubbo://169.254.8.253:20880/io.ymq.dubbo.api.DemoService?
		anyhost=true&
		application=dubbo-provider&
		default.connections=5&
		default.delay=-1&
		default.retries=0&
		default.timeout=10000&
		default.version=1.0&
		delay=-1&
		dubbo=2.5.6&
		generic=false&
		interface=io.ymq.dubbo.api.DemoService&methods=sayHello&
		
		monitor=dubbo://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?
		application=dubbo-provider&
		dubbo=2.5.6&
		file=/data/dubbo/cache/dubbo-provider&
		pid=13576&
		protocol=registry&
		refer=dubbo%3D2.5.6%26interface%3Dcom.alibaba.dubbo.monitor.MonitorService%26pid%3D13576%26timestamp%3D1525425274459&
		registry=zookeeper&
		timestamp=1525425266251&
        
		pid=13576&
		side=provider&
		threadpool=fixed&
		threads=500&
		timestamp=1525425266793
		*/

        // export service.
		
		//key 由serviceName,port,version,group组成
		//当nio客户端发起远程调用时,nio服务端通过此key来决定调用哪个Exporter,也就是执行的Invoker。
		
		// io.ymq.dubbo.api.DemoService:1.0:20880
		
        String key = serviceKey(url);
		
        //将Invoker转换成Exporter
		//直接new一个新实例
		//没做啥处理,就是做一些赋值操作
		//这里的exporter就包含了invoker
		DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
		
		//缓存要暴露的服务,key是上面生成的
        exporterMap.put(key, exporter);

        //export an stub service for dispaching event
		
		//是否支持本地存根
		//远程服务后,客户端通常只剩下接口,而实现全在服务器端,
		//但提供方有些时候想在客户端也执行部分逻辑,比如:做ThreadLocal缓存,
		//提前验证参数,调用失败后伪造容错数据等等,此时就需要在API中带上Stub,
		//客户端生成Proxy实,会把Proxy通过构造函数传给Stub,
		//然后把Stub暴露组给用户,Stub可以决定要不要去调Proxy。
		
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

		//根据URL绑定IP与端口,建立NIO框架的Server
        openServer(url);

        return exporter;
    }
}

上面得到的Exporter会被放到缓存中去,key就是上面生成的,客户端就可以发请求根据key找到Exporter,然后找到invoker进行调用了。接下来是创建服务器并监听端口。

接着调用openServer方法创建NIO Server进行监听:

DubboProtocol.java 类的 openServer(URL url) 方法

package com.alibaba.dubbo.rpc.protocol.dubbo;

public class DubboProtocol extends AbstractProtocol {

    private void openServer(URL url) {
        // find server.
		 //key是IP:PORT
		//169.254.8.253:20880
        String key = url.getAddress();
		
        //client 也可以暴露一个只有server可以调用的服务。
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
			
			//同一JVM中,同协议的服务,共享同一个Server,
			//第一个暴露服务的时候创建server,
			//以后相同协议的服务都使用同一个server
			
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                //server支持reset,配合override功能使用
				
				//同协议的服务后来暴露服务的则使用第一次创建的同一Server
				//accept、idleTimeout、threads、heartbeat参数的变化会引起Server的属性发生变化
				//这时需要重新设置Server
				
                server.reset(url);
            }
        }
    }
}

继续看createServer方法:

DubboProtocol.java 类的 createServer(URL url) 方法

package com.alibaba.dubbo.rpc.protocol.dubbo;

public class DubboProtocol extends AbstractProtocol {

    private ExchangeServer createServer(URL url) {
	
        //默认开启server关闭时发送readonly事件
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        //默认开启heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
		//默认使用netty
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        ExchangeServer server;
        try {
		    //Exchangers是门面类,里面封装的是Exchanger的逻辑。
			//Exchanger默认只有一个实现HeaderExchanger.
			//Exchanger负责数据交换和网络通信。
			//从Protocol进入Exchanger,标志着程序进入了remote层。
			//这里requestHandler是ExchangeHandlerAdapter
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
	}
}

Exchangers.java 类的 ExchangeServer bind(URL url, ExchangeHandler handler) 方法:

package com.alibaba.dubbo.remoting.exchange;

public class Exchangers {
    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
		 //getExchanger方法根据url获取到一个默认的实现HeaderExchanger
		//调用HeaderExchanger的bind方法
	
        return getExchanger(url).bind(url, handler);
    }
}

HeaderExchanger.java 类的 bind(URL url, ExchangeHandler handler) 方法:

package com.alibaba.dubbo.remoting.exchange.support.header;

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
		//直接返回一个HeaderExchangeServer
		//先创建一个HeaderExchangeHandler
		//再创建一个DecodeHandler
		//最后调用Transporters.bind
	
		return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

这里会先创建一个 HeaderExchangerHandler ,包含着ExchangeHandlerAdapter,接着创建一个DecodeHandler,会包含前面的handler,接下来调用Transporters的bind方法,返回一个Server,接着用HeaderExchangeServer包装一下,就返回给Protocol层了。

在HeaderExchangerServer包装的时候会启动心跳定时器startHeatbeatTimer();,暂不解析。

Transporters.java 类的 bind(URL url, ChannelHandler... handlers) 方法

package com.alibaba.dubbo.remoting;

public class Transporters {
    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
		// 省略。。。
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
		   //如果有多个handler的话,需要使用分发器包装下
            handler = new ChannelHandlerDispatcher(handlers);
        }
		//getTransporter()获取一个Adaptive的Transporter
		//然后调用bind方法(默认是NettyTransporter的bind方法)
        return getTransporter().bind(url, handler);
    }
	
	
    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }
	
}

NettyTransporter.java 类的 bind(URL url, ChannelHandler listener) 方法:

package com.alibaba.dubbo.remoting.transport.netty4;

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty4";
    
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
		//创建一个Server
        return new NettyServer(url, listener);
    }

    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

}

NettyServer.java 类的 NettyServer(URL url, ChannelHandler handler) 构造方法:

package com.alibaba.dubbo.remoting.transport.netty4;

public class NettyServer extends AbstractServer implements Server {

   // 省略。。。

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
		//handler先经过ChannelHandlers的包装方法
		//然后再初始化
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

ChannelHandlers.wrap 方法中会根据SPI扩展机制动态生成Dispatcher的自适应类,生成的代码不在列出,默认使用AllDispatcher处理,会返回一个AllChannelHandler,会把线程池和DataStore都初始化了。然后经过HeartbeatHandler封装,再经过MultiMessageHandler封装后返回。

NettyServer构造,会依次经过AbstractPeer,AbstractEndpoint,AbstractServer,NettyServer的初始化。重点看下AbstractServer的构造方法:

AbstractServer.java 类的 AbstractServer(URL url, ChannelHandler handler) 构造方法

package com.alibaba.dubbo.remoting.transport;

public abstract class AbstractServer extends AbstractEndpoint implements Server {
	
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
        String host = url.getParameter(Constants.ANYHOST_KEY, false)
                || NetUtils.isInvalidLocalHost(getUrl().getHost())
                ? NetUtils.ANYHOST : getUrl().getHost();
        bindAddress = new InetSocketAddress(host, getUrl().getPort());
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
		
			//初始化的时候会打开Server
			//具体实现这里是NettyServer中
			
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }
}

然后调用doOpen方法:

NettyServer.java 类的 doOpen() 方法

package com.alibaba.dubbo.remoting.transport.netty4;

public class NettyServer extends AbstractServer implements Server {
	
	protected void doOpen() throws Throwable {
		NettyHelper.setNettyLoggerFactory();
		//boss线程池
		ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
		//worker线程池
		ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
		//ChannelFactory,没有指定工作者线程数量,就使用cpu+1
		ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
		bootstrap = new ServerBootstrap(channelFactory);

		final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
		channels = nettyHandler.getChannels();
		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			public ChannelPipeline getPipeline() {
				NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
				ChannelPipeline pipeline = Channels.pipeline();
				pipeline.addLast("decoder", adapter.getDecoder());
				pipeline.addLast("encoder", adapter.getEncoder());
				pipeline.addLast("handler", nettyHandler);
				return pipeline;
			}
		});
		// bind之后返回一个Channel
		channel = bootstrap.bind(getBindAddress());
	}
}

doOpen方法创建Netty的Server端并打开,具体的事情就交给Netty去处理了,Netty的过程,原理,代码有时间再另行研究。

  • NIO框架接受到消息后,先由NettyCodecAdapter解码,再由NettyHandler处理具体的业务逻辑,再由NettyCodecAdapter编码后发送。
  • NettyServer既是Server又是Handler。
  • HeaderExchangerServer只是Server。
  • MultiMessageHandler是多消息处理Handler。
  • HeartbeatHandler是处理心跳事件的Handler。
  • AllChannelHandler是消息派发器,负责将请求放入线程池,并执行请求。
  • DecodeHandler是编解码Handler。
  • HeaderExchangerHandler是信息交换Handler,将请求转化成请求响应模式与同步转异步模式。
  • RequestHandler是最后执行的Handler,会在协议层选择Exporter后选择Invoker,进而执行Filter与Invoker,最终执行请求服务实现类方法。
  • Channel直接触发事件并执行Handler,Channel在有客户端连接Server的时候触发创建并封装成NettyChannel,再由HeaderExchangerHandler创建HeaderExchangerChannel,负责请求响应模式的处理。
  • NettyChannel其实是个Handler,HeaderExchangerChannel是个Channel,
  • 消息的序列化与反序列化工作在NettyCodecAdapter中发起完成。

当有客户端连接Server时的连接过程:

  • NettyHandler.connected()
  • NettyServer.connected()
  • MultiMessageHandler.connected()
  • HeartbeatHandler.connected()
  • AllChannelHandler.connected()
  • DecodeHandler.connected()
  • HeaderExchangerHandler.connected()
  • requestHandler.connected()
  • 执行服务的onconnect事件的监听方法

名词解释

Invoker

可执行的对象,执行具体的远程调用,能够根据方法名称,参数得到相应的执行结果。

Invocation,包含了需要执行的方法,参数等信息。目前实现类只有RpcInvocation。

有三种类型的Invoker:

  • 本地执行类的Invoker。
  • 远程通信执行类的Invoker。
  • 多个远程通信执行类的Invoker聚合成集群版的Invoker。

以sayHello为例:

  • 本地执行类的Invoker:在Server端有DemoServiceImpl实现,要执行该接口,只需要通过反射执行对应的实现类即可。
  • 远程通信执行类的Invoker:在Client端要想执行该接口的实现方法,需要先进行远程通信,发送要执行的参数信息给Server端,Server端利用本地执行Invoker的方式执行,最后将结果发送给Client。
  • 集群版的Invoker:Client端使用的时候,通过集群版的Invoker操作,Invoker会挑选一个远程通信类型的Invoker来执行。

提供者端的Invoker封装了服务实现类,URL,Type,状态都是只读并且线程安全。通过发起invoke来具体调用服务类。

ProxyFactory

在服务提供者端,ProxyFactory主要服务的实现统一包装成一个Invoker,Invoker通过反射来执行具体的Service实现对象的方法。默认的实现是JavassistProxyFactory,代码如下:

JavassistProxyFactory.java 类的 getInvoker(T proxy, Class<T> type, URL url) 方法

package com.alibaba.dubbo.rpc.proxy.javassist;

public class JavassistProxyFactory extends AbstractProxyFactory {

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
	
		// TODO Wrapper类不能正确处理带$的类名
		//第一步封装一个包装类(wrapper)
		//该类是手动生成的
		//如果类是以$开头,就使用接口类型获取,其他的使用实现类获取

        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
		
		//返回一个Invoker实例,doInvoke方法中直接返回上面wrapper的invokeMethod
		//关于生成的wrapper,请看下面列出的生成的代码,其中invokeMethod方法中就有实现类对实际方法的调用
	
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

Protocol

服务地址的发布和订阅。

Protocol是dubbo中的服务域,只在服务启用时加载,无状态,线程安全,是实体域Invoker暴露和引用的主功能入口,负责Invoker的生命周期管理,是Dubbo中远程服务调用层。

Protocol根据指定协议对外公布服务,当客户端根据协议调用这个服务时,Protocol会将客户端传递过来的Invocation参数交给Invoker去执行。

Protocol加入了远程通信协议,会根据客户端的请求来获取参数Invocation。

package com.alibaba.dubbo.rpc;

@Extension("dubbo")
public interface Protocol {

    int getDefaultPort();

    //对于服务提供端,将本地执行类的Invoker通过协议暴漏给外部
    //外部可以通过协议发送执行参数Invocation,然后交给本地Invoker来执行
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    //这个是针对服务消费端的,服务消费者从注册中心获取服务提供者发布的服务信息
    //通过服务信息得知服务提供者使用的协议,然后服务消费者仍然使用该协议构造一个Invoker。这个Invoker是远程通信类的Invoker。
    //执行时,需要将执行信息通过指定协议发送给服务提供者,服务提供者接收到参数Invocation,然后交给服务提供者的本地Invoker来执行
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    void destroy();

}

关于RegistryProtocol和DubboProtocol的疑惑

以下是官方文档说明 :

暴露服务:

(1) 只暴露服务端口:

在没有注册中心,直接暴露提供者的情况下,即:

<dubbo:service regisrty="N/A" /> or <dubbo:registry address="N/A" />

ServiceConfig解析出的URL的格式为:

dubbo://service-host/com.foo.FooService?version=1.0.0

基于扩展点的Adaptive机制,通过URL的”dubbo://”协议头识别,直接调用DubboProtocol的export()方法,打开服务端口。

(2) 向注册中心暴露服务:

在有注册中心,需要注册提供者地址的情况下,即:

<dubbo:registry address="zookeeper://10.20.153.10:2181" />

ServiceConfig解析出的URL的格式为:

registry://registry-host/com.alibaba.dubbo.registry.RegistryService?export=URL.encode("dubbo://service-host/com.foo.FooService?version=1.0.0")

基于扩展点的Adaptive机制,通过URL的”registry://”协议头识别,就会调用RegistryProtocol的export()方法,将export参数中的提供者URL,先注册到注册中心,再重新传给Protocol扩展点进行暴露:

dubbo://service-host/com.foo.FooService?version=1.0.0

基于扩展点的Adaptive机制,通过提供者URL的”dubbo://”协议头识别,就会调用DubboProtocol的export()方法,打开服务端口。

RegistryProtocol,注册中心协议集成,装饰真正暴露引用服务的协议,增强注册发布功能。

ServiceConfig中的protocol是被多层装饰的Protocol,是DubboProtocol+RegistryProtocol+ProtocolListenerWrapper+ProtocolFilterWrapper。

  • ProtocolFilterWrapper负责初始化invoker所有的Filter。
  • ProtocolListenerWrapper负责初始化暴露或引用服务的监听器。
  • RegistryProtocol负责注册服务到注册中心和向注册中心订阅服务。
  • DubboProtocol负责服务的具体暴露与引用,也负责网络传输层,信息交换层的初始化,以及底层NIO框架的初始化。

Exporter

负责invoker的生命周期,包含一个Invoker对象,可以撤销服务。

Exchanger

负责数据交换和网络通信的组件。每个Invoker都维护了一个ExchangeClient的 引用,并通过它和远端server进行通信。

原文  https://juejin.im/post/5af322c5518825670c45eed8
正文到此结束
Loading...