接上一篇 dubbo-server 之后,再来看一下 dubbo-client 是如何工作的。
dubbo提供者服务示例, 其结构是这样的!
dubbo://192.168.11.6:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=12720&side=provider×tamp=1534902103892
dubbo消费者示例,其结构是这样的!
dubbo://192.168.11.6:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=17400&qos.port=33333®ister.ip=192.168.11.6&remote.timestamp=1537448440181&side=consumer×tamp=1537871015998
官网可以运行起来的实例:
// 提供者: public class Provider { public static void main(String[] args) throws Exception { //Prevent to get IPV6 address,this way only work in debug mode //But you can pass use -Djava.net.preferIPv4Stack=true,then it work well whether in debug mode or not System.setProperty("java.net.preferIPv4Stack", "true"); ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"}); context.start(); // 这个比较巧妙, 只需停留在当前点,不按下enter键,服务就会一直存在,等待消费者连接,而且无需真正提供一个监听服务 System.in.read(); // press any key to exit } } // 消费者: public class Consumer { public static void main(String[] args) { //Prevent to get IPV6 address,this way only work in debug mode //But you can pass use -Djava.net.preferIPv4Stack=true,then it work well whether in debug mode or not System.setProperty("java.net.preferIPv4Stack", "true"); ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"}); context.start(); DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy String hello = demoService.sayHello("world"); // call remote method, 调用远程服务和本地服务一样,这是其优势所在 System.out.println(hello); // get result } } }
以上代码可以直接运行起来,但是建议还是个搭建一个Zookeeper来用用,因为线上基本都是Zk的,搭建也很简单,可参考:ZooKeeper 搭建笔记
初始化Context,如上一篇server过程! Rpc框架dubbo-server(v2.6.3) 源码阅读(一)
// DubboNamespaceHandler, provider与consumer主要的差别在于xml标签的获取不一致, public class DubboNamespaceHandler extends NamespaceHandlerSupport { static { Version.checkDuplicate(DubboNamespaceHandler.class); } @Override public void init() { registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true)); registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true)); registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true)); // provider解析 registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); // consumer解析 registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true)); registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true)); registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser()); } }
// 主要看获取bean的过程
DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
// spring, 获取bean, 通过 org.springframework.beans.factory.support.DefaultListableBeanFactory, 获取 @Override public Object getBean(String name) throws BeansException { assertBeanFactoryActive(); return getBeanFactory().getBean(name); } protected <T> T doGetBean( final String name, final Class<T> requiredType, final Object[] args, boolean typeCheckOnly) throws BeansException { final String beanName = transformedBeanName(name); Object bean; // Eagerly check singleton cache for manually registered singletons. // 通过该方法获取bean代理实例,com.alibaba.dubbo.config.spring.ReferenceBean Object sharedInstance = getSingleton(beanName); if (sharedInstance != null && args == null) { if (logger.isDebugEnabled()) { if (isSingletonCurrentlyInCreation(beanName)) { logger.debug("Returning eagerly cached instance of singleton bean '" + beanName + "' that is not fully initialized yet - a consequence of a circular reference"); } else { logger.debug("Returning cached instance of singleton bean '" + beanName + "'"); } } bean = getObjectForBeanInstance(sharedInstance, name, beanName, null); } else { // Fail if we're already creating this bean instance: // We're assumably within a circular reference. if (isPrototypeCurrentlyInCreation(beanName)) { throw new BeanCurrentlyInCreationException(beanName); } // Check if bean definition exists in this factory. BeanFactory parentBeanFactory = getParentBeanFactory(); if (parentBeanFactory != null && !containsBeanDefinition(beanName)) { // Not found -> check parent. String nameToLookup = originalBeanName(name); if (args != null) { // Delegation to parent with explicit args. return (T) parentBeanFactory.getBean(nameToLookup, args); } else { // No args -> delegate to standard getBean method. return parentBeanFactory.getBean(nameToLookup, requiredType); } } if (!typeCheckOnly) { markBeanAsCreated(beanName); } try { final RootBeanDefinition mbd = getMergedLocalBeanDefinition(beanName); checkMergedBeanDefinition(mbd, beanName, args); // Guarantee initialization of beans that the current bean depends on. String[] dependsOn = mbd.getDependsOn(); if (dependsOn != null) { for (String dep : dependsOn) { if (isDependent(beanName, dep)) { throw new BeanCreationException(mbd.getResourceDescription(), beanName, "Circular depends-on relationship between '" + beanName + "' and '" + dep + "'"); } registerDependentBean(dep, beanName); try { getBean(dep); } catch (NoSuchBeanDefinitionException ex) { throw new BeanCreationException(mbd.getResourceDescription(), beanName, "'" + beanName + "' depends on missing bean '" + dep + "'", ex); } } } // Create bean instance. if (mbd.isSingleton()) { sharedInstance = getSingleton(beanName, new ObjectFactory<Object>() { @Override public Object getObject() throws BeansException { try { return createBean(beanName, mbd, args); } catch (BeansException ex) { // Explicitly remove instance from singleton cache: It might have been put there // eagerly by the creation process, to allow for circular reference resolution. // Also remove any beans that received a temporary reference to the bean. destroySingleton(beanName); throw ex; } } }); bean = getObjectForBeanInstance(sharedInstance, name, beanName, mbd); } else if (mbd.isPrototype()) { // It's a prototype -> create a new instance. Object prototypeInstance = null; try { beforePrototypeCreation(beanName); prototypeInstance = createBean(beanName, mbd, args); } finally { afterPrototypeCreation(beanName); } bean = getObjectForBeanInstance(prototypeInstance, name, beanName, mbd); } else { String scopeName = mbd.getScope(); final Scope scope = this.scopes.get(scopeName); if (scope == null) { throw new IllegalStateException("No Scope registered for scope name '" + scopeName + "'"); } try { Object scopedInstance = scope.get(beanName, new ObjectFactory<Object>() { @Override public Object getObject() throws BeansException { beforePrototypeCreation(beanName); try { return createBean(beanName, mbd, args); } finally { afterPrototypeCreation(beanName); } } }); bean = getObjectForBeanInstance(scopedInstance, name, beanName, mbd); } catch (IllegalStateException ex) { throw new BeanCreationException(beanName, "Scope '" + scopeName + "' is not active for the current thread; consider " + "defining a scoped proxy for this bean if you intend to refer to it from a singleton", ex); } } } catch (BeansException ex) { cleanupAfterBeanCreationFailure(beanName); throw ex; } } // Check if required type matches the type of the actual bean instance. if (requiredType != null && bean != null && !requiredType.isInstance(bean)) { try { return getTypeConverter().convertIfNecessary(bean, requiredType); } catch (TypeMismatchException ex) { if (logger.isDebugEnabled()) { logger.debug("Failed to convert bean '" + name + "' to required type '" + ClassUtils.getQualifiedName(requiredType) + "'", ex); } throw new BeanNotOfRequiredTypeException(name, requiredType, bean.getClass()); } } return (T) bean; } View Code
// ApplicationConfig 初始化时,调用dubbo实例进行初始化
// org.springframework.beans.BeanUtils.instantiateClass 初始化 ReferenceConfig 类,通过反射调用 // public com.alibaba.dubbo.config.RegistryConfig() public static <T> T instantiateClass(Constructor<T> ctor, Object... args) throws BeanInstantiationException { Assert.notNull(ctor, "Constructor must not be null"); try { ReflectionUtils.makeAccessible(ctor); // 生成新的实例 return ctor.newInstance(args); } catch (InstantiationException ex) { throw new BeanInstantiationException(ctor, "Is it an abstract class?", ex); } catch (IllegalAccessException ex) { throw new BeanInstantiationException(ctor, "Is the constructor accessible?", ex); } catch (IllegalArgumentException ex) { throw new BeanInstantiationException(ctor, "Illegal arguments for constructor", ex); } catch (InvocationTargetException ex) { throw new BeanInstantiationException(ctor, "Constructor threw exception", ex.getTargetException()); } }
// 通过getBean, 转移到 ReferenceBean.getObject() 触发代理初始化
// org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean, 触发 创建bean操作,触发 getObject() 进行初始化 protected <T> T doGetBean( final String name, final Class<T> requiredType, final Object[] args, boolean typeCheckOnly) throws BeansException { final String beanName = transformedBeanName(name); Object bean; // Eagerly check singleton cache for manually registered singletons. Object sharedInstance = getSingleton(beanName); if (sharedInstance != null && args == null) { if (logger.isDebugEnabled()) { if (isSingletonCurrentlyInCreation(beanName)) { logger.debug("Returning eagerly cached instance of singleton bean '" + beanName + "' that is not fully initialized yet - a consequence of a circular reference"); } else { logger.debug("Returning cached instance of singleton bean '" + beanName + "'"); } } bean = getObjectForBeanInstance(sharedInstance, name, beanName, null); } else { // Fail if we're already creating this bean instance: // We're assumably within a circular reference. if (isPrototypeCurrentlyInCreation(beanName)) { throw new BeanCurrentlyInCreationException(beanName); } // Check if bean definition exists in this factory. BeanFactory parentBeanFactory = getParentBeanFactory(); if (parentBeanFactory != null && !containsBeanDefinition(beanName)) { // Not found -> check parent. String nameToLookup = originalBeanName(name); if (args != null) { // Delegation to parent with explicit args. return (T) parentBeanFactory.getBean(nameToLookup, args); } else { // No args -> delegate to standard getBean method. return parentBeanFactory.getBean(nameToLookup, requiredType); } } if (!typeCheckOnly) { markBeanAsCreated(beanName); } try { final RootBeanDefinition mbd = getMergedLocalBeanDefinition(beanName); checkMergedBeanDefinition(mbd, beanName, args); // Guarantee initialization of beans that the current bean depends on. String[] dependsOn = mbd.getDependsOn(); if (dependsOn != null) { for (String dep : dependsOn) { if (isDependent(beanName, dep)) { throw new BeanCreationException(mbd.getResourceDescription(), beanName, "Circular depends-on relationship between '" + beanName + "' and '" + dep + "'"); } registerDependentBean(dep, beanName); try { getBean(dep); } catch (NoSuchBeanDefinitionException ex) { throw new BeanCreationException(mbd.getResourceDescription(), beanName, "'" + beanName + "' depends on missing bean '" + dep + "'", ex); } } } // Create bean instance. if (mbd.isSingleton()) { // 此处进行bean创建 sharedInstance = getSingleton(beanName, new ObjectFactory<Object>() { @Override public Object getObject() throws BeansException { try { return createBean(beanName, mbd, args); } catch (BeansException ex) { // Explicitly remove instance from singleton cache: It might have been put there // eagerly by the creation process, to allow for circular reference resolution. // Also remove any beans that received a temporary reference to the bean. destroySingleton(beanName); throw ex; } } }); bean = getObjectForBeanInstance(sharedInstance, name, beanName, mbd); } else if (mbd.isPrototype()) { // It's a prototype -> create a new instance. Object prototypeInstance = null; try { beforePrototypeCreation(beanName); prototypeInstance = createBean(beanName, mbd, args); } finally { afterPrototypeCreation(beanName); } bean = getObjectForBeanInstance(prototypeInstance, name, beanName, mbd); } else { String scopeName = mbd.getScope(); final Scope scope = this.scopes.get(scopeName); if (scope == null) { throw new IllegalStateException("No Scope registered for scope name '" + scopeName + "'"); } try { Object scopedInstance = scope.get(beanName, new ObjectFactory<Object>() { @Override public Object getObject() throws BeansException { beforePrototypeCreation(beanName); try { return createBean(beanName, mbd, args); } finally { afterPrototypeCreation(beanName); } } }); bean = getObjectForBeanInstance(scopedInstance, name, beanName, mbd); } catch (IllegalStateException ex) { throw new BeanCreationException(beanName, "Scope '" + scopeName + "' is not active for the current thread; consider " + "defining a scoped proxy for this bean if you intend to refer to it from a singleton", ex); } } } catch (BeansException ex) { cleanupAfterBeanCreationFailure(beanName); throw ex; } } // Check if required type matches the type of the actual bean instance. if (requiredType != null && bean != null && !requiredType.isInstance(bean)) { try { return getTypeConverter().convertIfNecessary(bean, requiredType); } catch (TypeMismatchException ex) { if (logger.isDebugEnabled()) { logger.debug("Failed to convert bean '" + name + "' to required type '" + ClassUtils.getQualifiedName(requiredType) + "'", ex); } throw new BeanNotOfRequiredTypeException(name, requiredType, bean.getClass()); } } return (T) bean; } // org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton public Object getSingleton(String beanName, ObjectFactory<?> singletonFactory) { Assert.notNull(beanName, "'beanName' must not be null"); synchronized (this.singletonObjects) { Object singletonObject = this.singletonObjects.get(beanName); if (singletonObject == null) { if (this.singletonsCurrentlyInDestruction) { throw new BeanCreationNotAllowedException(beanName, "Singleton bean creation not allowed while singletons of this factory are in destruction " + "(Do not request a bean from a BeanFactory in a destroy method implementation!)"); } if (logger.isDebugEnabled()) { logger.debug("Creating shared instance of singleton bean '" + beanName + "'"); } beforeSingletonCreation(beanName); boolean newSingleton = false; boolean recordSuppressedExceptions = (this.suppressedExceptions == null); if (recordSuppressedExceptions) { this.suppressedExceptions = new LinkedHashSet<Exception>(); } try { singletonObject = singletonFactory.getObject(); newSingleton = true; } catch (IllegalStateException ex) { // Has the singleton object implicitly appeared in the meantime -> // if yes, proceed with it since the exception indicates that state. singletonObject = this.singletonObjects.get(beanName); if (singletonObject == null) { throw ex; } } catch (BeanCreationException ex) { if (recordSuppressedExceptions) { for (Exception suppressedException : this.suppressedExceptions) { ex.addRelatedCause(suppressedException); } } throw ex; } finally { if (recordSuppressedExceptions) { this.suppressedExceptions = null; } afterSingletonCreation(beanName); } if (newSingleton) { // ref是在此处进行初始化的,有点神奇 addSingleton(beanName, singletonObject); } } return (singletonObject != NULL_OBJECT ? singletonObject : null); } } // org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.addSingleton protected void addSingleton(String beanName, Object singletonObject) { // 就是该 synchronized 方法,触发了 ReferenceConfig.ref 的初始化,实际上是触发了 AbstractConfig.toString() 方法 synchronized (this.singletonObjects) { this.singletonObjects.put(beanName, (singletonObject != null ? singletonObject : NULL_OBJECT)); this.singletonFactories.remove(beanName); this.earlySingletonObjects.remove(beanName); this.registeredSingletons.add(beanName); } } View Code
通过 ReferenceBean.getObject() 方法,获取代理:
// 由 com.alibaba.dubbo.config.spring.ReferenceBean.getObject 进行获取代理,从而进行代理创建 @Override public Object getObject() throws Exception { return get(); } // 调用父类 com.alibaba.dubbo.config.ReferenceConfig.get() 方法, 获取实例 public synchronized T get() { if (destroyed) { throw new IllegalStateException("Already destroyed!"); } if (ref == null) { // 未初始化时,触发一次初始化 init(); } return ref; }
真正的代理之路开始了:
// 初始化代理,并设值到 ref 变量中 private void init() { if (initialized) { return; } initialized = true; if (interfaceName == null || interfaceName.length() == 0) { throw new IllegalStateException("<dubbo:reference interface=/"/" /> interface not allow null!"); } // 在此处添加堆栈打印,排除路径,如下 new Throwable("*******************************ReferenceConfig.init trace dump").printStackTrace(); new Throwable().printStackTrace(); // get consumer's global configuration checkDefault(); appendProperties(this); if (getGeneric() == null && getConsumer() != null) { setGeneric(getConsumer().getGeneric()); } if (ProtocolUtils.isGeneric(getGeneric())) { interfaceClass = GenericService.class; } else { try { interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } checkInterfaceAndMethods(interfaceClass, methods); } String resolve = System.getProperty(interfaceName); String resolveFile = null; if (resolve == null || resolve.length() == 0) { resolveFile = System.getProperty("dubbo.resolve.file"); if (resolveFile == null || resolveFile.length() == 0) { File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties"); if (userResolveFile.exists()) { resolveFile = userResolveFile.getAbsolutePath(); } } if (resolveFile != null && resolveFile.length() > 0) { Properties properties = new Properties(); FileInputStream fis = null; try { fis = new FileInputStream(new File(resolveFile)); properties.load(fis); } catch (IOException e) { throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e); } finally { try { if (null != fis) fis.close(); } catch (IOException e) { logger.warn(e.getMessage(), e); } } resolve = properties.getProperty(interfaceName); } } if (resolve != null && resolve.length() > 0) { url = resolve; if (logger.isWarnEnabled()) { if (resolveFile != null) { logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service."); } else { logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service."); } } } if (consumer != null) { if (application == null) { application = consumer.getApplication(); } if (module == null) { module = consumer.getModule(); } if (registries == null) { registries = consumer.getRegistries(); } if (monitor == null) { monitor = consumer.getMonitor(); } } if (module != null) { if (registries == null) { registries = module.getRegistries(); } if (monitor == null) { monitor = module.getMonitor(); } } if (application != null) { if (registries == null) { registries = application.getRegistries(); } if (monitor == null) { monitor = application.getMonitor(); } } checkApplication(); // 检查mock配置情况,尝试调用mock实例 checkStubAndMock(interfaceClass); Map<String, String> map = new HashMap<String, String>(); Map<Object, Object> attributes = new HashMap<Object, Object>(); map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE); map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } if (!isGeneric()) { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } // 获取wrapper方法 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("NO method found in service interface " + interfaceClass.getName()); map.put("methods", Constants.ANY_VALUE); } else { map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } map.put(Constants.INTERFACE_KEY, interfaceName); appendParameters(map, application); appendParameters(map, module); appendParameters(map, consumer, Constants.DEFAULT_KEY); appendParameters(map, this); String prefix = StringUtils.getServiceKey(map); if (methods != null && !methods.isEmpty()) { for (MethodConfig method : methods) { appendParameters(map, method, method.getName()); String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } appendAttributes(attributes, method, prefix + "." + method.getName()); checkAndConvertImplicitConfig(method, map, attributes); } } String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY); if (hostToRegistry == null || hostToRegistry.length() == 0) { hostToRegistry = NetUtils.getLocalHost(); } else if (isInvalidLocalHost(hostToRegistry)) { throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry); } map.put(Constants.REGISTER_IP_KEY, hostToRegistry); //attributes are stored by system context. StaticContext.getSystemContext().putAll(attributes); ref = createProxy(map); ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods()); ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel); }
控制权留给mock,先检查下是否有设置,有则先检查是否正确。
// AbstractInterfaceConfig.checkStubAndMock() protected void checkStubAndMock(Class<?> interfaceClass) { if (ConfigUtils.isNotEmpty(local)) { Class<?> localClass = ConfigUtils.isDefault(local) ? ReflectUtils.forName(interfaceClass.getName() + "Local") : ReflectUtils.forName(local); if (!interfaceClass.isAssignableFrom(localClass)) { throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceClass.getName()); } try { ReflectUtils.findConstructor(localClass, interfaceClass); } catch (NoSuchMethodException e) { throw new IllegalStateException("No such constructor /"public " + localClass.getSimpleName() + "(" + interfaceClass.getName() + ")/" in local implementation class " + localClass.getName()); } } if (ConfigUtils.isNotEmpty(stub)) { Class<?> localClass = ConfigUtils.isDefault(stub) ? ReflectUtils.forName(interfaceClass.getName() + "Stub") : ReflectUtils.forName(stub); if (!interfaceClass.isAssignableFrom(localClass)) { throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceClass.getName()); } try { ReflectUtils.findConstructor(localClass, interfaceClass); } catch (NoSuchMethodException e) { throw new IllegalStateException("No such constructor /"public " + localClass.getSimpleName() + "(" + interfaceClass.getName() + ")/" in local implementation class " + localClass.getName()); } } if (ConfigUtils.isNotEmpty(mock)) { if (mock.startsWith(Constants.RETURN_PREFIX)) { String value = mock.substring(Constants.RETURN_PREFIX.length()); try { MockInvoker.parseMockValue(value); } catch (Exception e) { throw new IllegalStateException("Illegal mock json value in <dubbo:service ... mock=/"" + mock + "/" />"); } } else { Class<?> mockClass = ConfigUtils.isDefault(mock) ? ReflectUtils.forName(interfaceClass.getName() + "Mock") : ReflectUtils.forName(mock); if (!interfaceClass.isAssignableFrom(mockClass)) { throw new IllegalStateException("The mock implementation class " + mockClass.getName() + " not implement interface " + interfaceClass.getName()); } try { mockClass.getConstructor(new Class<?>[0]); } catch (NoSuchMethodException e) { throw new IllegalStateException("No such empty constructor /"public " + mockClass.getSimpleName() + "()/" in mock implementation class " + mockClass.getName()); } } } }
获取wrapper,以备后续调用:
// com.alibaba.dubbo.common.bytecode.Wrapper public static Wrapper getWrapper(Class<?> c) { while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class. c = c.getSuperclass(); if (c == Object.class) return OBJECT_WRAPPER; Wrapper ret = WRAPPER_MAP.get(c); if (ret == null) { ret = makeWrapper(c); WRAPPER_MAP.put(c, ret); } return ret; } private static Wrapper makeWrapper(Class<?> c) { if (c.isPrimitive()) throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c); String name = c.getName(); ClassLoader cl = ClassHelper.getClassLoader(c); StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ "); StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ "); StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ "); c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }"); c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }"); c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }"); Map<String, Class<?>> pts = new HashMap<String, Class<?>>(); // <property name, property types> Map<String, Method> ms = new LinkedHashMap<String, Method>(); // <method desc, Method instance> List<String> mns = new ArrayList<String>(); // method names. List<String> dmns = new ArrayList<String>(); // declaring method names. // get all public field. for (Field f : c.getFields()) { String fn = f.getName(); Class<?> ft = f.getType(); if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers())) continue; c1.append(" if( $2.equals(/"").append(fn).append("/") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }"); c2.append(" if( $2.equals(/"").append(fn).append("/") ){ return ($w)w.").append(fn).append("; }"); pts.put(fn, ft); } Method[] methods = c.getMethods(); // get all public method. boolean hasMethod = hasMethods(methods); if (hasMethod) { c3.append(" try{"); } for (Method m : methods) { if (m.getDeclaringClass() == Object.class) //ignore Object's method. continue; String mn = m.getName(); c3.append(" if( /"").append(mn).append("/".equals( $2 ) "); int len = m.getParameterTypes().length; c3.append(" && ").append(" $3.length == ").append(len); boolean override = false; for (Method m2 : methods) { if (m != m2 && m.getName().equals(m2.getName())) { override = true; break; } } if (override) { if (len > 0) { for (int l = 0; l < len; l++) { c3.append(" && ").append(" $3[").append(l).append("].getName().equals(/"") .append(m.getParameterTypes()[l].getName()).append("/")"); } } } c3.append(" ) { "); if (m.getReturnType() == Void.TYPE) c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;"); else c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");"); c3.append(" }"); mns.add(mn); if (m.getDeclaringClass() == c) dmns.add(mn); ms.put(ReflectUtils.getDesc(m), m); } if (hasMethod) { c3.append(" } catch(Throwable e) { "); c3.append(" throw new java.lang.reflect.InvocationTargetException(e); "); c3.append(" }"); } c3.append(" throw new " + NoSuchMethodException.class.getName() + "(/"Not found method ///"/"+$2+/"///" in class " + c.getName() + "./"); }"); // deal with get/set method. Matcher matcher; for (Map.Entry<String, Method> entry : ms.entrySet()) { String md = entry.getKey(); Method method = (Method) entry.getValue(); if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) { String pn = propertyName(matcher.group(1)); c2.append(" if( $2.equals(/"").append(pn).append("/") ){ return ($w)w.").append(method.getName()).append("(); }"); pts.put(pn, method.getReturnType()); } else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) { String pn = propertyName(matcher.group(1)); c2.append(" if( $2.equals(/"").append(pn).append("/") ){ return ($w)w.").append(method.getName()).append("(); }"); pts.put(pn, method.getReturnType()); } else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) { Class<?> pt = method.getParameterTypes()[0]; String pn = propertyName(matcher.group(1)); c1.append(" if( $2.equals(/"").append(pn).append("/") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }"); pts.put(pn, pt); } } c1.append(" throw new " + NoSuchPropertyException.class.getName() + "(/"Not found property ///"/"+$2+/"///" filed or setter method in class " + c.getName() + "./"); }"); c2.append(" throw new " + NoSuchPropertyException.class.getName() + "(/"Not found property ///"/"+$2+/"///" filed or setter method in class " + c.getName() + "./"); }"); // make class long id = WRAPPER_CLASS_COUNTER.getAndIncrement(); ClassGenerator cc = ClassGenerator.newInstance(cl); cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id); cc.setSuperClass(Wrapper.class); cc.addDefaultConstructor(); cc.addField("public static String[] pns;"); // property name array. cc.addField("public static " + Map.class.getName() + " pts;"); // property type map. cc.addField("public static String[] mns;"); // all method name array. cc.addField("public static String[] dmns;"); // declared method name array. for (int i = 0, len = ms.size(); i < len; i++) cc.addField("public static Class[] mts" + i + ";"); cc.addMethod("public String[] getPropertyNames(){ return pns; }"); cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }"); cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }"); cc.addMethod("public String[] getMethodNames(){ return mns; }"); cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }"); cc.addMethod(c1.toString()); cc.addMethod(c2.toString()); cc.addMethod(c3.toString()); try { Class<?> wc = cc.toClass(); // setup static field. wc.getField("pts").set(null, pts); wc.getField("pns").set(null, pts.keySet().toArray(new String[0])); wc.getField("mns").set(null, mns.toArray(new String[0])); wc.getField("dmns").set(null, dmns.toArray(new String[0])); int ix = 0; for (Method m : ms.values()) wc.getField("mts" + ix++).set(null, m.getParameterTypes()); return (Wrapper) wc.newInstance(); } catch (RuntimeException e) { throw e; } catch (Throwable e) { throw new RuntimeException(e.getMessage(), e); } finally { cc.release(); ms.clear(); mns.clear(); dmns.clear(); } }
// InvokerInvocationHandler, 代理所有的dubbo请求处理
// InvokerInvocationHandler, 代理所有的dubbo请求处理 public class InvokerInvocationHandler implements InvocationHandler { private final Invoker<?> invoker; public InvokerInvocationHandler(Invoker<?> handler) { this.invoker = handler; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } return invoker.invoke(new RpcInvocation(method, args)).recreate(); } }
// 小插曲,日志打印
//FailsafeLogger.warn(), 格式为: No spring extension(bean) named:defaultCompiler, try to find an extension(bean) of type java.lang.String, dubbo version: , current host: 192.168.11.6 @Override public void warn(String msg, Throwable e) { try { logger.warn(appendContextMessage(msg), e); } catch (Throwable t) { } } private String appendContextMessage(String msg) { return " [DUBBO] " + msg + ", dubbo version: " + Version.getVersion() + ", current host: " + NetUtils.getLocalHost(); } private Class<?> createAdaptiveExtensionClass() { String code = createAdaptiveExtensionClassCode(); ClassLoader classLoader = findClassLoader(); com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension(); return compiler.compile(code, classLoader); }
// wrapper0
// 生成的包装类如下: com.alibaba.dubbo.common.bytecode.Wrapper0 package com.alibaba.dubbo.common.bytecode; import com.alibaba.dubbo.common.bytecode.ClassGenerator.DC; import com.alibaba.dubbo.demo.DemoService; import java.lang.reflect.InvocationTargetException; import java.util.Map; public class Wrapper0 extends Wrapper implements DC { public static String[] pns; public static Map pts; public static String[] mns; public static String[] dmns; public static Class[] mts0; public String[] getPropertyNames() { return pns; } public boolean hasProperty(String var1) { return pts.containsKey(var1); } public Class getPropertyType(String var1) { return (Class)pts.get(var1); } public String[] getMethodNames() { return mns; } public String[] getDeclaredMethodNames() { return dmns; } public void setPropertyValue(Object var1, String var2, Object var3) { try { DemoService var4 = (DemoService)var1; } catch (Throwable var6) { throw new IllegalArgumentException(var6); } throw new NoSuchPropertyException("Not found property /"" + var2 + "/" filed or setter method in class com.alibaba.dubbo.demo.DemoService."); } public Object getPropertyValue(Object var1, String var2) { try { DemoService var3 = (DemoService)var1; } catch (Throwable var5) { throw new IllegalArgumentException(var5); } throw new NoSuchPropertyException("Not found property /"" + var2 + "/" filed or setter method in class com.alibaba.dubbo.demo.DemoService."); } public Object invokeMethod(Object var1, String var2, Class[] var3, Object[] var4) throws InvocationTargetException { DemoService var5; try { var5 = (DemoService)var1; } catch (Throwable var8) { throw new IllegalArgumentException(var8); } try { if ("sayHello".equals(var2) && var3.length == 1) { return var5.sayHello((String)var4[0]); } } catch (Throwable var9) { throw new InvocationTargetException(var9); } throw new NoSuchMethodException("Not found method /"" + var2 + "/" in class com.alibaba.dubbo.demo.DemoService."); } public Wrapper0() { } }
// createProxy
// com.alibaba.dubbo.common.bytecode.ClassGenerator, 生成class实例 public Class<?> toClass() { return toClass(ClassHelper.getClassLoader(ClassGenerator.class), getClass().getProtectionDomain()); } public Class<?> toClass(ClassLoader loader, ProtectionDomain pd) { if (mCtc != null) mCtc.detach(); long id = CLASS_NAME_COUNTER.getAndIncrement(); try { CtClass ctcs = mSuperClass == null ? null : mPool.get(mSuperClass); if (mClassName == null) mClassName = (mSuperClass == null || javassist.Modifier.isPublic(ctcs.getModifiers()) ? ClassGenerator.class.getName() : mSuperClass + "$sc") + id; mCtc = mPool.makeClass(mClassName); if (mSuperClass != null) mCtc.setSuperclass(ctcs); // 添加dubbo ClassGenerator 生成的动态类的标志,接口方法为空 mCtc.addInterface(mPool.get(DC.class.getName())); // add dynamic class tag. if (mInterfaces != null) for (String cl : mInterfaces) mCtc.addInterface(mPool.get(cl)); if (mFields != null) for (String code : mFields) mCtc.addField(CtField.make(code, mCtc)); if (mMethods != null) { for (String code : mMethods) { if (code.charAt(0) == ':') mCtc.addMethod(CtNewMethod.copy(getCtMethod(mCopyMethods.get(code.substring(1))), code.substring(1, code.indexOf('(')), mCtc, null)); else mCtc.addMethod(CtNewMethod.make(code, mCtc)); } } if (mDefaultConstructor) mCtc.addConstructor(CtNewConstructor.defaultConstructor(mCtc)); if (mConstructors != null) { for (String code : mConstructors) { if (code.charAt(0) == ':') { mCtc.addConstructor(CtNewConstructor.copy(getCtConstructor(mCopyConstructors.get(code.substring(1))), mCtc, null)); } else { String[] sn = mCtc.getSimpleName().split("//$+"); // inner class name include $. mCtc.addConstructor(CtNewConstructor.make(code.replaceFirst(SIMPLE_NAME_TAG, sn[sn.length - 1]), mCtc)); } } } return mCtc.toClass(loader, pd); } catch (RuntimeException e) { throw e; } catch (NotFoundException e) { throw new RuntimeException(e.getMessage(), e); } catch (CannotCompileException e) { throw new RuntimeException(e.getMessage(), e); } } // 创建操作代理类 // ref = createProxy(map); 生成新proxy {methods=sayHello, timestamp=1537527140342, dubbo=2.0.2, register.ip=192.168.11.6, application=demo-consumer, check=false, side=consumer, pid=8776, interface=com.alibaba.dubbo.demo.DemoService, qos.port=33333} @SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) private T createProxy(Map<String, String> map) { URL tmpUrl = new URL("temp", "localhost", 0, map); final boolean isJvmRefer; if (isInjvm() == null) { if (url != null && url.length() > 0) { // if a url is specified, don't do local reference isJvmRefer = false; } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { // by default, reference local service if there is isJvmRefer = true; } else { isJvmRefer = false; } } else { isJvmRefer = isInjvm().booleanValue(); } if (isJvmRefer) { URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); invoker = refprotocol.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } else { if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address. String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (url.getPath() == null || url.getPath().length() == 0) { url = url.setPath(interfaceName); } if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } else { urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // assemble URL from register center's configuration //[registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&pid=8776&qos.port=33333®istry=zookeeper&timeout=2000×tamp=1537528008773] // 最后得到的地址是这样的: registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&pid=14040&qos.port=33333&refer=application=demo-consumer%26check=false%26dubbo=2.0.2%26interface=com.alibaba.dubbo.demo.DemoService%26methods=sayHello%26pid=14040%26qos.port=33333%26register.ip=192.168.11.6%26side=consumer%26timestamp=1537844804936®istry=zookeeper&timeout=2000×tamp=1537847505648 List<URL> us = loadRegistries(false); if (us != null && !us.isEmpty()) { for (URL u : us) { // 加载监控地址,以便进行上报数据 URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } if (urls.isEmpty()) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=/".../" /> to your spring config."); } } if (urls.size() == 1) { // Protocol$Adaptive, 动态生成的代理类,调用远程方法 com.alibaba.dubbo.remoting.transport.AbstractClient(), 先将自己注册到注册中心,再调用提供者方法 /** "main@1" prio=5 tid=0x1 nid=NA runnable java.lang.Thread.State: RUNNABLE at com.alibaba.dubbo.remoting.transport.AbstractClient.<init>(AbstractClient.java:80) at com.alibaba.dubbo.remoting.transport.netty.NettyClient.<init>(NettyClient.java:59) at com.alibaba.dubbo.remoting.transport.netty.NettyTransporter.connect(NettyTransporter.java:37) at com.alibaba.dubbo.remoting.Transporter$Adaptive.connect(Transporter$Adaptive.java:-1) at com.alibaba.dubbo.remoting.Transporters.connect(Transporters.java:75) at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger.connect(HeaderExchanger.java:39) at com.alibaba.dubbo.remoting.exchange.Exchangers.connect(Exchangers.java:109) at com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.initClient(DubboProtocol.java:417) at com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.getSharedClient(DubboProtocol.java:384) - locked <0xb4f> (a java.lang.Object) at com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.getClients(DubboProtocol.java:355) at com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.refer(DubboProtocol.java:337) at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper.refer(ProtocolFilterWrapper.java:108) at com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper.refer(ProtocolListenerWrapper.java:67) at com.alibaba.dubbo.rpc.Protocol$Adaptive.refer(Protocol$Adaptive.java:-1) at com.alibaba.dubbo.registry.integration.RegistryDirectory.toInvokers(RegistryDirectory.java:387) at com.alibaba.dubbo.registry.integration.RegistryDirectory.refreshInvoker(RegistryDirectory.java:253) at com.alibaba.dubbo.registry.integration.RegistryDirectory.notify(RegistryDirectory.java:223) - locked <0xb2f> (a com.alibaba.dubbo.registry.integration.RegistryDirectory) at com.alibaba.dubbo.registry.support.AbstractRegistry.notify(AbstractRegistry.java:414) at com.alibaba.dubbo.registry.support.FailbackRegistry.doNotify(FailbackRegistry.java:280) at com.alibaba.dubbo.registry.support.FailbackRegistry.notify(FailbackRegistry.java:266) at com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry.doSubscribe(ZookeeperRegistry.java:190) at com.alibaba.dubbo.registry.support.FailbackRegistry.subscribe(FailbackRegistry.java:196) at com.alibaba.dubbo.registry.integration.RegistryDirectory.subscribe(RegistryDirectory.java:159) at com.alibaba.dubbo.registry.integration.RegistryProtocol.doRefer(RegistryProtocol.java:307) at com.alibaba.dubbo.registry.integration.RegistryProtocol.refer(RegistryProtocol.java:288) at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper.refer(ProtocolFilterWrapper.java:106) at com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper.refer(ProtocolListenerWrapper.java:65) // 从此处开始进行调用 at com.alibaba.dubbo.rpc.Protocol$Adaptive.refer(Protocol$Adaptive.java:-1) at com.alibaba.dubbo.config.ReferenceConfig.createProxy(ReferenceConfig.java:395) at com.alibaba.dubbo.config.ReferenceConfig.init(ReferenceConfig.java:334) at com.alibaba.dubbo.config.ReferenceConfig.get(ReferenceConfig.java:163) - locked <0x76b> (a com.alibaba.dubbo.config.spring.ReferenceBean) at com.alibaba.dubbo.config.spring.ReferenceBean.getObject(ReferenceBean.java:66) at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:170) at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.getObjectFromFactoryBean(FactoryBeanRegistrySupport.java:103) - locked <0x790> (a java.util.concurrent.ConcurrentHashMap) at org.springframework.beans.factory.support.AbstractBeanFactory.getObjectForBeanInstance(AbstractBeanFactory.java:1640) at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:254) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1080) at com.alibaba.dubbo.demo.consumer.Consumer.main(Consumer.java:30) */ invoker = refprotocol.refer(interfaceClass, urls.get(0)); } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; for (URL url : urls) { invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; // use last registry url } } if (registryURL != null) { // registry url is available // use AvailableCluster only when register's cluster is available URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers)); } else { // not a registry url invoker = cluster.join(new StaticDirectory(invokers)); } } } Boolean c = check; if (c == null && consumer != null) { c = consumer.isCheck(); } if (c == null) { c = true; // default true } if (c && !invoker.isAvailable()) { throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()); } if (logger.isInfoEnabled()) { logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl()); } // create service proxy return (T) proxyFactory.getProxy(invoker); } // loadRegistries protected List<URL> loadRegistries(boolean provider) { // 加载前,先检查是否已初始化 checkRegistry(); List<URL> registryList = new ArrayList<URL>(); if (registries != null && !registries.isEmpty()) { for (RegistryConfig config : registries) { String address = config.getAddress(); if (address == null || address.length() == 0) { address = Constants.ANYHOST_VALUE; } String sysaddress = System.getProperty("dubbo.registry.address"); if (sysaddress != null && sysaddress.length() > 0) { address = sysaddress; } if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) { Map<String, String> map = new HashMap<String, String>(); appendParameters(map, application); appendParameters(map, config); map.put("path", RegistryService.class.getName()); map.put("dubbo", Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } if (!map.containsKey("protocol")) { if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) { map.put("protocol", "remote"); } else { map.put("protocol", "dubbo"); } } // 解析注册中心地址列表 List<URL> urls = UrlUtils.parseURLs(address, map); for (URL url : urls) { url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol()); url = url.setProtocol(Constants.REGISTRY_PROTOCOL); if ((provider && url.getParameter(Constants.REGISTER_KEY, true)) || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) { registryList.add(url); } } } } } return registryList; }
//
// com.alibaba.dubbo.rpc.Protocol$Adaptive, 动态生成的类,去调用远程方法 public class Protocol$Adaptive implements Protocol { public void destroy() { throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public int getDefaultPort() { throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public Invoker refer(Class var1, URL var2) throws RpcException { if (var2 == null) { throw new IllegalArgumentException("url == null"); } else { String var4 = var2.getProtocol() == null ? "dubbo" : var2.getProtocol(); if (var4 == null) { throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + var2.toString() + ") use keys([protocol])"); } else { Protocol var5 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(var4); return var5.refer(var1, var2); } } } public Exporter export(Invoker var1) throws RpcException { if (var1 == null) { throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); } else if (var1.getUrl() == null) { throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); } else { URL var2 = var1.getUrl(); String var3 = var2.getProtocol() == null ? "dubbo" : var2.getProtocol(); if (var3 == null) { throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + var2.toString() + ") use keys([protocol])"); } else { Protocol var4 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(var3); return var4.export(var1); } } } public Protocol$Adaptive() { } }
// com.alibaba.dubbo.registry.integration.RegistryDirectory.subscribe, 订阅服务 public void subscribe(URL url) { setConsumerUrl(url); registry.subscribe(url, this); } // 以zookeeper订阅为例,看一下订阅过程,com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry.doSubscribe() @Override protected void doSubscribe(final URL url, final NotifyListener listener) { try { if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { String root = toRootPath(); ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { @Override public void childChanged(String parentPath, List<String> currentChilds) { for (String child : currentChilds) { child = URL.decode(child); if (!anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } }); zkListener = listeners.get(listener); } zkClient.create(root, false); List<String> services = zkClient.addChildListener(root, zkListener); if (services != null && !services.isEmpty()) { for (String service : services) { service = URL.decode(service); anyServices.add(service); subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } else { List<URL> urls = new ArrayList<URL>(); for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { @Override public void childChanged(String parentPath, List<String> currentChilds) { ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); } }); zkListener = listeners.get(listener); } zkClient.create(path, false); List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } // com.alibaba.dubbo.registry.support.AbstractRegistry.notify, 获取服务提供者地址 protected void notify(URL url, NotifyListener listener, List<URL> urls) { if (url == null) { throw new IllegalArgumentException("notify url == null"); } if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } if ((urls == null || urls.isEmpty()) && !Constants.ANY_VALUE.equals(url.getServiceInterface())) { logger.warn("Ignore empty notify urls for subscribe url " + url); return; } if (logger.isInfoEnabled()) { logger.info("Notify urls for subscribe url " + url + ", urls: " + urls); } Map<String, List<URL>> result = new HashMap<String, List<URL>>(); for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); List<URL> categoryList = result.get(category); if (categoryList == null) { categoryList = new ArrayList<URL>(); result.put(category, categoryList); } categoryList.add(u); } } if (result.size() == 0) { return; } Map<String, List<URL>> categoryNotified = notified.get(url); if (categoryNotified == null) { notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>()); categoryNotified = notified.get(url); } // 通知观察者 for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); categoryNotified.put(category, categoryList); saveProperties(url); listener.notify(categoryList); } }
// 获取registry,以决定调用哪个协议实现 @Override public Registry getRegistry(URL url) { url = url.setPath(RegistryService.class.getName()) .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); String key = url.toServiceString(); // Lock the registry access process to ensure a single instance of the registry LOCK.lock(); try { Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } // 创建registry, 该方法为抽象方法只能由具体的实现类实现 registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } REGISTRIES.put(key, registry); return registry; } finally { // Release the lock LOCK.unlock(); } } // 该方法为在生成代理时自动生成实现 protected abstract Registry createRegistry(URL url); // 生成的registry代理类如下 package com.alibaba.dubbo.registry; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.extension.ExtensionLoader; public class RegistryFactory$Adaptive implements RegistryFactory { public Registry getRegistry(URL var1) { if (var1 == null) { throw new IllegalArgumentException("url == null"); } else { String var3 = var1.getProtocol() == null ? "dubbo" : var1.getProtocol(); if (var3 == null) { throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + var1.toString() + ") use keys([protocol])"); } else { RegistryFactory var4 = (RegistryFactory)ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(var3); return var4.getRegistry(var1); } } } public RegistryFactory$Adaptive() { } }
// 创建 Registry, todo: 返回xxxRegistry 实例 @SuppressWarnings("unchecked") private T createExtension(String name) { Class<?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } injectExtension(instance); Set<Class<?>> wrapperClasses = cachedWrapperClasses; if (wrapperClasses != null && !wrapperClasses.isEmpty()) { for (Class<?> wrapperClass : wrapperClasses) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type + ") could not be instantiated: " + t.getMessage(), t); } } // com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.refer, @Override public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { optimizeSerialization(url); // create rpc invoker. 初始化调用远程方法 DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; } // private ExchangeClient[] getClients(URL url) { // whether to share connection boolean service_share_connect = false; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); // if not configured, connection is shared, otherwise, one connection for one service if (connections == 0) { service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect) { clients[i] = getSharedClient(url); } else { clients[i] = initClient(url); } } return clients; }
// netty 连接远程服务过程
// 初始化调用端,此处开始连接netty private ExchangeClient initClient(URL url) { // client type setting. String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); // enable heartbeat by default url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // BIO is not allowed since it has severe performance issue. if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported client type: " + str + "," + " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " ")); } ExchangeClient client; try { // connection should be lazy if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { client = new LazyConnectExchangeClient(url, requestHandler); } else { client = Exchangers.connect(url, requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } return client; } // com.alibaba.dubbo.remoting.exchange.Exchangers.connect public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); return getExchanger(url).connect(url, handler); } // com.alibaba.dubbo.remoting.Transporters.connect public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } ChannelHandler handler; if (handlers == null || handlers.length == 0) { handler = new ChannelHandlerAdapter(); } else if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().connect(url, handler); } // com.alibaba.dubbo.remoting.transport.netty public class NettyTransporter implements Transporter { public static final String NAME = "netty"; @Override public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); } @Override public Client connect(URL url, ChannelHandler listener) throws RemotingException { return new NettyClient(url, listener); } } // com.alibaba.dubbo.remoting.transport.AbstractClient public AbstractClient(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT); // The default reconnection interval is 2s, 1800 means warning interval is 1 hour. reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800); try { // 模板方法,由子类实现 doOpen(); } catch (Throwable t) { close(); throw new RemotingException(url.toInetSocketAddress(), null, "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t); } try { // connect. connect(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress()); } } catch (RemotingException t) { if (url.getParameter(Constants.CHECK_KEY, true)) { close(); throw t; } else { logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t); } } catch (Throwable t) { close(); throw new RemotingException(url.toInetSocketAddress(), null, "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t); } executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class) .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort())); ExtensionLoader.getExtensionLoader(DataStore.class) .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort())); } // 将连接步骤固化,调用实现类的 doConnect protected void connect() throws RemotingException { connectLock.lock(); try { if (isConnected()) { return; } initConnectStatusCheckCommand(); doConnect(); if (!isConnected()) { throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: Connect wait timeout: " + getTimeout() + "ms."); } else { if (logger.isInfoEnabled()) { logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", channel is " + this.getChannel()); } } reconnect_count.set(0); reconnect_error_log_flag.set(false); } catch (RemotingException e) { throw e; } catch (Throwable e) { throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: " + e.getMessage(), e); } finally { connectLock.unlock(); } } // com.alibaba.dubbo.remoting.transport.netty.NettyClient. public class NettyClient extends AbstractClient { private static final Logger logger = LoggerFactory.getLogger(NettyClient.class); // ChannelFactory's closure has a DirectMemory leak, using static to avoid // https://issues.jboss.org/browse/NETTY-424 private static final ChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientBoss", true)), Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientWorker", true)), Constants.DEFAULT_IO_THREADS); private ClientBootstrap bootstrap; private volatile Channel channel; // volatile, please copy reference to use public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException { super(url, wrapChannelHandler(url, handler)); } @Override protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); bootstrap = new ClientBootstrap(channelFactory); // config // @see org.jboss.netty.channel.socket.SocketChannelConfig bootstrap.setOption("keepAlive", true); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("connectTimeoutMillis", getTimeout()); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); } @Override protected void doConnect() throws Throwable { long start = System.currentTimeMillis(); ChannelFuture future = bootstrap.connect(getConnectAddress()); try { boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS); if (ret && future.isSuccess()) { Channel newChannel = future.getChannel(); newChannel.setInterestOps(Channel.OP_READ_WRITE); try { // Close old channel Channel oldChannel = NettyClient.this.channel; // copy reference if (oldChannel != null) { try { if (logger.isInfoEnabled()) { logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel); } oldChannel.close(); } finally { NettyChannel.removeChannelIfDisconnected(oldChannel); } } } finally { if (NettyClient.this.isClosed()) { try { if (logger.isInfoEnabled()) { logger.info("Close new netty channel " + newChannel + ", because the client closed."); } newChannel.close(); } finally { NettyClient.this.channel = null; NettyChannel.removeChannelIfDisconnected(newChannel); } } else { NettyClient.this.channel = newChannel; } } } else if (future.getCause() != null) { throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause()); } else { throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress() + " client-side timeout " + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()); } } finally { if (!isConnected()) { future.cancel(); } } } @Override protected void doDisConnect() throws Throwable { try { NettyChannel.removeChannelIfDisconnected(channel); } catch (Throwable t) { logger.warn(t.getMessage()); } } @Override protected void doClose() throws Throwable { /*try { bootstrap.releaseExternalResources(); } catch (Throwable t) { logger.warn(t.getMessage()); }*/ } @Override protected com.alibaba.dubbo.remoting.Channel getChannel() { Channel c = channel; if (c == null || !c.isConnected()) return null; return NettyChannel.getOrAddChannel(c, getUrl(), this); } } // com.alibaba.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper.getProxy @Override @SuppressWarnings({"unchecked", "rawtypes"}) public <T> T getProxy(Invoker<T> invoker) throws RpcException { T proxy = proxyFactory.getProxy(invoker); if (GenericService.class != invoker.getInterface()) { String stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY)); if (ConfigUtils.isNotEmpty(stub)) { Class<?> serviceType = invoker.getInterface(); if (ConfigUtils.isDefault(stub)) { if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) { stub = serviceType.getName() + "Stub"; } else { stub = serviceType.getName() + "Local"; } } try { Class<?> stubClass = ReflectUtils.forName(stub); if (!serviceType.isAssignableFrom(stubClass)) { throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + serviceType.getName()); } try { Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType); proxy = (T) constructor.newInstance(new Object[]{proxy}); //export stub service URL url = invoker.getUrl(); if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) { url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ",")); url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString()); try { export(proxy, (Class) invoker.getInterface(), url); } catch (Exception e) { LOGGER.error("export a stub service error.", e); } } } catch (NoSuchMethodException e) { throw new IllegalStateException("No such constructor /"public " + stubClass.getSimpleName() + "(" + serviceType.getName() + ")/" in stub implementation class " + stubClass.getName(), e); } } catch (Throwable t) { LOGGER.error("Failed to create stub implementation class " + stub + " in consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", cause: " + t.getMessage(), t); // ignore } } } return proxy; } View Code
/** * AbstractProxyFactory com.alibaba.dubbo.rpc.proxy.AbstractProxyFactory */ public abstract class AbstractProxyFactory implements ProxyFactory { @Override public <T> T getProxy(Invoker<T> invoker) throws RpcException { return getProxy(invoker, false); } @Override public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException { Class<?>[] interfaces = null; String config = invoker.getUrl().getParameter("interfaces"); if (config != null && config.length() > 0) { String[] types = Constants.COMMA_SPLIT_PATTERN.split(config); if (types != null && types.length > 0) { interfaces = new Class<?>[types.length + 2]; interfaces[0] = invoker.getInterface(); interfaces[1] = EchoService.class; for (int i = 0; i < types.length; i++) { interfaces[i + 1] = ReflectUtils.forName(types[i]); } } } if (interfaces == null) { interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class}; } if (!invoker.getInterface().equals(GenericService.class) && generic) { int len = interfaces.length; Class<?>[] temp = interfaces; interfaces = new Class<?>[len + 1]; System.arraycopy(temp, 0, interfaces, 0, len); interfaces[len] = GenericService.class; } return getProxy(invoker, interfaces); } public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types); } // com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory, 调用 wrapper 去处理 public class JavassistProxyFactory extends AbstractProxyFactory { @Override @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); } @Override public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } } // 保存调用路径 public static boolean initConsumerModel(String serviceName, ConsumerModel consumerModel) { if (consumedServices.putIfAbsent(serviceName, consumerModel) != null) { logger.warn("Already register the same consumer:" + serviceName); return false; } return true; }
// String hello = demoService.sayHello("world"); // call remote method,
// 启用该远程方法,其实是调用的一个代理类,动态生成
/** "main@1" prio=5 tid=0x1 nid=NA runnable java.lang.Thread.State: RUNNABLE at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:38) at com.alibaba.dubbo.common.bytecode.proxy0.sayHello(proxy0.java:-1) at com.alibaba.dubbo.demo.consumer.Consumer.main(Consumer.java:35) */ // 调用动态代理类的方法,其实为调用 InvocationHandler方法 public class proxy0 implements DC, EchoService, DemoService { public static Method[] methods; private InvocationHandler handler; public String sayHello(String var1) { Object[] var2 = new Object[]{var1}; Object var3 = this.handler.invoke(this, methods[0], var2); return (String)var3; } public Object $echo(Object var1) { Object[] var2 = new Object[]{var1}; Object var3 = this.handler.invoke(this, methods[1], var2); return (Object)var3; } public proxy0() { } public proxy0(InvocationHandler var1) { this.handler = var1; } } // com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke 方法,使用反射调用 invoker方法 public class InvokerInvocationHandler implements InvocationHandler { private final Invoker<?> invoker; public InvokerInvocationHandler(Invoker<?> handler) { this.invoker = handler; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } // 调用通用方法 return invoker.invoke(new RpcInvocation(method, args)).recreate(); } } // com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker. @Override public Result invoke(Invocation invocation) throws RpcException { Result result = null; String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || value.equalsIgnoreCase("false")) { //no mock, 调用真实代理 result = this.invoker.invoke(invocation); } else if (value.startsWith("force")) { if (logger.isWarnEnabled()) { logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl()); } //force:direct mock result = doMockInvoke(invocation, null); } else { //fail-mock try { result = this.invoker.invoke(invocation); } catch (RpcException e) { if (e.isBiz()) { throw e; } else { if (logger.isWarnEnabled()) { logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e); } result = doMockInvoke(invocation, e); } } } return result; } // com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke @Override public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance = null; List<Invoker<T>> invokers = list(invocation); if (invokers != null && !invokers.isEmpty()) { // 获取负载均衡策略,默认为 random loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } // 如果是异步调用,会先创建一个调用id,以便在需要的时候使用 RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); } // 模板方法 protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException; // 快速失败 invoker 调用 public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> { private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class); public FailoverClusterInvoker(Directory<T> directory) { super(directory); } @Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. if (i > 0) { checkWhetherDestroyed(); copyinvokers = list(invocation); // check again checkInvokers(copyinvokers, invocation); } // 调用AbstractClusterInvoker的负载均衡算法 Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); } }
// 负载均衡
// protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (invokers == null || invokers.isEmpty()) return null; String methodName = invocation == null ? "" : invocation.getMethodName(); boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY); { //ignore overloaded method if (stickyInvoker != null && !invokers.contains(stickyInvoker)) { stickyInvoker = null; } //ignore concurrency problem if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) { if (availablecheck && stickyInvoker.isAvailable()) { return stickyInvoker; } } } Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected); if (sticky) { stickyInvoker = invoker; } return invoker; } private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (invokers == null || invokers.isEmpty()) return null; // 如果只有一个提供者,就不用负载均衡了 if (invokers.size() == 1) return invokers.get(0); if (loadbalance == null) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation); //If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect. if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) { try { Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); if (rinvoker != null) { invoker = rinvoker; } else { //Check the index of current selected invoker, if it's not the last one, choose the one at index+1. int index = invokers.indexOf(invoker); try { //Avoid collision invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0); } catch (Exception e) { logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e); } } } catch (Throwable t) { logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t); } } return invoker; } // 随机负载均衡策略的选择算法 public class RandomLoadBalance extends AbstractLoadBalance { public static final String NAME = "random"; private final Random random = new Random(); @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); // Number of invokers int totalWeight = 0; // The sum of weights boolean sameWeight = true; // Every invoker has the same weight? for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); totalWeight += weight; // Sum if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) { sameWeight = false; } } if (totalWeight > 0 && !sameWeight) { // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight. int offset = random.nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < length; i++) { offset -= getWeight(invokers.get(i), invocation); if (offset < 0) { return invokers.get(i); } } } // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(random.nextInt(length)); } } // filter 过滤器调用 private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { @Override public Class<T> getInterface() { return invoker.getInterface(); } @Override public URL getUrl() { return invoker.getUrl(); } @Override public boolean isAvailable() { return invoker.isAvailable(); } @Override public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } @Override public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; }
// filter调用链
@Activate(group = Constants.CONSUMER, order = -10000) public class ConsumerContextFilter implements Filter { @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { RpcContext.getContext() .setInvoker(invoker) .setInvocation(invocation) .setLocalAddress(NetUtils.getLocalHost(), 0) .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort()); if (invocation instanceof RpcInvocation) { ((RpcInvocation) invocation).setInvoker(invoker); } try { // 同步异步调用 RpcResult result = (RpcResult) invoker.invoke(invocation); RpcContext.getServerContext().setAttachments(result.getAttachments()); return result; } finally { RpcContext.getContext().clearAttachments(); } } } // com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke @Activate(group = Constants.CONSUMER) public class FutureFilter implements Filter { protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class); @Override public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException { final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation); fireInvokeCallback(invoker, invocation); // need to configure if there's return value before the invocation in order to help invoker to judge if it's // necessary to return future. // 此处为链式调用,先调用 monitor, 再调用具体的方法 DubboInvoker Result result = invoker.invoke(invocation); if (isAsync) { // 如果url参数中标识为异步调用,则执行异步调用 asyncCallback(invoker, invocation); } else { // 否则同步调用 syncCallback(invoker, invocation, result); } return result; } private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) { if (result.hasException()) { fireThrowCallback(invoker, invocation, result.getException()); } else { fireReturnCallback(invoker, invocation, result.getValue()); } } private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) { Future<?> f = RpcContext.getContext().getFuture(); if (f instanceof FutureAdapter) { ResponseFuture future = ((FutureAdapter<?>) f).getFuture(); future.setCallback(new ResponseCallback() { @Override public void done(Object rpcResult) { if (rpcResult == null) { logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName())); return; } ///must be rpcResult if (!(rpcResult instanceof Result)) { logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName())); return; } Result result = (Result) rpcResult; // 结果通知回调 if (result.hasException()) { fireThrowCallback(invoker, invocation, result.getException()); } else { fireReturnCallback(invoker, invocation, result.getValue()); } } @Override public void caught(Throwable exception) { fireThrowCallback(invoker, invocation, exception); } }); } } private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) { final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY)); final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY)); if (onInvokeMethod == null && onInvokeInst == null) { return; } if (onInvokeMethod == null || onInvokeInst == null) { throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl()); } if (!onInvokeMethod.isAccessible()) { onInvokeMethod.setAccessible(true); } Object[] params = invocation.getArguments(); try { onInvokeMethod.invoke(onInvokeInst, params); } catch (InvocationTargetException e) { fireThrowCallback(invoker, invocation, e.getTargetException()); } catch (Throwable e) { fireThrowCallback(invoker, invocation, e); } } private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) { final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY)); final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY)); //not set onreturn callback, 回调设置了 onreturn 方法 if (onReturnMethod == null && onReturnInst == null) { return; } if (onReturnMethod == null || onReturnInst == null) { throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl()); } if (!onReturnMethod.isAccessible()) { onReturnMethod.setAccessible(true); } Object[] args = invocation.getArguments(); Object[] params; Class<?>[] rParaTypes = onReturnMethod.getParameterTypes(); if (rParaTypes.length > 1) { if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) { params = new Object[2]; params[0] = result; params[1] = args; } else { params = new Object[args.length + 1]; params[0] = result; System.arraycopy(args, 0, params, 1, args.length); } } else { params = new Object[]{result}; } try { onReturnMethod.invoke(onReturnInst, params); } catch (InvocationTargetException e) { fireThrowCallback(invoker, invocation, e.getTargetException()); } catch (Throwable e) { fireThrowCallback(invoker, invocation, e); } } private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) { final Method onthrowMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY)); final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY)); //onthrow callback not configured if (onthrowMethod == null && onthrowInst == null) { return; } if (onthrowMethod == null || onthrowInst == null) { throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl()); } if (!onthrowMethod.isAccessible()) { onthrowMethod.setAccessible(true); } Class<?>[] rParaTypes = onthrowMethod.getParameterTypes(); if (rParaTypes[0].isAssignableFrom(exception.getClass())) { try { Object[] args = invocation.getArguments(); Object[] params; if (rParaTypes.length > 1) { if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) { params = new Object[2]; params[0] = exception; params[1] = args; } else { params = new Object[args.length + 1]; params[0] = exception; System.arraycopy(args, 0, params, 1, args.length); } } else { params = new Object[]{exception}; } onthrowMethod.invoke(onthrowInst, params); } catch (Throwable e) { logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e); } } else { logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception); } } }
//
// 调用具体的远程方法类 public class DubboInvoker<T> extends AbstractInvoker<T> { private final ExchangeClient[] clients; private final AtomicPositiveInteger index = new AtomicPositiveInteger(); private final String version; private final ReentrantLock destroyLock = new ReentrantLock(); private final Set<Invoker<?>> invokers; public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients) { this(serviceType, url, clients, null); } public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers) { super(serviceType, url, new String[]{Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY}); this.clients = clients; // get version. this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0"); this.invokers = invokers; } // 实际调用 @Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (isOneway) { // 单向调用,立马返回 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { // 异步调用,立马返回,后续接收结果 ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { // 一般调用都是这个,同步+超时调用 RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } @Override public boolean isAvailable() { if (!super.isAvailable()) return false; for (ExchangeClient client : clients) { if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)) { //cannot write == not Available ? return true; } } return false; } }