各个配置信息、配置中心准备好了,开始服务暴露的过程
protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!"); } if (exported) { return; } exported = true; if (StringUtils.isEmpty(path)) { path = interfaceName; } doExportUrls(); }
private void doExportUrls() { // 把application、config、path、protocol、运行参数等拼接到URL中 List<URL> registryURLs = loadRegistries(true); // 每个协议都暴露 for (ProtocolConfig protocolConfig : protocols) { // 拼接path、group、version String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version); // 创建providerModel,构造方法里有个init方法,会把方法存入到method中 ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass); // 存入到PROVIDED_SERVICES中 ApplicationModel.initProviderModel(pathKey, providerModel); doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { String name = protocolConfig.getName(); if (StringUtils.isEmpty(name)) { name = DUBBO; } // 把各个信息放入map中 Map<String, String> map = new HashMap<String, String>(); map.put(SIDE_KEY, PROVIDER_SIDE); appendRuntimeParameters(map); appendParameters(map, metrics); appendParameters(map, application); appendParameters(map, module); // remove 'default.' prefix for configs from ProviderConfig // appendParameters(map, provider, Constants.DEFAULT_KEY); appendParameters(map, provider); appendParameters(map, protocolConfig); appendParameters(map, this); // 如果有设置methods,则设置方法的各自参数 if (CollectionUtils.isNotEmpty(methods)) { for (MethodConfig method : methods) { appendParameters(map, method, method.getName()); String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if (Boolean.FALSE.toString().equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } List<ArgumentConfig> arguments = method.getArguments(); if (CollectionUtils.isNotEmpty(arguments)) { for (ArgumentConfig argument : arguments) { // convert argument type if (argument.getType() != null && argument.getType().length() > 0) { Method[] methods = interfaceClass.getMethods(); // visit all methods if (methods != null && methods.length > 0) { for (int i = 0; i < methods.length; i++) { String methodName = methods[i].getName(); // target the method, and get its signature if (methodName.equals(method.getName())) { Class<?>[] argtypes = methods[i].getParameterTypes(); // one callback in the method if (argument.getIndex() != -1) { if (argtypes[argument.getIndex()].getName().equals(argument.getType())) { appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } else { // multiple callbacks in the method for (int j = 0; j < argtypes.length; j++) { Class<?> argclazz = argtypes[j]; if (argclazz.getName().equals(argument.getType())) { appendParameters(map, argument, method.getName() + "." + j); if (argument.getIndex() != -1 && argument.getIndex() != j) { throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } } } } } } } else if (argument.getIndex() != -1) { appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>"); } } } } // end of methods for } //设置泛化、版本、method if (ProtocolUtils.isGeneric(generic)) { map.put(GENERIC_KEY, generic); map.put(METHODS_KEY, ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put(REVISION_KEY, revision); } String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("No method found in service interface " + interfaceClass.getName()); map.put(METHODS_KEY, ANY_VALUE); } else { map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } // 设置token if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(TOKEN_KEY, token); } } // export service // 获取ip,如果没有设置则默认本机 String host = this.findConfigedHosts(protocolConfig, registryURLs, map); // 获取端口,没有设置则为20880 Integer port = this.findConfigedPorts(protocolConfig, name, map); // 拼接暴露的URL URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map); // 如果已经扩展了协议,则配置信息 if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } String scope = url.getParameter(SCOPE_KEY); // don't export when none is configured if (!SCOPE_NONE.equalsIgnoreCase(scope)) { // export to local if the config is not remote (export to remote only when config is remote) if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) { // 暴露本地服务 exportLocal(url); } // export to remote if the config is not local (export to local only when config is local) // 暴露远程服务 if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) { if (CollectionUtils.isNotEmpty(registryURLs)) { for (URL registryURL : registryURLs) { //if protocol is only injvm ,not register if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { continue; } url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY)); // 监听 URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { if (url.getParameter(REGISTER_KEY, true)) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } else { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } } // For providers, this is used to enable custom proxy to generate invoker String proxy = url.getParameter(PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(PROXY_KEY, proxy); } // 暴露服务 Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } else { if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // 这个protocol是RegistryProtocol,看下面的暴露远程服务 Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } /** * @since 2.7.0 * ServiceData Store */ MetadataReportService metadataReportService = null; if ((metadataReportService = getMetadataReportService()) != null) { metadataReportService.publishProvider(url); } } } this.urls.add(url); }
在SPI#createExtension中,我们看到了Extension的创建,是 wrapperClass
装饰的,而InjvmProtocol是由 org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
、 org.apache.dubbo.qos.protocol.QosProtocolWrapper
、 org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
这三个装饰的。调用的时候,会先调用 ProtocolListenerWrapper
的 export
方法,然后会调用 QosProtocolWrapper
的 export
方法,然后是 ProtocolFilterWrapper
的 export
方法,这里会获取filter,最后是 InjvmProtocol
的 InjvmProtocol
。从 InjvmProtocol
到 ProtocolFilterWrapper
到 QosProtocolWrapper
的 export
方法,都执行完后,就开始创建一个 ListenerExporterWrapper
,负责监听export和unexport。
private void exportLocal(URL url) { URL local = URLBuilder.from(url) .setProtocol(LOCAL_PROTOCOL) .setHost(LOCALHOST_VALUE) .setPort(0) .build(); // PROXY_FACTORY.getInvoker是包装实现类的信息 // local是injvm:开头的,所以这个protocol是InjvmProtocol Exporter<?> exporter = protocol.export( PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local)); // 添加到exporters exporters.add(exporter); logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local); } // ProtocolListenerWrapper public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return new ListenerExporterWrapper<T>(protocol.export(invoker), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY))); } // QosProtocolWrapper public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { startQosServer(invoker.getUrl()); return protocol.export(invoker); } return protocol.export(invoker); } // ProtocolFilterWrapper,获取filter public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER)); } // InjvmProtocol public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap); }
通过netty开启服务,并把信息注册到注册中心
//RegistryProtocol public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // 获取注册中心的地址 URL registryUrl = getRegistryUrl(originInvoker); // url to export locally // 获取服务方地址 URL providerUrl = getProviderUrl(originInvoker); // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call // the same service. Because the subscribed is cached key with the name of the service, it causes the // subscription information to cover. // 订阅覆盖的地址 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); // 监听 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); //export invoker // 这里会调用DubboProtocol的export方法 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // url to registry // 创建一个registry final Registry registry = getRegistry(originInvoker); // 获取已经注册的URL final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); //to judge if we need to delay publish boolean register = providerUrl.getParameter(REGISTER_KEY, true); if (register) { register(registryUrl, registeredProviderUrl); // 设置为已注册 providerInvokerWrapper.setReg(true); } // Deprecated! Subscribe to override rules in 2.6.x or before. // 订阅 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<>(exporter); }
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 获取地址 URL url = invoker.getUrl(); // export service. //获取服务 String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispatching event // 是否为调度事件暴露stub服务 Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } // 开启服务 openServer(url); // 实现了SerializationOptimizer接口的先实例化 optimizeSerialization(url); return exporter; }
private void openServer(URL url) { // find server. // 获取服务地址 String key = url.getAddress(); //client can export a service which's only for server to invoke boolean isServer = url.getParameter(IS_SERVER_KEY, true); if (isServer) { // ExchangeServer如果为空,则创建一个 ExchangeServer server = serverMap.get(key); if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { // Exchangers的Transporters的bind里,通过netty绑定开启服务 serverMap.put(key, createServer(url)); } } } else { // server supports reset, use together with override server.reset(url); } } }