目标:介绍远程调用中跟http协议相关的设计和实现,介绍dubbo-rpc-http的源码。
基于HTTP表单的远程调用协议,采用 Spring 的HttpInvoker实现,关于http协议就不用多说了吧。
该类继承了RemoteInvocation类,是在RemoteInvocation上增加了泛化调用的参数设置,以及增加了dubbo本身需要的附加值设置。
public class HttpRemoteInvocation extends RemoteInvocation { private static final long serialVersionUID = 1L; /** * dubbo的附加值名称 */ private static final String dubboAttachmentsAttrName = "dubbo.attachments"; public HttpRemoteInvocation(MethodInvocation methodInvocation) { super(methodInvocation); // 把附加值加入到会话域的属性里面 addAttribute(dubboAttachmentsAttrName, new HashMap<String, String>(RpcContext.getContext().getAttachments())); } @Override public Object invoke(Object targetObject) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { // 获得上下文 RpcContext context = RpcContext.getContext(); // 获得附加值 context.setAttachments((Map<String, String>) getAttribute(dubboAttachmentsAttrName)); // 泛化标志 String generic = (String) getAttribute(Constants.GENERIC_KEY); // 如果不为空,则设置泛化标志 if (StringUtils.isNotEmpty(generic)) { context.setAttachment(Constants.GENERIC_KEY, generic); } try { // 调用下一个调用链 return super.invoke(targetObject); } finally { context.setAttachments(null); } } } 复制代码
该类是http实现的核心,跟我在《dubbo源码解析(二十五)远程调用——hessian协议》中讲到的HessianProtocol实现有很多地方相似。
/** * 默认的端口号 */ public static final int DEFAULT_PORT = 80; /** * http服务器集合 */ private final Map<String, HttpServer> serverMap = new ConcurrentHashMap<String, HttpServer>(); /** * Spring HttpInvokerServiceExporter 集合 */ private final Map<String, HttpInvokerServiceExporter> skeletonMap = new ConcurrentHashMap<String, HttpInvokerServiceExporter>(); /** * HttpBinder对象 */ private HttpBinder httpBinder; 复制代码
@Override protected <T> Runnable doExport(final T impl, Class<T> type, URL url) throws RpcException { // 获得ip地址 String addr = getAddr(url); // 获得http服务器 HttpServer server = serverMap.get(addr); // 如果服务器为空,则重新创建服务器,并且加入到集合 if (server == null) { server = httpBinder.bind(url, new InternalHandler()); serverMap.put(addr, server); } // 获得服务path final String path = url.getAbsolutePath(); // 加入集合 skeletonMap.put(path, createExporter(impl, type)); // 通用path final String genericPath = path + "/" + Constants.GENERIC_KEY; // 添加泛化的服务调用 skeletonMap.put(genericPath, createExporter(impl, GenericService.class)); return new Runnable() { @Override public void run() { skeletonMap.remove(path); skeletonMap.remove(genericPath); } }; } 复制代码
该方法是暴露服务等逻辑,因为dubbo实现http协议采用了Spring 的HttpInvoker实现,所以调用了createExporter方法来创建创建HttpInvokerServiceExporter。
private <T> HttpInvokerServiceExporter createExporter(T impl, Class<?> type) { // 创建HttpInvokerServiceExporter final HttpInvokerServiceExporter httpServiceExporter = new HttpInvokerServiceExporter(); // 设置要访问的服务的接口 httpServiceExporter.setServiceInterface(type); // 设置服务实现 httpServiceExporter.setService(impl); try { // 在BeanFactory设置了所有提供的bean属性,初始化bean的时候执行,可以针对某个具体的bean进行配 httpServiceExporter.afterPropertiesSet(); } catch (Exception e) { throw new RpcException(e.getMessage(), e); } return httpServiceExporter; } 复制代码
该方法是创建一个spring 的HttpInvokerServiceExporter。
@Override @SuppressWarnings("unchecked") protected <T> T doRefer(final Class<T> serviceType, final URL url) throws RpcException { // 获得泛化配置 final String generic = url.getParameter(Constants.GENERIC_KEY); // 是否为泛化调用 final boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class); // 创建HttpInvokerProxyFactoryBean final HttpInvokerProxyFactoryBean httpProxyFactoryBean = new HttpInvokerProxyFactoryBean(); // 设置RemoteInvocation的工厂类 httpProxyFactoryBean.setRemoteInvocationFactory(new RemoteInvocationFactory() { /** * 为给定的AOP方法调用创建一个新的RemoteInvocation对象。 * @param methodInvocation * @return */ @Override public RemoteInvocation createRemoteInvocation(MethodInvocation methodInvocation) { // 新建一个HttpRemoteInvocation RemoteInvocation invocation = new HttpRemoteInvocation(methodInvocation); // 如果是泛化调用 if (isGeneric) { // 设置标志 invocation.addAttribute(Constants.GENERIC_KEY, generic); } return invocation; } }); // 获得identity message String key = url.toIdentityString(); // 如果是泛化调用 if (isGeneric) { key = key + "/" + Constants.GENERIC_KEY; } // 设置服务url httpProxyFactoryBean.setServiceUrl(key); // 设置服务接口 httpProxyFactoryBean.setServiceInterface(serviceType); // 获得客户端参数 String client = url.getParameter(Constants.CLIENT_KEY); if (client == null || client.length() == 0 || "simple".equals(client)) { // 创建SimpleHttpInvokerRequestExecutor连接池 使用的是JDK HttpClient SimpleHttpInvokerRequestExecutor httpInvokerRequestExecutor = new SimpleHttpInvokerRequestExecutor() { @Override protected void prepareConnection(HttpURLConnection con, int contentLength) throws IOException { super.prepareConnection(con, contentLength); // 设置读取超时时间 con.setReadTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)); // 设置连接超时时间 con.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT)); } }; httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor); } else if ("commons".equals(client)) { // 创建 HttpComponentsHttpInvokerRequestExecutor连接池 使用的是Apache HttpClient HttpComponentsHttpInvokerRequestExecutor httpInvokerRequestExecutor = new HttpComponentsHttpInvokerRequestExecutor(); // 设置读取超时时间 httpInvokerRequestExecutor.setReadTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)); // 设置连接超时时间 httpInvokerRequestExecutor.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT)); httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor); } else { throw new IllegalStateException("Unsupported http protocol client " + client + ", only supported: simple, commons"); } httpProxyFactoryBean.afterPropertiesSet(); // 返回HttpInvokerProxyFactoryBean对象 return (T) httpProxyFactoryBean.getObject(); } 复制代码
该方法是服务引用的方法,其中根据url配置simple还是commons来选择创建连接池的方式。其中的区别就是SimpleHttpInvokerRequestExecutor使用的是JDK HttpClient,HttpComponentsHttpInvokerRequestExecutor 使用的是Apache HttpClient。
@Override protected int getErrorCode(Throwable e) { if (e instanceof RemoteAccessException) { e = e.getCause(); } if (e != null) { Class<?> cls = e.getClass(); if (SocketTimeoutException.class.equals(cls)) { // 返回超时异常 return RpcException.TIMEOUT_EXCEPTION; } else if (IOException.class.isAssignableFrom(cls)) { // 返回网络异常 return RpcException.NETWORK_EXCEPTION; } else if (ClassNotFoundException.class.isAssignableFrom(cls)) { // 返回序列化异常 return RpcException.SERIALIZATION_EXCEPTION; } } return super.getErrorCode(e); } 复制代码
该方法是处理异常情况,设置错误码。
private class InternalHandler implements HttpHandler { @Override public void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { // 获得请求uri String uri = request.getRequestURI(); // 获得服务暴露者HttpInvokerServiceExporter对象 HttpInvokerServiceExporter skeleton = skeletonMap.get(uri); // 如果不是post,则返回码设置500 if (!request.getMethod().equalsIgnoreCase("POST")) { response.setStatus(500); } else { // 远程地址放到上下文 RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort()); try { // 调用下一个调用 skeleton.handleRequest(request, response); } catch (Throwable e) { throw new ServletException(e); } } } } 复制代码
该内部类实现了HttpHandler,做了设置远程地址的逻辑。