上文简单概括了下 okHttp3
请求的整体流程, 本篇主要看下 ConnectInterceptor
的主要工作内容
已知拦截器链都是从各拦截器的 intercept
方法开始调用, 那么我们从 ConnectInterceptor
的 intercept
代码开始看起
@Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); // 从RetryAndFollowUpInterceptor获取 StreamAllocation streamAllocation = realChain.streamAllocation(); // 判断请求是不是GET方法, 不是的情况下,需要进行有效监测 boolean doExtensiveHealthChecks = !request.method().equals("GET"); // 新建HttpCodec HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks); RealConnection connection = streamAllocation.connection(); return realChain.proceed(request, streamAllocation, httpCodec, connection); }
我们可以看到, 获取连接的拦截器内, 主要只有三个步骤:
HttpCodec
streamAllocation
获取连接 httpCodec
和 connection
作为参数带到下个拦截器的调用方法中
这里 HttpCodec
我们可以大概了解下, 它是个抽象类, 有 Http1Codec
和 Http2Codec
实现它, 分别根据Http/1.1,和Http/2做针对请求响应不同的编解码处理.
而 StreamAllocation
对象是在 RetryAndFollowUpInterceptor
中新建获取到的, 它做了 Streams
, Connections
, Calls
的关系管理.这里要注意的是 Streams
表示的是逻辑层面的连接(流), 每个连接( Connection
)都定义了可以并发请求的连接( Streams
), HTTP/1.x每次只能携带一次, HTTP/2可以携带多次.
回头我们看下 streamAllocation.newStream
做了什么
public HttpCodec newStream( OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) { int connectTimeout = chain.connectTimeoutMillis(); int readTimeout = chain.readTimeoutMillis(); int writeTimeout = chain.writeTimeoutMillis(); int pingIntervalMillis = client.pingIntervalMillis(); boolean connectionRetryEnabled = client.retryOnConnectionFailure(); try { // 遍历查找健康可用的连接 RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks); // HttpCodec初始化 HttpCodec resultCodec = resultConnection.newCodec(client, chain, this); synchronized (connectionPool) { codec = resultCodec; return resultCodec; } } catch (IOException e) { throw new RouteException(e); } }
可以看到它这里也就做了三个动作
HttpCodec
继续往下看:
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks) throws IOException { while (true) { // 查找连接, 更加倾向连接池内已存在的连接, 否则会重新构建 RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled); // If this is a brand new connection, we can skip the extensive health checks. // 如果是全新的连接, 则跳过可用检查, 直接返回 synchronized (connectionPool) { if (candidate.successCount == 0) { return candidate; } } // Do a (potentially slow) check to confirm that the pooled connection is still good. If it // isn't, take it out of the pool and start again. // 判断是否是可用连接 if (!candidate.isHealthy(doExtensiveHealthChecks)) { // 禁止新流创建 noNewStreams(); continue; } return candidate; } }
继续看 findConnection
方法
/** * Returns a connection to host a new stream. This prefers the existing connection if it exists, * then the pool, finally building a new connection. */ private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException { // 从连接池中找到连接 boolean foundPooledConnection = false; // 实际需要返回的连接 RealConnection result = null; // 对应找到的路由 Route selectedRoute = null; // 对应可释放的连接 Connection releasedConnection; // 需要关闭的socket Socket toClose; synchronized (connectionPool) { // 异常判断 // 判断是否连接已经被释放, codec是否为空, 请求是否被取消 if (released) throw new IllegalStateException("released"); if (codec != null) throw new IllegalStateException("codec != null"); if (canceled) throw new IOException("Canceled"); // 尝试寻找已经存在的连接来使用. // 但是需要注意的是, 已存在的连接可能已经无法再创建新的流 // Attempt to use an already-allocated connection. We need to be careful here because our // already-allocated connection may have been restricted from creating new streams. releasedConnection = this.connection; // toClose如果无法创建流, 需要关闭的socket toClose = releaseIfNoNewStreams(); if (this.connection != null) { // We had an already-allocated connection and it's good. // 如果当前连接不为空, 就说明这个连接是可以用的 result = this.connection; releasedConnection = null; } if (!reportedAcquired) { // If the connection was never reported acquired, don't report it as released! releasedConnection = null; } // 如果没有现成的连接 if (result == null) { // Attempt to get a connection from the pool. // 尝试从连接池中获取 Internal.instance.get(connectionPool, address, this, null); // 如果有复用的连接 if (connection != null) { // 表示找到连接池可复用的连接 foundPooledConnection = true; result = connection; } else { selectedRoute = route; } } } // 关闭socket closeQuietly(toClose); if (releasedConnection != null) { eventListener.connectionReleased(call, releasedConnection); } if (foundPooledConnection) { eventListener.connectionAcquired(call, result); } if (result != null) { // If we found an already-allocated or pooled connection, we're done. // 如果有存在已分配的连接或者是连接池内可复用的连接, 则直接返回该连接对象 return result; } // If we need a route selection, make one. This is a blocking operation. boolean newRouteSelection = false; if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) { newRouteSelection = true; // 切换路由 routeSelection = routeSelector.next(); } synchronized (connectionPool) { if (canceled) throw new IOException("Canceled"); if (newRouteSelection) { // Now that we have a set of IP addresses, make another attempt at getting a connection from // the pool. This could match due to connection coalescing. List<Route> routes = routeSelection.getAll(); for (int i = 0, size = routes.size(); i < size; i++) { Route route = routes.get(i); // 获取可复用的连接 Internal.instance.get(connectionPool, address, this, route); // 如果存在可复用连接 if (connection != null) { foundPooledConnection = true; result = connection; this.route = route; break; } } } // 如果没有找到可复用的连接 if (!foundPooledConnection) { // 如果当前路由为空 if (selectedRoute == null) { selectedRoute = routeSelection.next(); } // Create a connection and assign it to this allocation immediately. This makes it possible // for an asynchronous cancel() to interrupt the handshake we're about to do. route = selectedRoute; refusedStreamCount = 0; // 创建新的连接 result = new RealConnection(connectionPool, selectedRoute); acquire(result, false); } } // If we found a pooled connection on the 2nd time around, we're done. // 如果第二次有找到, 则返回复用的连接 if (foundPooledConnection) { eventListener.connectionAcquired(call, result); return result; } // Do TCP + TLS handshakes. This is a blocking operation. // 做三次握手 result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener); // 将该路由从错误缓存记录中移除 routeDatabase().connected(result.route()); Socket socket = null; synchronized (connectionPool) { reportedAcquired = true; // Pool the connection. // 在连接池中添加该连接 Internal.instance.put(connectionPool, result); // If another multiplexed connection to the same address was created concurrently, then // release this connection and acquire that one. // 如果有其他复数连接到相同地址, 则删除重复连接 if (result.isMultiplexed()) { socket = Internal.instance.deduplicate(connectionPool, address, this); result = connection; } } closeQuietly(socket); eventListener.connectionAcquired(call, result); return result; }
其实看方法注释, 我们大概可以知道这里做的就是返回一个连接, 首先会从连接池中来, 如果连接池中没有对应连接, 则再重新新建一个连接.
具体的注释都在代码里了, 我们再看下其中几个调用方法.
首先我们将即将要释放的连接指向当前的连接, 通过调用 releaseIfNoNewStreams
方法, 返回需要关闭的socket
我们来看下 releaseIfNoNewStreams
方法的代码
/** * 如果当前连接无法新建流, 释放当前连接, 并且返回需要关闭的socket * 由于http2复数请求会使用同一个连接, 所以可能存在当前连接限制后续的请求 */ private Socket releaseIfNoNewStreams() { // 断言锁持有 assert (Thread.holdsLock(connectionPool)); RealConnection allocatedConnection = this.connection; // 当当前连接不为空, 而且没有新的流被创建 if (allocatedConnection != null && allocatedConnection.noNewStreams) { return deallocate(false, false, true); } // 正常情况, 没有需要被关闭的socket返回 return null; }
可以看到只有当当前连接存在, 而且不允许有新的流产生的时候, 才会返回执行 deallocate(false, false, true)
后的结果, 正常的情况下, 没有需要被关闭的socket返回
关于 deallocate
方法, 可以看下面的代码
private Socket deallocate(boolean noNewStreams, boolean released, boolean streamFinished) { assert (Thread.holdsLock(connectionPool)); if (streamFinished) { this.codec = null; } if (released) { this.released = true; } Socket socket = null; if (connection != null) { if (noNewStreams) { connection.noNewStreams = true; } if (this.codec == null && (this.released || connection.noNewStreams)) { // 释放连接 release(connection); // 如果这个连接的当前的流为空 if (connection.allocations.isEmpty()) { // 当连接的流为0时候的记录时间戳 connection.idleAtNanos = System.nanoTime(); // 判断连接是否闲置 if (Internal.instance.connectionBecameIdle(connectionPool, connection)) { // 如果闲置, 则返回需要关闭的socket socket = connection.socket(); } } // 回收 connection = null; } } return socket; } private void release(RealConnection connection) { for (int i = 0, size = connection.allocations.size(); i < size; i++) { Reference<StreamAllocation> reference = connection.allocations.get(i); if (reference.get() == this) { connection.allocations.remove(i); return; } } throw new IllegalStateException(); }
可以看到这里具体做的就是, 编解码类对象 codec
赋值为null, 调用 release
释放连接, 当这个 connection
没有连接流的时候, 一并判断连接是否闲置, 如果闲置, 则返回对应的 socket
, 并将当前的 connection
赋值为null.而这里的 release
方法主要做的就是移除连接对应流引用的移除.
我们回头去看 findConnection
方法内下一步
if (this.connection != null) { // We had an already-allocated connection and it's good. // 如果当前连接不为空, 就说明这个连接是可以用的 result = this.connection; releasedConnection = null; }
我们知道前面调用 releaseIfNoNewStreams
的时候, 如果有返回socket, 那么connection也会被置为null, 所以这里connection不为空, 说明就是现在的连接是可以用的, 那么需要释放连接的对象就为null, 没必要被释放.
// 如果没有现成的连接 if (result == null) { // Attempt to get a connection from the pool. // 尝试从连接池中获取 Internal.instance.get(connectionPool, address, this, null); // 如果有复用的连接 if (connection != null) { // 表示找到连接池可复用的连接 foundPooledConnection = true; result = connection; } else { selectedRoute = route; } }
而如果没有可用的连接, 那么就会从连接池中尝试获取
/** * 返回一个可重用的连接, 如果没有对应连接存在, 则返回null */ @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) { // 断言锁的持有 assert (Thread.holdsLock(this)); // 遍历 for (RealConnection connection : connections) { // 判断连接是否可复用 if (connection.isEligible(address, route)) { streamAllocation.acquire(connection, true); return connection; } } return null; } public void acquire(RealConnection connection, boolean reportedAcquired) { assert (Thread.holdsLock(connectionPool)); if (this.connection != null) throw new IllegalStateException(); this.connection = connection; this.reportedAcquired = reportedAcquired; // 添加一个流的引用 connection.allocations.add(new StreamAllocationReference(this, callStackTrace)); }
可以看到, 如果当前连接池中有连接可复用, 则会将新的流引用添加在 connection.allocations
中.
// 关闭socket closeQuietly(toClose); if (releasedConnection != null) { eventListener.connectionReleased(call, releasedConnection); } if (foundPooledConnection) { eventListener.connectionAcquired(call, result); } if (result != null) { // If we found an already-allocated or pooled connection, we're done. // 如果有存在已分配的连接或者是连接池内可复用的连接, 则直接返回该连接对象 return result; }
再回到主方法内, 不论是否有找到可用的连接, 都会关闭socket, 然后根据是否存在需要释放的连接, 回调 eventListener.connectionReleased
, 并根据是否找到连接池内可用连接, 回调 eventListener.connectionAcquired
.当有实际可用的连接的时候, 那么直接返回该连接对象.
boolean newRouteSelection = false; if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) { newRouteSelection = true; // 切换路由 routeSelection = routeSelector.next(); } synchronized (connectionPool) { if (canceled) throw new IOException("Canceled"); if (newRouteSelection) { // Now that we have a set of IP addresses, make another attempt at getting a connection from // the pool. This could match due to connection coalescing. List<Route> routes = routeSelection.getAll(); for (int i = 0, size = routes.size(); i < size; i++) { Route route = routes.get(i); // 获取可复用的连接 Internal.instance.get(connectionPool, address, this, route); // 如果存在可复用连接 if (connection != null) { foundPooledConnection = true; result = connection; this.route = route; break; } } } // 如果没有找到可复用的连接 if (!foundPooledConnection) { // 如果当前路由为空 if (selectedRoute == null) { selectedRoute = routeSelection.next(); } // Create a connection and assign it to this allocation immediately. This makes it possible // for an asynchronous cancel() to interrupt the handshake we're about to do. route = selectedRoute; refusedStreamCount = 0; // 创建新的连接 result = new RealConnection(connectionPool, selectedRoute); acquire(result, false); } } // If we found a pooled connection on the 2nd time around, we're done. // 如果第二次有找到, 则返回复用的连接 if (foundPooledConnection) { eventListener.connectionAcquired(call, result); return result; }
当仍然没有找到连接的时候, 那么就会切换路由, 再次从连接池内找对应路由可复用的连接, 如果有找到, 则返回这次复用的连接对象.
// Do TCP + TLS handshakes. This is a blocking operation. // 做三次握手 result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener); // 将该路由从错误缓存记录中移除 routeDatabase().connected(result.route()); Socket socket = null; synchronized (connectionPool) { reportedAcquired = true; // Pool the connection. // 在连接池中添加该连接 Internal.instance.put(connectionPool, result); // If another multiplexed connection to the same address was created concurrently, then // release this connection and acquire that one. // 如果有其他复数连接到相同地址, 则删除重复连接 if (result.isMultiplexed()) { socket = Internal.instance.deduplicate(connectionPool, address, this); result = connection; } } closeQuietly(socket); eventListener.connectionAcquired(call, result); return result;
但如果这次仍然没有找到对应可用的连接, 则只好新建连接, 并将流引用加到对应的 conection
对象中, 然后做三次握手, 并将对应的路由从错误缓存中移除.
最后还做了重复连接的去重的工作, 然后再返回这个新建的连接对象.
截止至此, 寻找可用连接的代码分析就完成了.
回头再看下 HttpCodec的初始化
public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain, StreamAllocation streamAllocation) throws SocketException { if (http2Connection != null) { return new Http2Codec(client, chain, streamAllocation, http2Connection); } else { socket.setSoTimeout(chain.readTimeoutMillis()); source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS); sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS); return new Http1Codec(client, streamAllocation, source, sink); } }
主要就是根据判断是Http1还是Http2来判断新建哪个编解码类.
可以看出, ConnectionInterceptor
拦截器, 主要做的是,
connection HttpCodec