作者:大程熙
地址:cxis.me/2017/02/19/…
示例项目: github.com/souyunku/sp…
Dubbo会在Spring实例化完bean之后,在刷新容器最后一步发布ContextRefreshEvent事件的时候,通知实现了ApplicationListener的ServiceBean类进行回调onApplicationEvent 事件方法,dubbo会在这个方法中调用ServiceBean父类ServiceConfig的export方法,而该方法真正实现了服务的(异步或者非异步)发布。
Spring容器在启动的时候,会读取到Spring默认的一些schema以及Dubbo自定义的schema,每个schema都会对应一个自己的NamespaceHandler,NamespaceHandler里面通过BeanDefinitionParser来解析配置信息并转化为需要加载的bean对象。
遇到dubbo名称空间 ,首先会调用DubboNamespaceHandler类的 init方法 进行初始化操作。
根据命名空间去获取具体的处理器NamespaceHandler。那具体的处理器是在哪定义的呢,在” META-INF/spring.handlers
”文件中,Spring在会自动加载该文件中所有内容。
META-INF/spring.handlers
http/://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler
META-INF/spring.schemas
http/://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd
根据不同的XML节点,会委托NamespaceHandlerSupport 类找出合适的BeanDefinitionParser,其中Dubbo所有的标签都使用 DubboBeanDefinitionParser进行解析,基于一对一属性映射,将XML标签解析为Bean对象。
DubboNamespaceHandler.java
类代码如下
package com.alibaba.dubbo.config.spring.schema; import org.springframework.beans.factory.xml.NamespaceHandlerSupport; public class DubboNamespaceHandler extends NamespaceHandlerSupport { static { Version.checkDuplicate(DubboNamespaceHandler.class); } public void init() { registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true)); registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true)); registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true)); registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true)); registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true)); registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true)); } }
由于DubboBeanDefinitionParser 类中 parse转换的过程代码还是比较复杂,只抽离出来bean的注册这一块的代码如下
DubboBeanDefinitionParser.java
类代码如下
package com.alibaba.dubbo.config.spring.schema; public class DubboBeanDefinitionParser implements BeanDefinitionParser { @SuppressWarnings("unchecked") private static BeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean required) { RootBeanDefinition beanDefinition = new RootBeanDefinition(); beanDefinition.setBeanClass(beanClass); beanDefinition.setLazyInit(false); String id = element.getAttribute("id"); //省略...... if(id != null && id.length() > 0) { if(parserContext.getRegistry().containsBeanDefinition(id)) { throw new IllegalStateException("Duplicate spring bean id " + id); } //registerBeanDefinition 注册Bean的定义 //具体的id如下 applicationProvider.xml解析后的显示 id, //如id="dubbo_provider" beanDefinition = "ApplicationConfig" parserContext.getRegistry().registerBeanDefinition(id, beanDefinition); beanDefinition.getPropertyValues().addPropertyValue("id", id); } } }
通过DubboBeanDefinitionParser 类的 parse方法会将class信息封装成BeanDefinition,然后将BeanDefinition再放进DefaultListableBeanFactory的beanDefinitionMap中。
最后通过Spring bean 的加载机制进行加载。
Dubbo会在Spring实例化完bean之后,在刷新容器最后一步发布ContextRefreshEvent事件的时候,通知实现了ApplicationListener的ServiceBean类进行回调onApplicationEvent 事件方法,dubbo会在这个方法中调用ServiceBean父类ServiceConfig的export方法,而该方法真正实现了服务的(异步或者非异步)发布。
由服务配置类 ServiceConfig 进行初始化工作及服务暴露入口,首先进去执行该类的export()方法。
ServiceConfig.java
类的 export
方法
export方法先判断是否需要延迟暴露(这里我们使用的是不延迟暴露),然后执行doExport方法。
doExport方法先执行一系列的检查方法,然后调用doExportUrls方法。检查方法会检测dubbo的配置是否在Spring配置文件中声明,没有的话读取properties文件初始化。
doExportUrls方法先调用loadRegistries获取所有的注册中心url,然后遍历调用doExportUrlsFor1Protocol方法。对于在标签中指定了registry属性的Bean,会在加载BeanDefinition的时候就加载了注册中心。
ServiceConfig.java
类的 export
方法
package com.alibaba.dubbo.config; public class ServiceConfig<T> extends AbstractServiceConfig { public synchronized void export() { if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (export != null && !export) { return; } if (delay != null && delay > 0) { delayExportExecutor.schedule(new Runnable() { public void run() { doExport(); } }, delay, TimeUnit.MILLISECONDS); } else { doExport(); } }
可以看出发布发布是支持延迟暴露发布服务的,这样可以用于当我们发布的服务非常多,影响到应用启动的问题,前提是应用允许服务发布的延迟特性。
接下来就进入到 ServiceConfig.java
类的 doExport()
方法。
ServiceConfig.java
类的 doExport方法。检查DUBBO配置的合法性,并调用doExportUrls 方法。
package com.alibaba.dubbo.config; public class ServiceConfig<T> extends AbstractServiceConfig { protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("Already unexported!"); } if (exported) { return; } exported = true; if (interfaceName == null || interfaceName.length() == 0) { throw new IllegalStateException("<dubbo:service interface=/"/" /> interface not allow null!"); } checkDefault(); if (provider != null) { if (application == null) { application = provider.getApplication(); } if (module == null) { module = provider.getModule(); } if (registries == null) { registries = provider.getRegistries(); } if (monitor == null) { monitor = provider.getMonitor(); } if (protocols == null) { protocols = provider.getProtocols(); } } if (module != null) { if (registries == null) { registries = module.getRegistries(); } if (monitor == null) { monitor = module.getMonitor(); } } if (application != null) { if (registries == null) { registries = application.getRegistries(); } if (monitor == null) { monitor = application.getMonitor(); } } if (ref instanceof GenericService) { interfaceClass = GenericService.class; if (StringUtils.isEmpty(generic)) { generic = Boolean.TRUE.toString(); } } else { try { interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } checkInterfaceAndMethods(interfaceClass, methods); checkRef(); generic = Boolean.FALSE.toString(); } if (local != null) { if ("true".equals(local)) { local = interfaceName + "Local"; } Class<?> localClass; try { localClass = ClassHelper.forNameWithThreadContextClassLoader(local); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } if (!interfaceClass.isAssignableFrom(localClass)) { throw new IllegalStateException("The local implemention class " + localClass.getName() + " not implement interface " + interfaceName); } } if (stub != null) { if ("true".equals(stub)) { stub = interfaceName + "Stub"; } Class<?> stubClass; try { stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } if (!interfaceClass.isAssignableFrom(stubClass)) { throw new IllegalStateException("The stub implemention class " + stubClass.getName() + " not implement interface " + interfaceName); } } checkApplication(); checkRegistry(); checkProtocol(); appendProperties(this); checkStubAndMock(interfaceClass); if (path == null || path.length() == 0) { path = interfaceName; } doExportUrls(); } }
我们可以看出该方法的实现的逻辑包含了根据配置的优先级将 ProviderConfig,ModuleConfig,MonitorConfig,ApplicaitonConfig
等一些配置信息进行组装和合并。还有一些逻辑是检查配置信息的合法性。最后又调用了doExportUrls方法。
ServiceConfig.java
类的 doExportUrls()
方法
package com.alibaba.dubbo.config; public class ServiceConfig<T> extends AbstractServiceConfig { @SuppressWarnings({"unchecked", "rawtypes"}) private void doExportUrls() { List<URL> registryURLs = loadRegistries(true); for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } } }
该方法第一步是加载注册中心列表
loadRegistries(true);
加载注册中心列表响应示例
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-provider&dubbo=2.5.6&file=/data/dubbo/cache/dubbo-provider&pid=21448®istry=zookeeper×tamp=1524134852031
第二部是将服务发布到多种协议的url上,并且携带注册中心列表的参数,从这里我们可以看出dubbo是支持同时将一个服务发布成为多种协议的,这个需求也是很正常的,客户端也需要支持多协议,根据不同的场景选择合适的协议。
ServiceConfig.java
类的 doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs)
方法。
package com.alibaba.dubbo.config; public class ServiceConfig<T> extends AbstractServiceConfig { private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { String name = protocolConfig.getName(); if (name == null || name.length() == 0) { name = "dubbo"; } String host = protocolConfig.getHost(); //host = 10.4.81.95 if (provider != null && (host == null || host.length() == 0)) { host = provider.getHost(); } boolean anyhost = false; if (NetUtils.isInvalidLocalHost(host)) { anyhost = true; try { host = InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { logger.warn(e.getMessage(), e); } if (NetUtils.isInvalidLocalHost(host)) { if (registryURLs != null && registryURLs.size() > 0) { for (URL registryURL : registryURLs) { try { Socket socket = new Socket(); try { SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort()); socket.connect(addr, 1000); host = socket.getLocalAddress().getHostAddress(); break; } finally { try { socket.close(); } catch (Throwable e) { } } } catch (Exception e) { logger.warn(e.getMessage(), e); } } } if (NetUtils.isInvalidLocalHost(host)) { host = NetUtils.getLocalHost(); } } } Integer port = protocolConfig.getPort(); if (provider != null && (port == null || port == 0)) { port = provider.getPort(); } final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort(); if (port == null || port == 0) { port = defaultPort; } if (port == null || port <= 0) { port = getRandomPort(name); if (port == null || port < 0) { port = NetUtils.getAvailablePort(defaultPort); putRandomPort(name, port); } logger.warn("Use random available port(" + port + ") for protocol " + name); } Map<String, String> map = new HashMap<String, String>(); if (anyhost) { map.put(Constants.ANYHOST_KEY, "true"); } map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider, Constants.DEFAULT_KEY); appendParameters(map, protocolConfig); appendParameters(map, this); if (methods != null && methods.size() > 0) { for (MethodConfig method : methods) { appendParameters(map, method, method.getName()); String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } List<ArgumentConfig> arguments = method.getArguments(); if (arguments != null && arguments.size() > 0) { for (ArgumentConfig argument : arguments) { //类型自动转换. if (argument.getType() != null && argument.getType().length() > 0) { Method[] methods = interfaceClass.getMethods(); //遍历所有方法 if (methods != null && methods.length > 0) { for (int i = 0; i < methods.length; i++) { String methodName = methods[i].getName(); //匹配方法名称,获取方法签名. if (methodName.equals(method.getName())) { Class<?>[] argtypes = methods[i].getParameterTypes(); //一个方法中单个callback if (argument.getIndex() != -1) { if (argtypes[argument.getIndex()].getName().equals(argument.getType())) { appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } else { //一个方法中多个callback for (int j = 0; j < argtypes.length; j++) { Class<?> argclazz = argtypes[j]; if (argclazz.getName().equals(argument.getType())) { appendParameters(map, argument, method.getName() + "." + j); if (argument.getIndex() != -1 && argument.getIndex() != j) { throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } } } } } } } else if (argument.getIndex() != -1) { appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>"); } } } } // end of methods for } if (ProtocolUtils.isGeneric(generic)) { map.put("generic", generic); map.put("methods", Constants.ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("NO method found in service interface " + interfaceClass.getName()); map.put("methods", Constants.ANY_VALUE); } else { map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put("token", UUID.randomUUID().toString()); } else { map.put("token", token); } } if ("injvm".equals(protocolConfig.getName())) { protocolConfig.setRegister(false); map.put("notify", "false"); } // 服务发布 map 中的一些关键配置信息 /* map = {HashMap@6276} size = 17 0 = {HashMap$Node@6516} "side" -> "provider" 1 = {HashMap$Node@6517} "default.version" -> "1.0" 2 = {HashMap$Node@6518} "methods" -> "sayHello" 3 = {HashMap$Node@6519} "dubbo" -> "2.5.6" 4 = {HashMap$Node@6520} "threads" -> "500" 5 = {HashMap$Node@6521} "pid" -> "21448" 6 = {HashMap$Node@6522} "interface" -> "io.ymq.dubbo.api.DemoService" 7 = {HashMap$Node@6523} "threadpool" -> "fixed" 8 = {HashMap$Node@6524} "generic" -> "false" 9 = {HashMap$Node@6525} "default.retries" -> "0" 10 = {HashMap$Node@6526} "delay" -> "-1" 11 = {HashMap$Node@6527} "application" -> "dubbo-provider" 12 = {HashMap$Node@6528} "default.connections" -> "5" 13 = {HashMap$Node@6529} "default.delay" -> "-1" 14 = {HashMap$Node@6530} "default.timeout" -> "10000" 15 = {HashMap$Node@6531} "anyhost" -> "true" 16 = {HashMap$Node@6532} "timestamp" -> "1524135271940" */ // 导出服务 String contextPath = protocolConfig.getContextpath(); if ((contextPath == null || contextPath.length() == 0) && provider != null) { contextPath = provider.getContextpath(); } URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); // url /* dubbo://10.4.81.95:20880/io.ymq.dubbo.api.DemoService? anyhost=true& application=dubboprovider& default.connections=5& default.delay=-1& default.retries=0& default.timeout=10000& default.version=1.0& delay=-1& dubbo=2.5.6& generic=false& interface=io.ymq.dubbo.api.DemoService& methods=sayHello& pid=21448& side=provider& threadpool=fixed& threads=500& timestamp=1524135271940 */ if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } String scope = url.getParameter(Constants.SCOPE_KEY); //配置为none不暴露 if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务) if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务) if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) { for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } } else { Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } } } this.urls.add(url); } }
该方法的逻辑是先根据服务配置、协议配置、发布服务的服务器信息、方法列表、dubbo版本等等信息组装成一个发布的URL对象。
主要根据之前map里的数据组装成URL。
例如
dubbo://10.4.81.95:20880/io.ymq.dubbo.api.DemoService? anyhost=true& application=dubbo-provider& default.connections=5& default.delay=-1& default.retries=0& default.timeout=10000& default.version=1.0& delay=-1& dubbo=2.5.6& generic=false& interface=io.ymq.dubbo.api.DemoService& methods=sayHello& pid=21448& side=provider& threadpool=fixed& threads=500& timestamp=1524135271940
//配置为none不暴露 if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务) if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务) if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { 省略更多 } }
@SuppressWarnings({"unchecked", "rawtypes"}) private void exportLocal(URL url) { // 入参 URL = /* dubbo://10.4.81.95:20880/io.ymq.dubbo.api.DemoService? anyhost=true& application=dubbo-provider& default.connections=5& default.delay=-1& default.retries=0& default.timeout=10000& default.version=1.0& delay=-1& dubbo=2.5.6& generic=false& interface=io.ymq.dubbo.api.DemoService& methods=sayHello& pid=11104& scope=local& side=provider& threadpool=fixed& threads=500& timestamp=1524193840984 */ if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { URL local = URL.valueOf(url.toFullString()) .setProtocol(Constants.LOCAL_PROTOCOL) .setHost(NetUtils.LOCALHOST) .setPort(0); //这时候转成本地暴露的协议 url: /* injvm://127.0.0.1/io.ymq.dubbo.api.DemoService? anyhost=true& application=dubbo-provider& default.connections=5& default.delay=-1& default.retries=0& default.timeout=10000& default.version=1.0& delay=-1& dubbo=2.5.6& generic=false& interface=io.ymq.dubbo.api.DemoService& methods=sayHello& pid=11104& scope=local& side=provider& threadpool=fixed& threads=500& timestamp=1524193840984 */ //首先还是先获得Invoker //然后导出成Exporter,并缓存 //这里的proxyFactory实际是JavassistProxyFactory //有关详细的获得Invoke以及exporter会在下面的流程解析,在本地暴露这个流程就不再说明。 Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); exporters.add(exporter); logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry"); } }
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); private final List<Exporter<?>> exporters = new ArrayList<Exporter<?>>();
在 ServiceConfig.exportLocal(URL url)
方法中对url进行本地暴露,首先将URL的协议更改为了“injvm”、IP更改为了本地端口,端口变更为0。
主要有代理工厂创建Invoker代理、Protocol暴露Invoker从而生成Exporter两个处理逻辑。
接下来是暴露为远程服务,跟本地暴露的流程一样还是先获取Invoker,然后导出成Exporter。
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); private final List<Exporter<?>> exporters = new ArrayList<Exporter<?>>(); //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务) if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) { for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } // url 参数 /** dubbo://10.4.81.95:20880/io.ymq.dubbo.api.DemoService? anyhost=true& application=dubbo-provider& default.connections=5& default.delay=-1& default.retries=0& default.timeout=10000& default.version=1.0& delay=-1& dubbo=2.5.6& generic=false& interface=io.ymq.dubbo.api.DemoService&methods=sayHello& monitor=dubbo%3A%2F%2F127.0.0.1%3A2181%2Fcom.alibaba.dubbo.registry.RegistryService%3Fapplication%3Ddubbo-provider%26dubbo%3D2.5.6%26file%3D%2Fdata%2Fdubbo%2Fcache%2Fdubbo-provider%26pid%3D26440%26protocol%3Dregistry%26refer%3Ddubbo%253D2.5.6%2526interface%253Dcom.alibaba.dubbo.monitor.MonitorService%2526pid%253D26440%2526timestamp%253D1524212561737%26registry%3Dzookeeper%26timestamp%3D1524212160841& pid=26440& side=provider& threadpool=fixed& threads=500& timestamp=1524212161760 */ //根据服务具体实现,实现接口,以及registryUrl通过ProxyFactory将DemoService封装成一个本地执行的Invoker代理 //invoker是对具体实现的一种代理。 //这里proxyFactory是上面列出的生成的代码 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); //使用Protocol将invoker导出成一个Exporter //暴露封装服务invoker //调用Protocol生成的适配类的export方法 //这里的protocol是上面列出的生成的代码 Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } } else { Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } }
暴露远程服务时的获取Invoker过程
服务实现类转换成Invoker,大概的步骤是:
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
package com.alibaba.dubbo.rpc.proxy.wrapper; public class StubProxyFactoryWrapper implements ProxyFactory { private final ProxyFactory proxyFactory; public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException { return proxyFactory.getInvoker(proxy, type, url); } }
这行代码中包含服务实现类转换成Invoker的过程,其中proxyFactory是上面列出的动态生成的代码,其中getInvoker的代码为(做了精简)。
JavassistProxyFactory.java
类的 getInvoker(T proxy, Class<T> type, URL url)
方法。
package com.alibaba.dubbo.rpc.proxy.javassist; public class JavassistProxyFactory extends AbstractProxyFactory { public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper类不能正确处理带$的类名 //第一步封装一个包装类(wrapper) //该类是手动生成的 //如果类是以$开头,就使用接口类型获取,其他的使用实现类获取 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); //返回一个Invoker实例,doInvoke方法中直接返回上面wrapper的invokeMethod //关于生成的wrapper,请看下面列出的生成的代码,其中invokeMethod方法中就有实现类对实际方法的调用 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } }
Wrapper.java
类的 getWrapper(Class<?> c)
方法。
生成包装类(wrapper)的过程,首先看getWrapper方法
package com.alibaba.dubbo.common.bytecode; public abstract class Wrapper { //缓存包装类 private static final Map<Class<?>, Wrapper> WRAPPER_MAP = new ConcurrentHashMap<Class<?>, Wrapper>(); //class wrapper map public static Wrapper getWrapper(Class<?> c) { while( ClassGenerator.isDynamicClass(c) ) // can not wrapper on dynamic class. c = c.getSuperclass(); //Object类型的 if( c == Object.class ) return OBJECT_WRAPPER; //先去Wrapper缓存中查找 // c 等于 class io.ymq.dubbo.provider.service.DemoServiceImpl Wrapper ret = WRAPPER_MAP.get(c); if( ret == null ) { //缓存中不存在,生成Wrapper类,放到缓存 ret = makeWrapper(c); WRAPPER_MAP.put(c,ret); } return ret; } }
生成完Wrapper以后,返回一个AbstractProxyInvoker实例。至此生成Invoker的步骤就完成了。 可以看到Invoker执行方法的时候,会调用Wrapper的invokeMethod, 这个方法中会有真实的实现类调用真实方法的代码 。
JavassistProxyFactory.java
类的 getInvoker(T proxy, Class<T> type, URL url)
方法。
package com.alibaba.dubbo.rpc.proxy.javassist; public class JavassistProxyFactory extends AbstractProxyFactory { public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper类不能正确处理带$的类名 //第一步封装一个包装类(wrapper) //该类是手动生成的 //如果类是以$开头,就使用接口类型获取,其他的使用实现类获取 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); //返回一个Invoker实例,doInvoke方法中直接返回上面wrapper的invokeMethod //关于生成的wrapper,请看下面列出的生成的代码,其中invokeMethod方法中就有实现类对实际方法的调用 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } }
Invoker导出为Exporter分为两种情况,第一种是Registry类型的Invoker,第二种是其他协议类型的Invoker,分开解析。
代码入口:
Exporter<?> exporter = protocol.export(invoker);
大概的步骤是:
rotocol是上面列出的动态生成的代码,会先调用ProtocolListenerWrapper,这个Wrapper负责初始化暴露和引用服务的监听器。对于Registry类型的不做处理,代码如下:
ProtocolFilterWrapper.java
类的 export(Invoker<T> invoker)
方法
package com.alibaba.dubbo.rpc.protocol; public class ProtocolFilterWrapper implements Protocol { public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // REGISTRY_PROTOCOL = "registry"; // 协议为 registry类型的Invoker,不需要做处理 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); } }
消费者端和提供者端形成调用链都调用了同一段代码:buildInvokerChain() 通过last->next的方式统一添加,然后再新生成的Invoker中调用next然后最后调用last就是我们真正执行的Invoker,调用链分类(默认Filter), 代码如下:
ProtocolFilterWrapper.java
类的 buildInvokerChain(final Invoker<T> invoker, String key, String group)
方法。
package com.alibaba.dubbo.rpc.protocol; public class ProtocolFilterWrapper implements Protocol { private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; //只获取满足条件的Filter List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (filters.size() > 0) { // filter从最后一个开始依次封装,最终形成一个链,调用顺序为filters的顺序 for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { public Class<T> getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; } }
RegistryProtocol.java
类的 export(final Invoker<T> originInvoker)
方法:
这里我们先解析的是Registry类型的Invoker,接着就会调用RegistryProtocol的export方法,RegistryProtocol负责注册服务到注册中心和向注册中心订阅服务。代码如下:
package com.alibaba.dubbo.registry.integration; public class RegistryProtocol implements Protocol { public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker //这里就交给了具体的协议去暴露服务(先不解析,留在后面,可以先去后面看下导出过程) final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); //registry provider //根据invoker中的url获取Registry实例 //并且连接到注册中心 //此时提供者作为消费者引用注册中心核心服务RegistryService final Registry registry = getRegistry(originInvoker); //注册到注册中心的URL final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //调用远端注册中心的register方法进行服务注册 //若有消费者订阅此服务,则推送消息让消费者引用此服务。 //注册中心缓存了所有提供者注册的服务以供消费者发现。 registry.register(registedProviderUrl); // 订阅override数据 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保证每次export都返回一个新的exporter实例 //返回暴露后的Exporter给上层ServiceConfig进行缓存,便于后期撤销暴露。 return new Exporter<T>() { public Invoker<T> getInvoker() { return exporter.getInvoker(); } public void unexport() { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } }; } }
先不解析,留在后面,可以先去后面看下导出过程,然后再回来接着看注册到注册中心的过程。具体协议暴露服务主要是打开服务器和端口,进行监听。
具体的协议进行暴露并且返回了一个ExporterChangeableWrapper之后,接下来看下一步连接注册中心并注册到注册中心,代码是在RegistryProtocol的export方法:
RegistryProtocol.java
类的 export(final Invoker<T> originInvoker)
方法
package com.alibaba.dubbo.registry.integration; public class RegistryProtocol implements Protocol { //省略很多。。。 public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //此步已经分析完 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); //得到具体的注册中心,连接注册中心,此时提供者作为消费者引用注册中心核心服务RegistryService final Registry registry = getRegistry(originInvoker); //获取要注册到注册中心的url final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //调用远端注册中心的register方法进行服务注册 //若有消费者订阅此服务,则推送消息让消费者引用此服务 registry.register(registedProviderUrl); //提供者向注册中心订阅所有注册服务的覆盖配置 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //返回暴露后的Exporter给上层ServiceConfig进行缓存 return new Exporter<T>() {。。。} } }
RegistryProtocol.java
类的 getRegistry(final Invoker<?> originInvoker)
方法,会发现该方法会在 AbstractRegistryFactory.java
类中实现:
package com.alibaba.dubbo.registry.integration; public class RegistryProtocol implements Protocol { /** * 根据invoker的地址获取registry实例 * * @param originInvoker * @return */ private Registry getRegistry(final Invoker<?> originInvoker) { //获取invoker中的registryUrl URL registryUrl = originInvoker.getUrl(); if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { //获取registry的值,这里获得是zookeeper,默认值是dubbo String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY); registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY); // registryUrl /* zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? application=dubbo-provider& dubbo=2.5.6& export=dubbo://10.4.81.95:20880/io.ymq.dubbo.api.DemoService? anyhost=true& application=dubbo-provider& default.connections=5& default.delay=-1&default.retries=0& default.timeout=10000& default.version=1.0& delay=-1& dubbo=2.5.6& generic=false& interface=io.ymq.dubbo.api.DemoService& methods=sayHello& monitor=dubbo://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService%3F application=dubbo-provider& dubbo=2.5.6& file=/data/dubbo/cache/dubbo-provider& pid=3D29100%26 protocol%3D registry%26refer%3D dubbo%253D2.5.6%2526 interface%253Dcom.alibaba.dubbo.monitor.MonitorService%2526 pid%253D29100%2526 timestamp%253D1524465704266%26 registry%3Dzookeeper%26 timestamp%3D1524465673571& pid=29100& side=provider&threadpool=fixed&threads=500& timestamp=1524465674601& file=/data/dubbo/cache/dubbo-provider& pid=29100& timestamp=1524465673571 */ } //根据SPI机制获取具体的Registry实例,这里获取到的是ZookeeperRegistry return registryFactory.getRegistry(registryUrl); } }
先看下 AbstractRegistryFactory.java
类的 getRegistry(URL url)
方法。
package com.alibaba.dubbo.registry.support; public abstract class AbstractRegistryFactory implements RegistryFactory { public Registry getRegistry(URL url) { url = url.setPath(RegistryService.class.getName()) .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); // 入参url = // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? // application=dubbo-provider& // dubbo=2.5.6& // file=/data/dubbo/cache/dubbo-provider& // interface=com.alibaba.dubbo.registry.RegistryService& // pid=30112& // timestamp=1524469125241 //这里key为: // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService String key = url.toServiceString(); // 锁定注册中心获取过程,保证注册中心单一实例 LOCK.lock(); try { //先从缓存中获取Registry实例 Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } //创建registry,会直接new一个ZookeeperRegistry返回 //********具体创建实例是子类来实现的******** registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } // 创建后的registry // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? // application=dubbo-provider& // dubbo=2.5.6& // file=/data/dubbo/cache/dubbo-provider& // interface=com.alibaba.dubbo.registry.RegistryService& // pid=30344& // timestamp=1524473152100 //放到缓存中 REGISTRIES.put(key, registry); return registry; } finally { // 释放锁 LOCK.unlock(); } } }
ZookeeperRegistryFactory.java
类的 createRegistry(URL url)
方法
方法是在子类中实现的,这里的子类是ZookeeperRegistry.java。
首先需要经过AbstractRegistry 类的构造方法。AbstractRegistry类的构造器初始化完,接着调用FailbackRegistry类的构造器初始化,最后回到ZookeeperRegistry类的构造初始化。
package com.alibaba.dubbo.registry.zookeeper; public class ZookeeperRegistryFactory extends AbstractRegistryFactory { private ZookeeperTransporter zookeeperTransporter; public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { this.zookeeperTransporter = zookeeperTransporter; } public Registry createRegistry(URL url) { return new ZookeeperRegistry(url, zookeeperTransporter); } }
ZookeeperRegistry.java
类的 ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter)
构造方法
package com.alibaba.dubbo.registry.zookeeper; public class ZookeeperRegistry extends FailbackRegistry { public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } this.root = group; zkClient = zookeeperTransporter.connect(url); zkClient.addStateListener(new StateListener() { public void stateChanged(int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); } }
FailbackRegistry.java
类的 AbstractRegistry
构造器初始化完,接着调用 FailbackRegistry
类的 FailbackRegistry(URL url)
构造器初始化:
package com.alibaba.dubbo.registry.support; public abstract class FailbackRegistry extends AbstractRegistry { public FailbackRegistry(URL url) { super(url); int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { public void run() { // 检测并连接注册中心 try { retry(); } catch (Throwable t) { // 防御性容错 logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); } }
AbstractRegistry.java
类的 AbstractRegistry(URL url)
构造器初始化完,接着调用 FailbackRegistry
类的 FailbackRegistry(URL url)
构造器初始化:
package com.alibaba.dubbo.registry.support; public abstract class AbstractRegistry implements Registry { public AbstractRegistry(URL url) { // 保存url // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? // application=dubbo-provider& // dubbo=2.5.6& // file=/data/dubbo/cache/dubbo-provider& // interface=com.alibaba.dubbo.registry.RegistryService& // pid=28536& // timestamp=1524471115982 setUrl(url); // 启动文件保存定时器 syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false); String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache"); // 保存的文件为:/data/dubbo/cache/dubbo-provider File file = null; if (ConfigUtils.isNotEmpty(filename)) { file = new File(filename); if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) { if (!file.getParentFile().mkdirs()) { throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!"); } } } this.file = file; // 加载文件中的属性 loadProperties(); //通知订阅 notify(url.getBackupUrls()); } }
AbstractRegistry.java
类的 notify(List<URL> urls)
方法:
package com.alibaba.dubbo.registry.support; public abstract class AbstractRegistry implements Registry { protected void notify(List<URL> urls) { if(urls == null || urls.isEmpty()) return; //getSubscribed()方法获取订阅者列表 //订阅者Entry里每个URL都对应着n个NotifyListener for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) { URL url = entry.getKey(); if(! UrlUtils.isMatch(url, urls.get(0))) { continue; } Set<NotifyListener> listeners = entry.getValue(); if (listeners != null) { for (NotifyListener listener : listeners) { try { //通知每个监听器 notify(url, listener, filterEmpty(url, urls)); } catch (Throwable t) {} } } } } }
通知每个监听器
AbstractRegistry.java
类的 notify(URL url, NotifyListener listener, List<URL> urls)
方法:
package com.alibaba.dubbo.registry.support; public abstract class AbstractRegistry implements Registry { protected void notify(URL url, NotifyListener listener, List<URL> urls) { // 省略。。。 Map<String, List<URL>> result = new HashMap<String, List<URL>>(); for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { //分类 String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); List<URL> categoryList = result.get(category); if (categoryList == null) { categoryList = new ArrayList<URL>(); result.put(category, categoryList); } categoryList.add(u); } } if (result.size() == 0) { return; } Map<String, List<URL>> categoryNotified = notified.get(url); if (categoryNotified == null) { notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>()); categoryNotified = notified.get(url); } for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); categoryNotified.put(category, categoryList); //保存到主目录下的.dubbo目录下 saveProperties(url); //上面获取到的监听器进行通知 listener.notify(categoryList); } } }
FailbackRegistry.java
类的 AbstractRegistry
类的构造器初始化完,接着调用 FailbackRegistry
类的构造器初始化:
public abstract class FailbackRegistry extends AbstractRegistry { public FailbackRegistry(URL url) { super(url); //重试时间,默认5000ms int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); //启动失败重试定时器 this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { public void run() { // 检测并连接注册中心 try { //获取到注册失败的,然后尝试注册 retry(); } catch (Throwable t) { // 防御性容错 logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); } }
最后回到 ZookeeperRegistry.java
类的构造初始化:
package com.alibaba.dubbo.registry.zookeeper; public class ZookeeperRegistry extends FailbackRegistry { public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } //获得到注册中心中的分组,默认dubbo String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } //注册到注册中心的节点 this.root = group; //使用zookeeperTansporter去连接 //ZookeeperTransport这里是生成的自适应实现,默认使用ZkClientZookeeperTransporter //ZkClientZookeeperTransporter的connect去实例化一个ZkClient实例 //并且订阅状态变化的监听器subscribeStateChanges //然后返回一个ZkClientZookeeperClient实例 zkClient = zookeeperTransporter.connect(url); //ZkClientZookeeperClient添加状态改变监听器 zkClient.addStateListener(new StateListener() { public void stateChanged(int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); } }
注册到注册中心 部分重要代码
package com.alibaba.dubbo.registry.integration; public class RegistryProtocol implements Protocol { public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); //取要注册到注册中心的url final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); // URL 下面 /* dubbo://10.4.81.95:20880/io.ymq.dubbo.api.DemoService? anyhost=true& application=dubbo-provider& default.connections=5& default.delay=-1& default.retries=0& default.timeout=10000& default.version=1.0& delay=-1& dubbo=2.5.6& generic=false& interface=io.ymq.dubbo.api.DemoService& methods=sayHello& pid=30344& side=provider& threadpool=fixed& threads=500& timestamp=1524473153090 */ // 省略很多。。。 return new Exporter<T>() { // 省略很多。。。 } } }
获取到了Registry,Registry实例中保存着连接到了zookeeper的zkClient实例之后,下一步获取要注册到注册中心的url(在RegistryProtocol中)。
取要注册到注册中心的url 的 RegistryProtocol.java
类的 getRegistedProviderUrl(final Invoker<?> originInvoker)
方法
package com.alibaba.dubbo.registry.integration; public class RegistryProtocol implements Protocol { /** * 返回注册到注册中心的URL,对URL参数进行一次过滤 * * @param originInvoker * @return */ private URL getRegistedProviderUrl(final Invoker<?> originInvoker) { URL providerUrl = getProviderUrl(originInvoker); //注册中心看到的地址 final URL registedProviderUrl = providerUrl.removeParameters(getFilteredKeys(providerUrl)).removeParameter(Constants.MONITOR_KEY); return registedProviderUrl; } }
然后调用 registry.register(registedProviderUrl)
注册到注册中心(在RegistryProtocol中)。register方法的实现在FailbackRegistry中:
FailbackRegistry.java
类的 register(URL url)
方法:
package com.alibaba.dubbo.registry.support; public abstract class FailbackRegistry extends AbstractRegistry { @Override public void register(URL url) { if (destroyed.get()){ return; } super.register(url); failedRegistered.remove(url); failedUnregistered.remove(url); try { // 向服务器端发送注册请求 (url); } catch (Exception e) { Throwable t = e; // 如果开启了启动时检测,则直接抛出异常 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // 将失败的注册请求记录到失败列表,定时重试 failedRegistered.add(url); } } }
doRegister(url)
;在这里是 ZookeeperRegistry
中具体实现的,这里将会注册到注册中心:
ZookeeperRegistry.java
类的 doRegister(URL url)
方法
package com.alibaba.dubbo.registry.zookeeper; public class ZookeeperRegistry extends FailbackRegistry { protected void doRegister(URL url) { try { //这里zkClient就是我们上面调用构造的时候生成的 //ZkClientZookeeperClient //保存着连接到Zookeeper的zkClient实例 //开始注册,也就是在Zookeeper中创建节点 //这里toUrlPath获取到的path为: /** /dubbo /io.ymq.dubbo.api.DemoService /providers /dubbo://169.254.8.253:20880/io.ymq.dubbo.api.DemoService? anyhost=true& application=dubbo-provider& default.connections=5& default.delay=-1& default.retries=0& default.timeout=10000& default.version=1.0& delay=-1& dubbo=2.5.6& generic=false& interface=io.ymq.dubbo.api.DemoService& methods=sayHello& pid=16856& side=provider& threadpool=fixed& threads=500& timestamp=1525419864880 **/ //默认创建的节点是临时节点 zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } }
经过这一步之后,Zookeeper中就有节点存在了,具体节点为:
/dubbo /io.ymq.dubbo.api.DemoService /providers /dubbo://169.254.8.253:20880/io.ymq.dubbo.api.DemoService? anyhost=true& application=dubbo-provider& default.connections=5& default.delay=-1& default.retries=0& default.timeout=10000& default.version=1.0& delay=-1& dubbo=2.5.6& generic=false& interface=io.ymq.dubbo.api.DemoService& methods=sayHello& pid=16856& side=provider& threadpool=fixed& threads=500& timestamp=1525419864880
在注册到注册中心之后,registry会去订阅覆盖配置的服务,这一步之后就会在 /dubbo/io.ymq.dubbo.api.DemoService
节点下多一个 configurators
节点。(具体过程暂先不解析)。
最后返回Exporter新实例,返回到ServiceConfig中。服务的发布就算完成了。
这里也就是非Registry类型的Invoker的导出过程。主要的步骤是将本地ip和20880端口打开,进行监听。最后包装成exporter返回。
RegistryProtocol.java
类的 doLocalExport(invoker) 方法
package com.alibaba.dubbo.registry.integration; public class RegistryProtocol implements Protocol { //用于解决rmi重复暴露端口冲突的问题,已经暴露过的服务不再重新暴露 //providerurl <--> exporter private final Map<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<String, ExporterChangeableWrapper<?>>(); @SuppressWarnings("unchecked") private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { //原始的invoker中的url: /* registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? application=dubbo-provider&dubbo=2.5.6& export=dubbo://169.254.8.253:20880/io.ymq.dubbo.api.DemoService? anyhost=true& application=dubbo-provider& default.connections=5& default.delay=-1& default.retries=0& default.timeout=10000& default.version=1.0& delay=-1& dubbo=2.5.6& generic=false& interface=io.ymq.dubbo.api.DemoService& methods=sayHello& monitor=dubbo%3A%2F%2F127.0.0.1%3A2181%2F com.alibaba.dubbo.registry.RegistryService%3Fapplication%3D dubbo-provider%26dubbo%3D2.5.6%26file%3D%2Fdata%2Fdubbo%2F cache%2Fdubbo-provider%26pid%3D2972%26protocol%3D registry%26refer%3Ddubbo%253D2.5.6%2526interface%253D com.alibaba.dubbo.monitor.MonitorService%2526 pid%253D2972%2526timestamp%253D1525421418972%26 registry%3Dzookeeper%26timestamp%3D1525421395956& pid=2972& side=provider& threadpool=fixed& threads=500& timestamp=1525421396922& file=/data/dubbo/cache/dubbo-provider&pid=2972& registry=zookeeper& timestamp=1525421395956 //从原始的invoker中得到的key: dubbo://169.254.8.253:20880/io.ymq.dubbo.api.DemoService? anyhost=true& application=dubbo-provider& default.connections=5& default.delay=-1& default.retries=0& default.timeout=10000& default.version=1.0& delay=-1& dubbo=2.5.6& generic=false& interface=io.ymq.dubbo.api.DemoService& methods=sayHello& monitor=dubbo://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? application=dubbo-provider& dubbo=2.5.6& file=/data/dubbo/cache/dubbo-provider& pid=4668& protocol=registry& refer=dubbo%3D2.5.6%26 interface%3Dcom.alibaba.dubbo.monitor.MonitorService%26pid%3D4668%26 timestamp%3D1525422343947& registry=zookeeper& timestamp=1525422334635& pid=4668& side=provider& threadpool=fixed& threads=500& timestamp=1525422335033 */ String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { //得到一个Invoker代理,里面包含原来的Invoker final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); //此处protocol还是最上面生成的代码,调用代码中的export方法,会根据协议名选择调用具体的实现类 //这里我们需要调用DubboProtocol的export方法 //这里的使用具体协议进行导出的invoker是个代理invoker //导出完之后,返回一个新的ExporterChangeableWrapper实例 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return exporter; } }
这里protocol.export(invokerDelegete)就要去具体的 DubboProtocol.java
中执行了, DubboProtocol.java
的外面包裹着 ProtocolFilterWrapper.java
,再外面还包裹着 ProtocolListenerWrapper.java
。会先经过 ProtocolListenerWrapper.java
:
ProtocolListenerWrapper.java
类的 export(Invoker<T> invoker)
方法 package com.alibaba.dubbo.rpc.protocol; public class ProtocolListenerWrapper implements Protocol { public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return new ListenerExporterWrapper<T>(protocol.export(invoker), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY))); } }
ProtocolFilterWrapper.java
类的 export(Invoker<T> invoker)
方法 package com.alibaba.dubbo.rpc.protocol; public class ProtocolFilterWrapper implements Protocol { public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // REGISTRY_PROTOCOL = "registry"; // 协议为 registry类型的Invoker,不需要做处理 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } //其他具体协议类型的Invoker //先构建Filter链,然后再导出 return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); } }
消费者端和提供者端形成调用链都调用了同一段代码:buildInvokerChain() 通过last->next的方式统一添加,然后再新生成的Invoker中调用next然后最后调用last就是我们真正执行的Invoker,调用链分类(默认Filter)
ProtocolFilterWrapper.java
类的 buildInvokerChain(final Invoker<T> invoker, String key, String group)
方法 生成新的 Invoker
package com.alibaba.dubbo.rpc.protocol; public class ProtocolFilterWrapper implements Protocol { private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { //我们要处理的那个Invoker作为处理链的最后一个 Invoker<T> last = invoker; //根据key和group获取自动激活的Filter List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (filters.size() > 0) { // 把所有的过滤器都挨个连接起来,最后一个是我们真正的Invoker,最终形成一个链,调用顺序为filters的顺序 for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { public Class<T> getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; } }
接着就到了 DubboProtocol.java
类的 export(Invoker<T> invoker)
方法,这里进行暴露服务:
package com.alibaba.dubbo.rpc.protocol.dubbo; public class DubboProtocol extends AbstractProtocol { public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); /* 入参: url dubbo://169.254.8.253:20880/io.ymq.dubbo.api.DemoService? anyhost=true& application=dubbo-provider& default.connections=5& default.delay=-1& default.retries=0& default.timeout=10000& default.version=1.0& delay=-1& dubbo=2.5.6& generic=false& interface=io.ymq.dubbo.api.DemoService&methods=sayHello& monitor=dubbo://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? application=dubbo-provider& dubbo=2.5.6& file=/data/dubbo/cache/dubbo-provider& pid=13576& protocol=registry& refer=dubbo%3D2.5.6%26interface%3Dcom.alibaba.dubbo.monitor.MonitorService%26pid%3D13576%26timestamp%3D1525425274459& registry=zookeeper& timestamp=1525425266251& pid=13576& side=provider& threadpool=fixed& threads=500& timestamp=1525425266793 */ // export service. //key 由serviceName,port,version,group组成 //当nio客户端发起远程调用时,nio服务端通过此key来决定调用哪个Exporter,也就是执行的Invoker。 // io.ymq.dubbo.api.DemoService:1.0:20880 String key = serviceKey(url); //将Invoker转换成Exporter //直接new一个新实例 //没做啥处理,就是做一些赋值操作 //这里的exporter就包含了invoker DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); //缓存要暴露的服务,key是上面生成的 exporterMap.put(key, exporter); //export an stub service for dispaching event //是否支持本地存根 //远程服务后,客户端通常只剩下接口,而实现全在服务器端, //但提供方有些时候想在客户端也执行部分逻辑,比如:做ThreadLocal缓存, //提前验证参数,调用失败后伪造容错数据等等,此时就需要在API中带上Stub, //客户端生成Proxy实,会把Proxy通过构造函数传给Stub, //然后把Stub暴露组给用户,Stub可以决定要不要去调Proxy。 Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } //根据URL绑定IP与端口,建立NIO框架的Server openServer(url); return exporter; } }
上面得到的Exporter会被放到缓存中去,key就是上面生成的,客户端就可以发请求根据key找到Exporter,然后找到invoker进行调用了。接下来是创建服务器并监听端口。
接着调用openServer方法创建NIO Server进行监听:
DubboProtocol.java
类的 openServer(URL url)
方法
package com.alibaba.dubbo.rpc.protocol.dubbo; public class DubboProtocol extends AbstractProtocol { private void openServer(URL url) { // find server. //key是IP:PORT //169.254.8.253:20880 String key = url.getAddress(); //client 也可以暴露一个只有server可以调用的服务。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { ExchangeServer server = serverMap.get(key); //同一JVM中,同协议的服务,共享同一个Server, //第一个暴露服务的时候创建server, //以后相同协议的服务都使用同一个server if (server == null) { serverMap.put(key, createServer(url)); } else { //server支持reset,配合override功能使用 //同协议的服务后来暴露服务的则使用第一次创建的同一Server //accept、idleTimeout、threads、heartbeat参数的变化会引起Server的属性发生变化 //这时需要重新设置Server server.reset(url); } } } }
继续看createServer方法:
DubboProtocol.java
类的 createServer(URL url)
方法
package com.alibaba.dubbo.rpc.protocol.dubbo; public class DubboProtocol extends AbstractProtocol { private ExchangeServer createServer(URL url) { //默认开启server关闭时发送readonly事件 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); //默认使用netty String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { //Exchangers是门面类,里面封装的是Exchanger的逻辑。 //Exchanger默认只有一个实现HeaderExchanger. //Exchanger负责数据交换和网络通信。 //从Protocol进入Exchanger,标志着程序进入了remote层。 //这里requestHandler是ExchangeHandlerAdapter server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; } }
Exchangers.java
类的 ExchangeServer bind(URL url, ExchangeHandler handler)
方法:
package com.alibaba.dubbo.remoting.exchange; public class Exchangers { public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); //getExchanger方法根据url获取到一个默认的实现HeaderExchanger //调用HeaderExchanger的bind方法 return getExchanger(url).bind(url, handler); } }
HeaderExchanger.java
类的 bind(URL url, ExchangeHandler handler)
方法:
package com.alibaba.dubbo.remoting.exchange.support.header; public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); } public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { //直接返回一个HeaderExchangeServer //先创建一个HeaderExchangeHandler //再创建一个DecodeHandler //最后调用Transporters.bind return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } }
这里会先创建一个 HeaderExchangerHandler ,包含着ExchangeHandlerAdapter,接着创建一个DecodeHandler,会包含前面的handler,接下来调用Transporters的bind方法,返回一个Server,接着用HeaderExchangeServer包装一下,就返回给Protocol层了。
在HeaderExchangerServer包装的时候会启动心跳定时器startHeatbeatTimer();,暂不解析。
Transporters.java
类的 bind(URL url, ChannelHandler... handlers)
方法
package com.alibaba.dubbo.remoting; public class Transporters { public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { // 省略。。。 ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { //如果有多个handler的话,需要使用分发器包装下 handler = new ChannelHandlerDispatcher(handlers); } //getTransporter()获取一个Adaptive的Transporter //然后调用bind方法(默认是NettyTransporter的bind方法) return getTransporter().bind(url, handler); } public static Transporter getTransporter() { return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); } }
NettyTransporter.java
类的 bind(URL url, ChannelHandler listener)
方法:
package com.alibaba.dubbo.remoting.transport.netty4; public class NettyTransporter implements Transporter { public static final String NAME = "netty4"; public Server bind(URL url, ChannelHandler listener) throws RemotingException { //创建一个Server return new NettyServer(url, listener); } public Client connect(URL url, ChannelHandler listener) throws RemotingException { return new NettyClient(url, listener); } }
NettyServer.java
类的 NettyServer(URL url, ChannelHandler handler)
构造方法:
package com.alibaba.dubbo.remoting.transport.netty4; public class NettyServer extends AbstractServer implements Server { // 省略。。。 public NettyServer(URL url, ChannelHandler handler) throws RemotingException { //handler先经过ChannelHandlers的包装方法 //然后再初始化 super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); }
ChannelHandlers.wrap
方法中会根据SPI扩展机制动态生成Dispatcher的自适应类,生成的代码不在列出,默认使用AllDispatcher处理,会返回一个AllChannelHandler,会把线程池和DataStore都初始化了。然后经过HeartbeatHandler封装,再经过MultiMessageHandler封装后返回。
NettyServer构造,会依次经过AbstractPeer,AbstractEndpoint,AbstractServer,NettyServer的初始化。重点看下AbstractServer的构造方法:
AbstractServer.java
类的 AbstractServer(URL url, ChannelHandler handler)
构造方法
package com.alibaba.dubbo.remoting.transport; public abstract class AbstractServer extends AbstractEndpoint implements Server { public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); String host = url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(getUrl().getHost()) ? NetUtils.ANYHOST : getUrl().getHost(); bindAddress = new InetSocketAddress(host, getUrl().getPort()); this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { //初始化的时候会打开Server //具体实现这里是NettyServer中 doOpen(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } //fixme replace this with better method DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); } }
NettyServer.java
类的 doOpen()
方法
package com.alibaba.dubbo.remoting.transport.netty4; public class NettyServer extends AbstractServer implements Server { protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); //boss线程池 ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); //worker线程池 ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); //ChannelFactory,没有指定工作者线程数量,就使用cpu+1 ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind之后返回一个Channel channel = bootstrap.bind(getBindAddress()); } }
doOpen方法创建Netty的Server端并打开,具体的事情就交给Netty去处理了,Netty的过程,原理,代码有时间再另行研究。
可执行的对象,执行具体的远程调用,能够根据方法名称,参数得到相应的执行结果。
Invocation,包含了需要执行的方法,参数等信息。目前实现类只有RpcInvocation。
以sayHello为例:
提供者端的Invoker封装了服务实现类,URL,Type,状态都是只读并且线程安全。通过发起invoke来具体调用服务类。
ProxyFactory
在服务提供者端,ProxyFactory主要服务的实现统一包装成一个Invoker,Invoker通过反射来执行具体的Service实现对象的方法。默认的实现是JavassistProxyFactory,代码如下:
JavassistProxyFactory.java
类的 getInvoker(T proxy, Class<T> type, URL url)
方法
package com.alibaba.dubbo.rpc.proxy.javassist; public class JavassistProxyFactory extends AbstractProxyFactory { public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper类不能正确处理带$的类名 //第一步封装一个包装类(wrapper) //该类是手动生成的 //如果类是以$开头,就使用接口类型获取,其他的使用实现类获取 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); //返回一个Invoker实例,doInvoke方法中直接返回上面wrapper的invokeMethod //关于生成的wrapper,请看下面列出的生成的代码,其中invokeMethod方法中就有实现类对实际方法的调用 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } }
服务地址的发布和订阅。
Protocol是dubbo中的服务域,只在服务启用时加载,无状态,线程安全,是实体域Invoker暴露和引用的主功能入口,负责Invoker的生命周期管理,是Dubbo中远程服务调用层。
Protocol根据指定协议对外公布服务,当客户端根据协议调用这个服务时,Protocol会将客户端传递过来的Invocation参数交给Invoker去执行。
Protocol加入了远程通信协议,会根据客户端的请求来获取参数Invocation。
package com.alibaba.dubbo.rpc; @Extension("dubbo") public interface Protocol { int getDefaultPort(); //对于服务提供端,将本地执行类的Invoker通过协议暴漏给外部 //外部可以通过协议发送执行参数Invocation,然后交给本地Invoker来执行 @Adaptive <T> Exporter<T> export(Invoker<T> invoker) throws RpcException; //这个是针对服务消费端的,服务消费者从注册中心获取服务提供者发布的服务信息 //通过服务信息得知服务提供者使用的协议,然后服务消费者仍然使用该协议构造一个Invoker。这个Invoker是远程通信类的Invoker。 //执行时,需要将执行信息通过指定协议发送给服务提供者,服务提供者接收到参数Invocation,然后交给服务提供者的本地Invoker来执行 @Adaptive <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException; void destroy(); }
暴露服务:
在没有注册中心,直接暴露提供者的情况下,即:
<dubbo:service regisrty="N/A" /> or <dubbo:registry address="N/A" />
ServiceConfig解析出的URL的格式为:
dubbo://service-host/com.foo.FooService?version=1.0.0
基于扩展点的Adaptive机制,通过URL的”dubbo://”协议头识别,直接调用DubboProtocol的export()方法,打开服务端口。
在有注册中心,需要注册提供者地址的情况下,即:
<dubbo:registry address="zookeeper://10.20.153.10:2181" />
ServiceConfig解析出的URL的格式为:
registry://registry-host/com.alibaba.dubbo.registry.RegistryService?export=URL.encode("dubbo://service-host/com.foo.FooService?version=1.0.0")
基于扩展点的Adaptive机制,通过URL的”registry://”协议头识别,就会调用RegistryProtocol的export()方法,将export参数中的提供者URL,先注册到注册中心,再重新传给Protocol扩展点进行暴露:
dubbo://service-host/com.foo.FooService?version=1.0.0
基于扩展点的Adaptive机制,通过提供者URL的”dubbo://”协议头识别,就会调用DubboProtocol的export()方法,打开服务端口。
RegistryProtocol,注册中心协议集成,装饰真正暴露引用服务的协议,增强注册发布功能。
ServiceConfig中的protocol是被多层装饰的Protocol,是DubboProtocol+RegistryProtocol+ProtocolListenerWrapper+ProtocolFilterWrapper。
负责invoker的生命周期,包含一个Invoker对象,可以撤销服务。
负责数据交换和网络通信的组件。每个Invoker都维护了一个ExchangeClient的 引用,并通过它和远端server进行通信。