上一篇文章 OkHttp 3.14.x 源码解析-执行流程 我们分析了OKHttp的整体执行流程,也从中提到了OKHttp会调用各拦截器来获取响应数据,但是并没有展开来讲,所以这篇文章我们将来详细分析各个拦截器的职责。
从上篇文章我们知道OKHttp有七大拦截器,按添加顺序为:
其中第1个应用拦截器和第6个网络拦截器为自定义配置,在这里我们将假设用户没有自定义配置拦截器,故不分析这两个拦截器,下面将按顺序详细分析其它五个拦截器。
由于这里我们假设没有加入自定义拦截器,所以RetryAndFollowUpInterceptor将成为责任链中最先被调用的拦截器,这个拦截器的主要作用就是负责失败重试以及重定向,我们先看看RetryAndFollowUpInterceptor的intercept方法
RetryAndFollowUpInterceptor#intercept
@Override public Response intercept(Chain chain) throws IOException { //当前请求 Request request = chain.request(); RealInterceptorChain realChain = (RealInterceptorChain) chain; //重点关注这个Transmitter类 Transmitter transmitter = realChain.transmitter(); //重定向的次数 int followUpCount = 0; //记录上一个请求 Response priorResponse = null; while (true) {//启动循环 //会创建ExchangeFinder对象,在连接池中会提到 transmitter.prepareToConnect(request); if (transmitter.isCanceled()) { throw new IOException("Canceled"); } Response response; boolean success = false; try { //执行责任链下一结点的proceed方法,其实就是执行BridgeInterceptor的intercept response = realChain.proceed(request, transmitter, null); success = true; } catch (RouteException e) { //路由异常,尝试恢复,如果再次失败则抛出异常 if (!recover(e.getLastConnectException(), transmitter, false, request)) { throw e.getFirstConnectException(); } //继续重试 continue; } catch (IOException e) { //重试与服务器进行连接 boolean requestSendStarted = !(e instanceof ConnectionShutdownException); //连接关闭异常 if (!recover(e, transmitter, requestSendStarted, request)) throw e; //继续重试 continue; } finally { // The network call threw an exception. Release any resources. if (!success) { transmitter.exchangeDoneDueToException(); } } //执行到这说明没有出现异常 //priorResponse为前一个重试得到的Response if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build(); } Exchange exchange = Internal.instance.exchange(response); Route route = exchange != null ? exchange.connection().route() : null; //根据得到的响应进行处理,可能会增加一些认证信息,重定向或处理超时请求 //如果该请求无法继续被处理或出现的错误不需要继续处理,会返回null Request followUp = followUpRequest(response, route); //当请求不可重定向时,返回response if (followUp == null) { if (exchange != null && exchange.isDuplex()) { transmitter.timeoutEarlyExit(); } return response; } RequestBody followUpBody = followUp.body(); if (followUpBody != null && followUpBody.isOneShot()) { return response; } //关闭资源 closeQuietly(response.body()); if (transmitter.hasExchange()) { exchange.detachWithViolence(); } //达到了重定向的最大次数,就抛出一个异常 if (++followUpCount > MAX_FOLLOW_UPS) { throw new ProtocolException("Too many follow-up requests: " + followUpCount); } //得到处理之后的Request,用来沿着拦截器链继续请求 request = followUp; //由priorResponse持有 priorResponse = response; } } 复制代码
上面的流程可以概括为:
上面需要注意的是Transmitter这个类,这个类相当于管理类,维护了服务器连接,并发流和请求之间的关系,在这个拦截器中调用了它来准备连接,其实只是初始化了ConnectInterceptor拦截器中所需要用到的ExchangeFinder对象,实际上并没有用到这些类,也许你又有疑问了,那么这个Transmitter又是在哪里实例化的呢?
这个就涉及到了上篇文章所讲的OkHttp的执行流程了!你可以不断的往上追踪,然后最终会在构造Call对象这一流程见证它的出生!
RealCall#newRealCall
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { RealCall call = new RealCall(client, originalRequest, forWebSocket); //以请求和Realcall对象为参数构造Transmitter对象 call.transmitter = new Transmitter(client, call); return call; } 复制代码
然后在Transmitter构造时还会伴随着连接池connectionPool的构造,在这里我们不必深究这些类的具体用途。因为当它们传递到连接拦截器ConnectInterceptor时才真正发挥出它们的作用!
Bridge在计算机网络中的意思是网桥,但是在OkHttp中这个拦截器可不像计算机网络中的网桥一样工作在数据链路层,在OKhttp中这个拦截器的主要作用是桥接应用层和网络层,添加必要的头。简单的来说,就是对请求进行包装,并将服务器响应转换成用户友好的响应。
BridgeInterceptor#intercept
@Override public Response intercept(Chain chain) throws IOException { //当前请求request Request userRequest = chain.request(); //从网络请求中获取网络请求构建者 Request.Builder requestBuilder = userRequest.newBuilder(); RequestBody body = userRequest.body(); //网络请求前的头处理 if (body != null) { MediaType contentType = body.contentType(); if (contentType != null) { //添加请求头Content-Type requestBuilder.header("Content-Type", contentType.toString()); } long contentLength = body.contentLength(); if (contentLength != -1) { //添加请求头Content-Length requestBuilder.header("Content-Length", Long.toString(contentLength)); //移除请求头Transfer-Encoding requestBuilder.removeHeader("Transfer-Encoding"); } else { requestBuilder.header("Transfer-Encoding", "chunked"); requestBuilder.removeHeader("Content-Length"); } } if (userRequest.header("Host") == null) { //添加请求头Host requestBuilder.header("Host", hostHeader(userRequest.url(), false)); } if (userRequest.header("Connection") == null) { //添加请求头Connection requestBuilder.header("Connection", "Keep-Alive"); } boolean transparentGzip = false; //判断是否需要gzip压缩 if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) { transparentGzip = true; //添加请求头Accept-Encoding,进行gzip压缩 requestBuilder.header("Accept-Encoding", "gzip"); } //加载cookie List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url()); if (!cookies.isEmpty()) { //添加请求头Cookie requestBuilder.header("Cookie", cookieHeader(cookies)); } //添加请求头User-Agent if (userRequest.header("User-Agent") == null) { requestBuilder.header("User-Agent", Version.userAgent()); } //执行网络请求,执行缓存拦截器CacheInterceptor的intercept方法 Response networkResponse = chain.proceed(requestBuilder.build()); //保存cookie,如果没有自定义配置cookie,不会解析 HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers()); //3.从网络响应中获取响应构建者 Response.Builder responseBuilder = networkResponse.newBuilder() .request(userRequest); //判断服务器是否支持gzip压缩格式,然后交给okio处理 if (transparentGzip && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding")) && HttpHeaders.hasBody(networkResponse)) { //gzip解压缩,GzipSource是再okio的类 GzipSource responseBody = new GzipSource(networkResponse.body().source()); //移除响应header的Content-Encoding和Content-Length Headers strippedHeaders = networkResponse.headers().newBuilder() .removeAll("Content-Encoding") .removeAll("Content-Length") .build(); responseBuilder.headers(strippedHeaders); String contentType = networkResponse.header("Content-Type"); //处理完成,交给okio生成一个新的response responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody))); } //返回最终网络响应 return responseBuilder.build(); } 复制代码
这个拦截器的主要工作为:
这里需要注意的是Gzip解压缩和生成新的网络响应时都会交给okio这个库来处理的。这个okio你可以简单的当成是Java中IO操作的升级版,它能够更加方便,快速的访问和处理数据。如果有想要进一步了解的可以看看这篇文章 OKHttp源码解析(五)--OKIO简介及FileSystem ,感觉这个博主写的OkHttp源码系列还是挺不错的!
按顺序我们本来应该要介绍缓存拦截器CacheInterceptor的intercept方法了,但是在介绍缓存拦截器之前很有必要先了解一下Http缓存机制,因为缓存拦截器的缓存策略与之密切相关。
如果想要深入了解Http缓存机制,可参考浏览器缓存机制这篇文章,讲的不错而且通俗易懂!
其中Expires,Last-Modified-Since,If-Modified-Since是属于HTTP1.0
Cache-control,Etag,If-None-Match属于HTTP1.1
所以一开始都是先检查HTTP1.1的字段,如果有HTTP1.1的字段就不再检查HTTP1.0的字段
另外Last-Modified-Since和If-Modified-Since,Etag和If-None-Match是两两配对的
这一张是网上找到的非常经典的Http缓存机制图:
整体流程如下:
讲完HTTP的缓存机制后,让我们来看看HTTP缓存机制在OkHttp缓存策略的体现
OKHttp的缓存策略是由缓存策略类CacheStrategy的 networkRequest 和 cacheResponse 的值来决定的
策略 | networkRequest | cacheResponse | 结果 |
---|---|---|---|
网络请求策略 | not-null | null | 需要进行网络请求 |
缓存策略 | null | not-null | 不进行网络请求,直接使用缓存 |
同时使用网络请求策略和缓存策略 | not-null | not-null | 响应头包含Etag或Last-Modified,需要网络请求进行验证是否可以使用缓存 |
不使用网络请求和缓存策略 | null | null | 表明不进行网络请求,并且缓存不存在或者过期,此时一定会返回503错误 |
缓存策略的主要实现是在于CacheStrategy这个策略类,由于CacheStrategy使用了工厂模式的设计模式,会提供get方法给外部来获取到当前策略类的实例,所以我们直接看CacheStrategy内部类Factory的get方法
CacheStrategy.Factory#get
public CacheStrategy get() { //获取当前的缓存策略 CacheStrategy candidate = getCandidate(); //如果网络请求不为null当时请求里面的cacheControl设置的为只用缓存 if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) { //返回网络请求和缓存都为null的策略 return new CacheStrategy(null, null); } return candidate; } 复制代码
在get方法会首先调用getCandidate来获取当前的缓存策略,我们直接看这个方法
CacheStrategy.Factory#getCandidate
private CacheStrategy getCandidate() { if (cacheResponse == null) { //没有缓存,使用网络请求的策略 return new CacheStrategy(request, null); } if (request.isHttps() && cacheResponse.handshake() == null) { //https,但是丢失了握手,使用网络请求的策略 return new CacheStrategy(request, null); } if (!isCacheable(cacheResponse, request)) { //响应不能缓存,使用网络请求的策略 return new CacheStrategy(request, null); } //获取请求头中的cacheControl CacheControl requestCaching = request.cacheControl(); //请求头设置了不缓存或者请求头包含if-modified-since或if-none-match //包含if-modified-since或者if-none-match意味着本地缓存过期 //需要服务器验证本地缓存是否还能继续使用 if (requestCaching.noCache() || hasConditions(request)) { //使用网络请求的策略 return new CacheStrategy(request, null); } //获取缓存中的cacheControl CacheControl responseCaching = cacheResponse.cacheControl(); //获取相应的年龄 long ageMillis = cacheResponseAge(); //获取上次响应刷新的时间 long freshMillis = computeFreshnessLifetime(); //如果请求里面有最大持久时间要求,则选择刷新时间和最大持久时间的较小值 if (requestCaching.maxAgeSeconds() != -1) { freshMillis = Math.min(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds())); } long minFreshMillis = 0; //如果请求里面有最小刷新时间的限制 if (requestCaching.minFreshSeconds() != -1) { //更新最小刷新时间的限制 minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds()); } //最大验证时间 long maxStaleMillis = 0; //如果响应(服务器)那边不是必须验证并且请求里面有最大验证时间 if (!responseCaching.mustRevalidate() && requestCaching.maxStaleSeconds() != -1) { //更新最大验证时间 maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds()); } //可缓存 //并且持续时间+最短刷新时间<上次刷新时间+最大验证时间 //(意味着虽然过期,但可用,只是会在响应头添加warning) if (!responseCaching.noCache() && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) { Response.Builder builder = cacheResponse.newBuilder(); if (ageMillis + minFreshMillis >= freshMillis) { builder.addHeader("Warning", "110 HttpURLConnection /"Response is stale/""); } long oneDayMillis = 24 * 60 * 60 * 1000L; if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) { builder.addHeader("Warning", "113 HttpURLConnection /"Heuristic expiration/""); } //使用缓存的策略 return new CacheStrategy(null, builder.build()); } //流程走到这,说明缓存已经过期了 //需要添加请求头:If-Modified-Since或者If-None-Match //If-None-Match与etag配合使用 //If-Modified-Since与LastModified配合使用 //If-None-Match和If-Modified-Since为请求头 //If-Modified-Since或者If-None-Match是用来与服务器的资源进行对比,看看资源是否改变 //如果匹配成功,表示本地资源虽然过期但是可用 String conditionName; String conditionValue; if (etag != null) { conditionName = "If-None-Match"; conditionValue = etag; } else if (lastModified != null) { conditionName = "If-Modified-Since"; conditionValue = lastModifiedString; } else if (servedDate != null) { conditionName = "If-Modified-Since"; conditionValue = servedDateString; } else { //使用网络请求策略 return new CacheStrategy(request, null); // No condition! Make a regular request. } Headers.Builder conditionalRequestHeaders = request.headers().newBuilder(); //添加请求头 Internal.instance.addLenient(conditionalRequestHeaders, conditionName, conditionValue); Request conditionalRequest = request.newBuilder() .headers(conditionalRequestHeaders.build()) .build(); //使用网络请求和缓存request的策略 return new CacheStrategy(conditionalRequest, cacheResponse); } 复制代码
可以发现在OkHttp中的缓存策略其实与HTTP缓存机制类似。大概分为这四种情况:
持续时间+最短刷新时间<上次刷新时间+最大验证时间,意味着缓存虽然过期,但是还是有效的,只是会在响应头添加warning
响应头包含Last-Modified或Etag字段,然后会在请求头添加与之配对的If-Modified-Since或If-None-Match字段,然后使用网络请求策略和缓存策略
网络请求策略不为null,但是请求头的cache-control设置了只用缓存的情况
终于要分析CacheInterceptor缓存拦截器了,有了前面的知识储备,这个拦截器的分析就会比较得心应手了,我们直接看这个拦截器的intercept方法来看看缓存拦截器的主要流程。
CacheInterceptor#intercept
@Override public Response intercept(Chain chain) throws IOException { //如果存在缓存,则从缓存中取出,有可能为null Response cacheCandidate = cache != null ? cache.get(chain.request()) : null; long now = System.currentTimeMillis(); //获取缓存策略对象 CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get(); //策略中的请求 Request networkRequest = strategy.networkRequest; //策略中的缓存响应 Response cacheResponse = strategy.cacheResponse; //监测缓存 if (cache != null) { cache.trackResponse(strategy); } if (cacheCandidate != null && cacheResponse == null) { closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it. } //不使用网络请求和缓存策略,拦截该请求,因为已经没必要交给下一级(网络拦截器)执行 if (networkRequest == null && cacheResponse == null) { return new Response.Builder() .request(chain.request()) .protocol(Protocol.HTTP_1_1) .code(504) //返回码为504 .message("Unsatisfiable Request (only-if-cached)") .body(Util.EMPTY_RESPONSE) .sentRequestAtMillis(-1L) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); } //使用缓存策略,拦截该请求 //从缓存中拿结果,同时也没必要交给下一级(网络拦截器)执行 if (networkRequest == null) { return cacheResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build(); } //网络可用 Response networkResponse = null; try { //交付给下一级执行,此时chain为网络拦截器ConnectInterceptor networkResponse = chain.proceed(networkRequest); } finally { // If we're crashing on I/O or otherwise, don't leak the cache body. if (networkResponse == null && cacheCandidate != null) { closeQuietly(cacheCandidate.body()); } } //当拦截器链的末尾执行后返回响应数据后,如果使用了缓存就把响应数据更新到缓存里 if (cacheResponse != null) { //服务器返回的结果是304,证明缓存有效,则合并网络响应和缓存结果 if (networkResponse.code() == HTTP_NOT_MODIFIED) { Response response = cacheResponse.newBuilder() .headers(combine(cacheResponse.headers(), networkResponse.headers())) .sentRequestAtMillis(networkResponse.sentRequestAtMillis()) .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis()) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build(); networkResponse.body().close(); cache.trackConditionalCacheHit(); //更新缓存 cache.update(cacheResponse, response); return response; } else { closeQuietly(cacheResponse.body()); } } //走到这表示此时缓存里是没有数据的 Response response = networkResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build(); //如果在okHttpClient构建时设置了磁盘缓存,就把请求的结果放进缓存 if (cache != null) { if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) { //缓存到本地 CacheRequest cacheRequest = cache.put(response); return cacheWritingResponse(cacheRequest, response); } if (HttpMethod.invalidatesCache(networkRequest.method())) { try { cache.remove(networkRequest); } catch (IOException ignored) { // The cache cannot be written. } } } return response; } 复制代码
从上面发现,一开始我们是通过全局变量cache来获取缓存,它是InternalCache类型的变量,而InternalCache是一个接口,在OkHttp只有一个实现类Cache。在Cache内部,使用了DiskLruCache算法来将数据存储到磁盘中,这个算法就不多说了,有兴趣的可以看郭神的博客 Android DiskLruCache完全解析,硬盘缓存的最佳方案
在这里我们只需要知道Cache类提供了磁盘缓存的增删改查的基本功能,所以我们可以通过InternalCache的get方法得到磁盘缓存中的数据。而OkHttp默认是没有设置磁盘缓存的,可以通过构建OkHttpClient时进行配置。
当获取到磁盘缓存的数据后就调用了CacheStrategy工厂类的get方法来获取OkHttp的缓存策略,返回的缓存策略有四种,即缓存策略原理中提到的四种情况,然后就根据这四种情况做相应的处理。总体流程图如下:
这么一看OkHttp的缓存机制其实就是基于HTTP缓存机制与Cache缓存的结合体,到这里缓存拦截器的整体流程也就梳理了一遍!
连接拦截器,顾名思义,是用来连接服务器的,但并没有发送请求到服务器。我们直接看ConnectInterceptor的intercept方法.
ConnectInterceptor#intercept
@Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); Transmitter transmitter = realChain.transmitter(); //我们需要网络来满足这个请求,为了验证是否为GET请求 boolean doExtensiveHealthChecks = !request.method().equals("GET"); //重点关注 Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks); //调用CallServerInterceptor拦截器来从服务器中获取响应信息,传递连接拦截器中的exchange和transmitter return realChain.proceed(request, transmitter, exchange); } 复制代码
看上面的代码,估计这时候你应该乐坏了,心里可能偷偷乐着终于碰到这么少代码量的拦截器了。但是在这里只能告诉你连接拦截器可是个表里不一的拦截器,表面上看起来很少,实际上大部分的功能都封装起来了。但是我们不得不承认这个表里不一的拦截器是OkHttp效率和框架的核心。这个拦截器的主要任务就是:
从上面可以发现,在这里我们调用了transmitter的newExchange方法,而transmitter我们早在第一个拦截器重试拦截器RetryAndFollowUpInterceptor时就提到过它了,我们直接看newExchange方法
Transmitter#newExchange
//返回一个新的交换,封装了新的请求和响应 Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) { synchronized (connectionPool) { ..... //重点关注 ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks); Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec); synchronized (connectionPool) { this.exchange = result; this.exchangeRequestDone = false; this.exchangeResponseDone = false; return result; } } 复制代码
在这个方法中,会通过exchangeFinder的find方法找到一个可用的连接,而exchangeFinder就是在RetryAndFollowUpInterceptor拦截器中通过调用Transmitter的prepareToConnect方法创建的。
Transmitter#prepareToConnect
public void prepareToConnect(Request request) { //复用连接 if (this.request != null) { if (sameConnection(this.request.url(), request.url()) && exchangeFinder.hasRouteToTry()) { return; // Already ready. } if (exchange != null) throw new IllegalStateException(); if (exchangeFinder != null) { maybeReleaseConnection(null, true); exchangeFinder = null; } } this.request = request; //创建ExchangeFinder对象,在连接池中会使用到 this.exchangeFinder = new ExchangeFinder(this, connectionPool, createAddress(request.url()), call, eventListener); } 复制代码
我们继续看下去,接下来会调用exchangeFinder的find方法找到一个连接
Exchange
public ExchangeCodec find( OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) { ....... try { //重点关注,找到一个可用的连接(如果连接不可用,这个过程会一直持续) RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks); return resultConnection.newCodec(client, chain); } catch (RouteException e) { trackFailure(); throw e; } catch (IOException e) { trackFailure(); throw new RouteException(e); } } /** * 不断循环,直到找到一个健康可用的连接 */ private RealConnection findHealthyConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks) throws IOException { while (true) {//死循环 //找到一个连接,重点关注 RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled); // 如果是一个新的连接,直接返回 synchronized (connectionPool) { if (candidate.successCount == 0) { return candidate; } } //判断是否可用 if (!candidate.isHealthy(doExtensiveHealthChecks)) { candidate.noNewExchanges();//不可用,则移除连接池 continue;//持续这个过程 } return candidate; } } /** * 返回一个可用的连接 */ private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException { boolean foundPooledConnection = false; RealConnection result = null; Route selectedRoute = null; RealConnection releasedConnection; Socket toClose; synchronized (connectionPool) { if (transmitter.isCanceled()) throw new IOException("Canceled"); hasStreamFailure = false; // This is a fresh attempt. // Attempt to use an already-allocated connection. We need to be careful here because our // already-allocated connection may have been restricted from creating new exchanges. releasedConnection = transmitter.connection; //如果连接不能创建Stream,则释放资源,返回待关闭的close socket toClose = transmitter.connection != null && transmitter.connection.noNewExchanges ? transmitter.releaseConnectionNoEvents() : null; //证明连接可用 if (transmitter.connection != null) { //存在可使用的已分配连接 result = transmitter.connection; //将releasedConnection置为null,说明该连接是有效的 releasedConnection = null; } //没有可以使用的连接,就去连接池中寻找 if (result == null) { //从连接池获取连接 if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) { foundPooledConnection = true; result = transmitter.connection; } else if (nextRouteToTry != null) { selectedRoute = nextRouteToTry; nextRouteToTry = null; } else if (retryCurrentRoute()) { selectedRoute = transmitter.connection.route(); } } } closeQuietly(toClose); //回调 if (releasedConnection != null) { eventListener.connectionReleased(call, releasedConnection); } if (foundPooledConnection) { eventListener.connectionAcquired(call, result); } if (result != null) { //找到了一个已分配或者连接池中的连接,过程结束,返回该连接 return result; } //否则,我们需要一个路由信息,这是个阻塞操作 boolean newRouteSelection = false; if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) { newRouteSelection = true; routeSelection = routeSelector.next(); } List<Route> routes = null; synchronized (connectionPool) { if (transmitter.isCanceled()) throw new IOException("Canceled"); if (newRouteSelection) { //通过更加全面的路由信息,再次从连接池中获取连接 routes = routeSelection.getAll(); if (connectionPool.transmitterAcquirePooledConnection( address, transmitter, routes, false)) { foundPooledConnection = true; result = transmitter.connection; } } //如果还是没找到,则生成新的连接 if (!foundPooledConnection) { if (selectedRoute == null) { selectedRoute = routeSelection.next(); } //创建一个连接 result = new RealConnection(connectionPool, selectedRoute); connectingConnection = result; } } //如果连接是从连接池中找到,则说明是可复用的。不是新生成的 //如果新生成的连接则需要连接服务器才能使用 if (foundPooledConnection) { eventListener.connectionAcquired(call, result); return result; } //走到这说明是新生成的连接 //tcp和tls握手,阻塞操作,连接server result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener); //将路由信息添加到routeDatabase的白名单中,证明该路由是可以连接到指定服务器的 connectionPool.routeDatabase.connected(result.route()); Socket socket = null; synchronized (connectionPool) { connectingConnection = null; //连接合并的最后一次尝试,只有我们尝试多次时才会发生 //同一主机的并发连接 if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) { //关闭创建的连接并返回连接池中的连接 result.noNewExchanges = true; socket = result.socket(); result = transmitter.connection; //有可能获得一个不健康的连接,如果是这种情况,将重试刚刚成功连接的路由 nextRouteToTry = selectedRoute; } else { //将新生成的连接放入连接池中 connectionPool.put(result); //引用计数加一 transmitter.acquireConnectionNoEvents(result); } } closeQuietly(socket); eventListener.connectionAcquired(call, result); return result; } 复制代码
上面的方法中,我们可以发现在这个拦截中会不断的循环来找到一个可用的连接。可用连接的优先级为: 当前连接 > 连接池中的连接 > 新的连接 。大致的流程如下:
从流程中我们也可用发现连接复用可以省去TCP和TLS握手的过程,从而提高网络访问的效率。而能做到这点少不了连接复用池的功劳,让我们来见识一下这个连接复用池的是如何来管理连接的!
在OkHttp中使用了类似引用计数的方式来跟踪Socket流的调用,这里的计数对象是Transmitter类,而Transmitter其实就是连接管理类
Transmitter
void acquireConnectionNoEvents(RealConnection connection) { assert (Thread.holdsLock(connectionPool)); if (this.connection != null) throw new IllegalStateException(); this.connection = connection; //引用计数加1 connection.transmitters.add(new TransmitterReference(this, callStackTrace)); } @Nullable Socket releaseConnectionNoEvents() { assert (Thread.holdsLock(connectionPool)); int index = -1; for (int i = 0, size = this.connection.transmitters.size(); i < size; i++) { Reference<Transmitter> reference = this.connection.transmitters.get(i); if (reference.get() == this) { index = i; break; } } if (index == -1) throw new IllegalStateException(); RealConnection released = this.connection; //引用计数减一 released.transmitters.remove(index); this.connection = null; if (released.transmitters.isEmpty()) { released.idleAtNanos = System.nanoTime(); if (connectionPool.connectionBecameIdle(released)) { return released.socket(); } } return null; } 复制代码
上面计数加一和计数减一的操作其实是在改变List<Reference>列表的大小,而跟踪下去会发现List<Reference>的维护类是RealConnection,在这里我们只需要记住RealConnection是Socket物理连接的包装。List中的Transmitter弱引用数量就是socket被引用的计数,当计数为0时表示此连接是空闲的。
RealConnectionPool
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */, Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS, new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool", true)); /** The maximum number of idle connections for each address. */ //空闲的socket最大连接数 private final int maxIdleConnections; //socket的keepAlive时间 private final long keepAliveDurationNs; //双向队列,里面维护了RealConnection也就是socket物理连接的包装 private final Deque<RealConnection> connections = new ArrayDeque<>(); //记录连接失败的route的黑名单,当连接失败的时候就会把失败的线路加进去 final RouteDatabase routeDatabase = new RouteDatabase(); boolean cleanupRunning; //是否正在清理 复制代码
这里我们需要看ConnectionPool类来找到默认的空闲socket最大连接数和keepAlive时间
ConnectionPool
public final class ConnectionPool { final RealConnectionPool delegate; //默认的空闲的socket最大连接数为5个,socket的keepAlive的时间为5分钟 public ConnectionPool() { this(5, 5, TimeUnit.MINUTES); } public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) { this.delegate = new RealConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit); } ..... } 复制代码
通过ConnectionPool类我们可以发现默认空闲的socket最大连接数为5,socket的保活时间为5分钟,并且在构造ConnectionPool对象时实际构造的是RealConnectionPool对象。
RealConnectionPool#put
void put(RealConnection connection) { assert (Thread.holdsLock(this)); if (!cleanupRunning) { cleanupRunning = true; //使用线程池执行清理任务 executor.execute(cleanupRunnable); } //将连接添加到双端队列中 connections.add(connection); } 复制代码
放入连接的工作有两个:
在放入连接时我们会执行清理连接的操作,会调用线程池执行cleanupRunnable的任务,让我们先看看这个任务
RealConnectionPool
//线程不断调用cleanup来进行清理,并返回下次需要清理的间隔时间 private final Runnable cleanupRunnable = () -> { while (true) { //清理连接,并返回下次需要清理的间隔时间 long waitNanos = cleanup(System.nanoTime()); if (waitNanos == -1) return; if (waitNanos > 0) { long waitMillis = waitNanos / 1000000L; waitNanos -= (waitMillis * 1000000L); synchronized (RealConnectionPool.this) { try { RealConnectionPool.this.wait(waitMillis, (int) waitNanos); } catch (InterruptedException ignored) { } } } } }; 复制代码
可以发现一旦清理任务开始执行后,就会每隔指定的间隔时间进行清理连接。我们来看看cleanup方法
RealConnectionPool#cleanup
long cleanup(long now) { int inUseConnectionCount = 0; int idleConnectionCount = 0; RealConnection longestIdleConnection = null; long longestIdleDurationNs = Long.MIN_VALUE; // Find either a connection to evict, or the time that the next eviction is due. synchronized (this) { //遍历连接 for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) { RealConnection connection = i.next(); //查询此连接的transmitter的引用数量 //如果引用数量大于0,则使用数量inUseConnectionCount加1 //否则闲置数量idleConnectionCount加1 if (pruneAndGetAllocationCount(connection, now) > 0) { inUseConnectionCount++; continue; } idleConnectionCount++; //寻找空闲最久的那个连接 long idleDurationNs = now - connection.idleAtNanos; if (idleDurationNs > longestIdleDurationNs) { longestIdleDurationNs = idleDurationNs; longestIdleConnection = connection; } } //如果空闲连接的空闲时间超过5分钟,或者空闲连接数超过5个,则移除空闲最久的连接 if (longestIdleDurationNs >= this.keepAliveDurationNs || idleConnectionCount > this.maxIdleConnections) { connections.remove(longestIdleConnection); } else if (idleConnectionCount > 0) { //如果空闲连接数大于0,则返回此链接即将到期的时间 return keepAliveDurationNs - longestIdleDurationNs; } else if (inUseConnectionCount > 0) { //如果所有连接都在使用中,5分钟后再清理 return keepAliveDurationNs; } else { //如果没有任何连接则跳出循环 cleanupRunning = false; return -1; } } closeQuietly(longestIdleConnection.socket()); // Cleanup again immediately. return 0; } 复制代码
上面的清理任务的流程简单概括成下面几点:
在清理连接时通过调用pruneAndGetAllocationCount方法来判断空闲连接和活跃连接,因此让我们看看其内部实现
RealConnectionPool#pruneAndGetAllocationCount
private int pruneAndGetAllocationCount(RealConnection connection, long now) { List<Reference<Transmitter>> references = connection.transmitters; //遍历弱引用列表 for (int i = 0; i < references.size(); ) { Reference<Transmitter> reference = references.get(i); //如果Transmitter被使用,则跳过 if (reference.get() != null) { i++; continue; } // We've discovered a leaked transmitter. This is an application bug. TransmitterReference transmitterRef = (TransmitterReference) reference; String message = "A connection to " + connection.route().address().url() + " was leaked. Did you forget to close a response body?"; Platform.get().logCloseableLeak(message, transmitterRef.callStackTrace); //如果Transmitter没有被使用则移除引用 references.remove(i); connection.noNewExchanges = true; //如果列表为空说明此连接没有被引用了,则返回0,表示此连接为空闲连接 if (references.isEmpty()) { connection.idleAtNanos = now - keepAliveDurationNs; return 0; } } //否则返回非0的数,表示此连接为活跃连接 return references.size(); } 复制代码
这里主要是遍历连接的弱引用列表,根据列表中的Transmitter是否为null来移除对应的弱引用,如果最后弱引用列表为0表示该连接为空闲连接,否则则表示该连接为活跃连接,并返回该活跃连接的引用计数。
服务请求拦截器CallServerInterceptor是最后一个拦截器,其主要作用就是向服务器请求并获取数据。我们直接看CallServerInterceptor的intercept方法
CallServerInterceptor#intercept
@Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Exchange exchange = realChain.exchange(); Request request = realChain.request(); long sentRequestMillis = System.currentTimeMillis(); //写入请求头 exchange.writeRequestHeaders(request); boolean responseHeadersStarted = false; Response.Builder responseBuilder = null; if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) { // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100 // Continue" response before transmitting the request body. If we don't get that, return // what we did get (such as a 4xx response) without ever transmitting the request body. if ("100-continue".equalsIgnoreCase(request.header("Expect"))) { exchange.flushRequest(); responseHeadersStarted = true; exchange.responseHeadersStart(); responseBuilder = exchange.readResponseHeaders(true); } //写入请求体 if (responseBuilder == null) { if (request.body().isDuplex()) { // Prepare a duplex body so that the application can send a request body later. exchange.flushRequest(); BufferedSink bufferedRequestBody = Okio.buffer( exchange.createRequestBody(request, true)); //写入请求体 request.body().writeTo(bufferedRequestBody); } else { // Write the request body if the "Expect: 100-continue" expectation was met. BufferedSink bufferedRequestBody = Okio.buffer( exchange.createRequestBody(request, false)); request.body().writeTo(bufferedRequestBody); bufferedRequestBody.close(); } } else { exchange.noRequestBody(); if (!exchange.connection().isMultiplexed()) { // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection // from being reused. Otherwise we're still obligated to transmit the request body to // leave the connection in a consistent state. exchange.noNewExchangesOnConnection(); } } } else { exchange.noRequestBody(); } if (request.body() == null || !request.body().isDuplex()) { exchange.finishRequest(); } if (!responseHeadersStarted) { exchange.responseHeadersStart(); } //读取响应头 if (responseBuilder == null) { responseBuilder = exchange.readResponseHeaders(false); } Response response = responseBuilder .request(request) .handshake(exchange.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); //读取响应体 int code = response.code(); if (code == 100) { // server sent a 100-continue even though we did not request one. // try again to read the actual response response = exchange.readResponseHeaders(false) .request(request) .handshake(exchange.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); code = response.code(); } exchange.responseHeadersEnd(response); if (forWebSocket && code == 101) { // Connection is upgrading, but we need to ensure interceptors see a non-null response body. response = response.newBuilder() .body(Util.EMPTY_RESPONSE) .build(); } else { response = response.newBuilder() .body(exchange.openResponseBody(response)) .build(); } if ("close".equalsIgnoreCase(response.request().header("Connection")) || "close".equalsIgnoreCase(response.header("Connection"))) { exchange.noNewExchangesOnConnection(); } if ((code == 204 || code == 205) && response.body().contentLength() > 0) { throw new ProtocolException( "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength()); } //返回最终响应 return response; } 复制代码
可以发现这个方法并没有再继续调用责任链的处理方法,而是直接返回了最终的响应给上一级的拦截器。
到这里,我们就把责任链的五大拦截器分析完毕了,在这五大拦截器中要重点关注缓存拦截器和连接拦截器,这两个拦截器有很多知识值得我们去学习,比如缓存拦截器中的缓存机制和连接拦截器的连接复用等。通过对拦截器的分析,也对责任链模式有了更深的理解,每一个拦截器都对应一个index不同的结点(RealInterceptorChain) ,每个结点(RealInterceptorChain)的proceed方法会产生下一个结点(RealInterceptorChain),然后调用当前结点对应拦截器的intercept方法,并将下一结点传进去。故如果某个拦截器调用了realChain的proceed方法,实际会调用下一结点对应拦截器的intercept方法,直到拦截器列表迭代完成。通过这种责任链的方法,每个拦截器对请求进行不同的处理,从而得到最终的响应数据。
参考博客: