微信公众号:如有问题或建议,请在下方留言;
最近更新:2019-01-03
因篇幅原因,上一部分内容请看: Spring Cloud Netflix Zuul源码分析之请求处理篇-上
该类的作用就是查找对应的路由信息,获取后端微服务的地址,保存到请求上下文,提供给路由过滤器使用。
1@Override 2public Object run() { 3 RequestContext ctx = RequestContext.getCurrentContext(); 4 final String requestURI = this.urlPathHelper.getPathWithinApplication(ctx.getRequest()); 5 // 根据URI从路由规则里获取对应的路由 6 Route route = this.routeLocator.getMatchingRoute(requestURI); 7 if (route != null) { 8 // 将获取到路由信息放入请求上下文 9 } else{ 10 // 找不到路由,则fallback到DispatcherServlet 11 } 12} 复制代码
这里特地列出简单URL是如何查找路由的源码,请看:
1// SimpleRouteLocator 2protected ZuulRoute getZuulRoute(String adjustedPath) { 3 if (!matchesIgnoredPatterns(adjustedPath)) { 4 for (Entry<String, ZuulRoute> entry : getRoutesMap().entrySet()) { 5 String pattern = entry.getKey(); 6 log.debug("Matching pattern:" + pattern); 7 // 利用正则表达式进行path匹配,找到对应的路由 8 if (this.pathMatcher.match(pattern, adjustedPath)) { 9 return entry.getValue(); 10 } 11 } 12 } 13 return null; 14} 复制代码
经过该过滤器执行后,请求上下文RequestContext内容为:
该过滤器的作用是调用原生httpClient发送请求到后端微服务,解析响应结果,写入请求上下文,提供给后置过滤器使用。
1@Override 2public Object run() { 3 RequestContext context = RequestContext.getCurrentContext(); 4 HttpServletRequest request = context.getRequest(); 5 // 根据请求构建Zuul请求的Headers 6 MultiValueMap<String, String> headers = this.helper 7 .buildZuulRequestHeaders(request); 8 // 根据请求构建Zuul请求的queryParams 9 MultiValueMap<String, String> params = this.helper 10 .buildZuulRequestQueryParams(request); 11 String verb = getVerb(request); 12 InputStream requestEntity = getRequestBody(request); 13 if (getContentLength(request) < 0) { 14 context.setChunkedRequestBody(); 15 } 16 // 根据请求构建Zuul请求的URI 17 String uri = this.helper.buildZuulRequestURI(request); 18 this.helper.addIgnoredHeaders(); 19 20 try { 21 // 调用原生httpClient转发请求 22 CloseableHttpResponse response = forward(this.httpClient, verb, uri, request, 23 headers, params, requestEntity); 24 // 保存响应结果 25 setResponse(response); 26 } 27 catch (Exception ex) { 28 throw new ZuulRuntimeException(handleException(ex)); 29 } 30 return null; 31} 复制代码
1private CloseableHttpResponse forward(CloseableHttpClient httpclient, String verb, 2 String uri, HttpServletRequest request, MultiValueMap<String, String> headers, 3 MultiValueMap<String, String> params, InputStream requestEntity) 4 throws Exception { 5 Map<String, Object> info = this.helper.debug(verb, uri, headers, params, 6 requestEntity); 7 // routeHost就是在前置过滤器PreDecorationFilter中添加的 8 URL host = RequestContext.getCurrentContext().getRouteHost(); 9 // 创建HttpHost,指定请求目标地址 10 HttpHost httpHost = getHttpHost(host); 11 uri = StringUtils.cleanPath(MULTIPLE_SLASH_PATTERN.matcher(host.getPath() + uri).replaceAll("/")); 12 long contentLength = getContentLength(request); 13 14 ContentType contentType = null; 15 16 if (request.getContentType() != null) { 17 contentType = ContentType.parse(request.getContentType()); 18 } 19 20 InputStreamEntity entity = new InputStreamEntity(requestEntity, contentLength, 21 contentType); 22 23 // 创建HttpRequest 24 HttpRequest httpRequest = buildHttpRequest(verb, uri, entity, headers, params, 25 request); 26 try { 27 log.debug(httpHost.getHostName() + " " + httpHost.getPort() + " " 28 + httpHost.getSchemeName()); 29 // 调用原生httpClient发送请求 30 CloseableHttpResponse zuulResponse = forwardRequest(httpclient, httpHost, 31 httpRequest); 32 this.helper.appendDebug(info, zuulResponse.getStatusLine().getStatusCode(), 33 revertHeaders(zuulResponse.getAllHeaders())); 34 return zuulResponse; 35 } 36 finally { 37 // When HttpClient instance is no longer needed, 38 // shut down the connection manager to ensure 39 // immediate deallocation of all system resources 40 // httpclient.getConnectionManager().shutdown(); 41 } 42} 43 44private CloseableHttpResponse forwardRequest(CloseableHttpClient httpclient, 45 HttpHost httpHost, HttpRequest httpRequest) throws IOException { 46 return httpclient.execute(httpHost, httpRequest); 47} 复制代码
1private void setResponse(HttpResponse response) throws IOException { 2 RequestContext.getCurrentContext().set("zuulResponse", response); 3 this.helper.setResponse(response.getStatusLine().getStatusCode(), 4 response.getEntity() == null ? null : response.getEntity().getContent(), 5 revertHeaders(response.getAllHeaders())); 6} 7// ProxyRequestHelper 8public void setResponse(int status, InputStream entity, 9 MultiValueMap<String, String> headers) throws IOException { 10 RequestContext context = RequestContext.getCurrentContext(); 11 context.setResponseStatusCode(status); 12 if (entity != null) { 13 context.setResponseDataStream(entity); 14 } 15 16 boolean isOriginResponseGzipped = false; 17 for (Entry<String, List<String>> header : headers.entrySet()) { 18 String name = header.getKey(); 19 for (String value : header.getValue()) { 20 context.addOriginResponseHeader(name, value); 21 22 if (name.equalsIgnoreCase(HttpHeaders.CONTENT_ENCODING) 23 && HTTPRequestUtils.getInstance().isGzipped(value)) { 24 isOriginResponseGzipped = true; 25 } 26 if (name.equalsIgnoreCase(HttpHeaders.CONTENT_LENGTH)) { 27 context.setOriginContentLength(value); 28 } 29 if (isIncludedHeader(name)) { 30 context.addZuulResponseHeader(name, value); 31 } 32 } 33 } 34 context.setResponseGZipped(isOriginResponseGzipped); 35} 复制代码
该过滤器的作用是将请求上下文里的响应信息写入到响应内容,返回给请求客户端。
1@Override 2public Object run() { 3 try { 4 addResponseHeaders(); 5 writeResponse(); 6 } 7 catch (Exception ex) { 8 ReflectionUtils.rethrowRuntimeException(ex); 9 } 10 return null; 11} 复制代码
1private void addResponseHeaders() { 2 RequestContext context = RequestContext.getCurrentContext(); 3 HttpServletResponse servletResponse = context.getResponse(); 4 if (this.zuulProperties.isIncludeDebugHeader()) { 5 @SuppressWarnings("unchecked") 6 List<String> rd = (List<String>) context.get(ROUTING_DEBUG_KEY); 7 if (rd != null) { 8 StringBuilder debugHeader = new StringBuilder(); 9 for (String it : rd) { 10 debugHeader.append("[[[" + it + "]]]"); 11 } 12 servletResponse.addHeader(X_ZUUL_DEBUG_HEADER, debugHeader.toString()); 13 } 14 } 15 List<Pair<String, String>> zuulResponseHeaders = context.getZuulResponseHeaders(); 16 if (zuulResponseHeaders != null) { 17 for (Pair<String, String> it : zuulResponseHeaders) { 18 servletResponse.addHeader(it.first(), it.second()); 19 } 20 } 21 if (includeContentLengthHeader(context)) { 22 Long contentLength = context.getOriginContentLength(); 23 if(useServlet31) { 24 servletResponse.setContentLengthLong(contentLength); 25 } else { 26 //Try and set some kind of content length if we can safely convert the Long to an int 27 if (isLongSafe(contentLength)) { 28 servletResponse.setContentLength(contentLength.intValue()); 29 } 30 } 31 } 32} 复制代码
1private void writeResponse() throws Exception { 2 RequestContext context = RequestContext.getCurrentContext(); 3 // there is no body to send 4 if (context.getResponseBody() == null 5 && context.getResponseDataStream() == null) { 6 return; 7 } 8 HttpServletResponse servletResponse = context.getResponse(); 9 if (servletResponse.getCharacterEncoding() == null) { // only set if not set 10 servletResponse.setCharacterEncoding("UTF-8"); 11 } 12 13 OutputStream outStream = servletResponse.getOutputStream(); 14 InputStream is = null; 15 try { 16 if (context.getResponseBody() != null) { 17 String body = context.getResponseBody(); 18 is = new ByteArrayInputStream( 19 body.getBytes(servletResponse.getCharacterEncoding())); 20 } 21 else { 22 is = context.getResponseDataStream(); 23 if (is!=null && context.getResponseGZipped()) { 24 // if origin response is gzipped, and client has not requested gzip, 25 // decompress stream before sending to client 26 // else, stream gzip directly to client 27 if (isGzipRequested(context)) { 28 servletResponse.setHeader(ZuulHeaders.CONTENT_ENCODING, "gzip"); 29 } 30 else { 31 is = handleGzipStream(is); 32 } 33 } 34 } 35 36 if (is!=null) { 37 writeResponse(is, outStream); 38 } 39 } 40 finally { 41 if (is != null) { 42 try { 43 is.close(); 44 } 45 catch (Exception ex) { 46 log.warn("Error while closing upstream input stream", ex); 47 } 48 } 49 50 try { 51 Object zuulResponse = context.get("zuulResponse"); 52 if (zuulResponse instanceof Closeable) { 53 ((Closeable) zuulResponse).close(); 54 } 55 outStream.flush(); 56 // The container will close the stream for us 57 } 58 catch (IOException ex) { 59 log.warn("Error while sending response to client: " + ex.getMessage()); 60 } 61 } 62} 63 64private void writeResponse(InputStream zin, OutputStream out) throws Exception { 65 byte[] bytes = buffers.get(); 66 int bytesRead = -1; 67 while ((bytesRead = zin.read(bytes)) != -1) { 68 out.write(bytes, 0, bytesRead); 69 } 70} 复制代码
通过上述分析,简单URL请求,Zuul做的哪些事情,就一目了然了:
不知道大家有没有注意到,从始至终有一个类一直贯穿整个处理的过程。谁?Filter?不,是RequestContext。ZuulServlet处理过程就是靠它在前置、路由、后置各过滤器间实现信息传递,由此可见它的重要性。接下来,我们就走进RequestContext,揭开这位“信使”的神秘“面纱”。
既然RequestContext用来传递信息,那么它的正确性就必须得保证。在多线程情况下,如何做到这一点呢?请往下看:
1public class RequestContext extends ConcurrentHashMap<String, Object> { 2 3 private static final Logger LOG = LoggerFactory.getLogger(RequestContext.class); 4 5 protected static Class<? extends RequestContext> contextClass = RequestContext.class; 6 7 private static RequestContext testContext = null; 8 9 protected static final ThreadLocal<? extends RequestContext> threadLocal = new ThreadLocal<RequestContext>() { 10 // 覆盖ThreadLocal里initialValue方法,设置ThreadLocal的值 11 @Override 12 protected RequestContext initialValue() { 13 try { 14 return contextClass.newInstance(); 15 } catch (Throwable e) { 16 throw new RuntimeException(e); 17 } 18 } 19 }; 20} 复制代码
是不是明白了一些,对,就是这两大并发神器:ConcurrentHashMap和ThreadLocal。前面提到的几个过滤器里,使用RequestContext时,都是如下写法:
1RequestContext context = RequestContext.getCurrentContext(); 2context.set***(); 3context.get***(); 复制代码
getCurrentContext()就是入口:
1public static RequestContext getCurrentContext() { 2 if (testContext != null) return testContext; 3 4 RequestContext context = threadLocal.get(); 5 return context; 6} 复制代码
从ThreadLocal里获取RequestContext:
1public T get() { 2 Thread t = Thread.currentThread(); 3 ThreadLocalMap map = getMap(t); 4 if (map != null) { 5 ThreadLocalMap.Entry e = map.getEntry(this); 6 if (e != null) { 7 @SuppressWarnings("unchecked") 8 T result = (T)e.value; 9 return result; 10 } 11 } 12 return setInitialValue(); 13} 14 15private T setInitialValue() { 16 // 覆盖该方法,设置ThreadLocal的值 17 T value = initialValue(); 18 Thread t = Thread.currentThread(); 19 ThreadLocalMap map = getMap(t); 20 if (map != null) 21 map.set(this, value); 22 else 23 createMap(t, value); 24 return value; 25} 复制代码
通过覆盖ThreadLocal的initialValue方法,首次调用时设置值,后续调用判断当前线程已经绑定了值,则直接返回。这样就保证了不同的请求都有自己的RequestContext。因为RequestContext继承自ConcurrentHashMap,是一个线程安全的map,从而保证了并发下的正确性。
补充:这里只是简单讲解了ThreadLocal和ConcurrentHashMap,不在本文中详细展开,后续会写文章去做深入分析。
到这里,我们就讲完了一个简单URL请求在Zuul中整个处理过程。写作过程中,笔者一直在思考,如何行文能让大家更好的理解。虽然修改了很多次,但是还是觉得不够完美,只能一边写一边总结一边改进。希望大家多多留言,给出意见和建议,那笔者真是感激不尽!!!最后,感谢大家的支持,祝新年快乐,祁琛,2019年1月3日。