Github: okhttp 分析版本: 930d4d0
Opens a connection to the target server and proceeds to the next interceptor
/**
 * Interceptor that asks the call's transmitter for an exchange to the target server and then
 * hands the request on to the next interceptor in the chain.
 */
class ConnectInterceptor(val client: OkHttpClient) : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val request = realChain.request()
    val transmitter = realChain.transmitter()

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    // Every method other than GET asks for the extensive health checks.
    val needsExtensiveChecks = request.method != "GET"
    val exchange = transmitter.newExchange(
        chain = chain,
        doExtensiveHealthChecks = needsExtensiveChecks
    )

    return realChain.proceed(request, transmitter, exchange)
  }
}
doExtensiveHealthChecks
在请求方法不是 GET 时为 true;通过 transmitter.newExchange()
来创建 Exchange
realChain.proceed()
告知下一个拦截器开始去执行

class Transmitter(
  private val client: OkHttpClient,
  private val call: Call
) {
  /**
   * Builds the exchange that will carry one request/response pair for this call.
   *
   * Fails with [IllegalStateException] when the transmitter has been released or when an earlier
   * exchange is still open. The codec lookup runs outside the pool lock; only the state checks and
   * the final bookkeeping are done while holding it.
   */
  internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
    // Validate state under the pool lock before doing any work.
    synchronized(connectionPool) {
      check(!noMoreExchanges) { "released" }
      check(exchange == null) {
        "cannot make a new request because the previous response is still open: " +
            "please call response.close()"
      }
    }

    val codec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
    val created = Exchange(this, call, eventListener, exchangeFinder!!, codec)

    // Publish the new exchange and reset its completion flags under the same lock.
    return synchronized(connectionPool) {
      exchange = created
      exchangeRequestDone = false
      exchangeResponseDone = false
      created
    }
  }
}
通过 exchangeFinder!!.find()
来创建 ExchangeCodec
ExchangeFinder#find(client: OkHttpClient, chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean)
class ExchangeFinder(
  private val transmitter: Transmitter,
  private val connectionPool: RealConnectionPool,
  private val address: Address,
  private val call: Call,
  private val eventListener: EventListener
) {
  /**
   * Obtains a healthy connection and returns a codec created on it.
   *
   * Timeouts are read from [chain]; the ping interval and the retry flag are read from [client].
   * On any failure `trackFailure()` is invoked first: a [RouteException] is rethrown unchanged and
   * an [IOException] is rethrown wrapped in a [RouteException].
   */
  fun find(
    client: OkHttpClient,
    chain: Interceptor.Chain,
    doExtensiveHealthChecks: Boolean
  ): ExchangeCodec {
    val connectTimeoutMillis = chain.connectTimeoutMillis()
    val readTimeoutMillis = chain.readTimeoutMillis()
    val writeTimeoutMillis = chain.writeTimeoutMillis()
    val pingInterval = client.pingIntervalMillis()
    val retryEnabled = client.retryOnConnectionFailure()

    return try {
      // Find a connection: either reuse an existing one or create a new one.
      findHealthyConnection(
          connectTimeout = connectTimeoutMillis,
          readTimeout = readTimeoutMillis,
          writeTimeout = writeTimeoutMillis,
          pingIntervalMillis = pingInterval,
          connectionRetryEnabled = retryEnabled,
          doExtensiveHealthChecks = doExtensiveHealthChecks
      ).newCodec(client, chain)
    } catch (e: RouteException) {
      trackFailure()
      throw e
    } catch (e: IOException) {
      trackFailure()
      throw RouteException(e)
    }
  }
}
通过 findHealthyConnection()
找到一条『健康』的连接,然后通过 RealConnection#newCodec()
来创建 ExchangeCodec
ExchangeFinder#findHealthyConnection(connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean, doExtensiveHealthChecks: Boolean)
class ExchangeFinder(
  private val transmitter: Transmitter,
  private val connectionPool: RealConnectionPool,
  private val address: Address,
  private val call: Call,
  private val eventListener: EventListener
) {
  /**
   * Keeps asking [findConnection] for a connection until one passes the health check, and
   * returns it. A connection whose `successCount` is still zero is returned without the check.
   */
  @Throws(IOException::class)
  private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
  ): RealConnection {
    // Loop until a usable connection turns up.
    while (true) {
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )

      // If this is a brand new connection, we can skip the extensive health checks.
      // The counter is read while holding the pool lock.
      val isBrandNew = synchronized(connectionPool) { candidate.successCount == 0 }
      if (isBrandNew) return candidate

      // Do a (potentially slow) check to confirm that the pooled connection is still good.
      if (candidate.isHealthy(doExtensiveHealthChecks)) return candidate

      // Unhealthy: stop the connection from taking new exchanges, then start again.
      candidate.noNewExchanges()
    }
  }
}
findHealthyConnection()
是负责死循环去检测获取到的 RealConnection
是否可用,如果是新创建的则跳过检测,当 RealConnection
不可用的话就继续去调用 findConnection 去重新获取一个连接
ExchangeFinder#findConnection(connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean)
class ExchangeFinder(
  private val transmitter: Transmitter,
  private val connectionPool: RealConnectionPool,
  private val address: Address,
  private val call: Call,
  private val eventListener: EventListener
) {
  /**
   * Returns a connection to host a new stream. This prefers the existing connection if it exists,
   * then the pool, finally building a new connection.
   *
   * The lookup proceeds in stages:
   * 1. reuse the connection already held by the transmitter, unless it refuses new exchanges;
   * 2. try the pool without route information;
   * 3. select routes and try the pool again (this can match through connection coalescing);
   * 4. create a new connection, connect it, and either put it in the pool or, if a multiplexed
   *    connection to the same address was pooled meanwhile, discard it and use the pooled one.
   *
   * @throws IOException with message "Canceled" if the transmitter was canceled, or whatever the
   *     connect attempt throws.
   */
  @Throws(IOException::class)
  private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    var foundPooledConnection = false
    var result: RealConnection? = null
    var selectedRoute: Route? = null
    var releasedConnection: RealConnection?
    val toClose: Socket?
    synchronized(connectionPool) {
      if (transmitter.isCanceled) throw IOException("Canceled") // The call was canceled.
      hasStreamFailure = false // This is a fresh attempt.

      releasedConnection = transmitter.connection
      // If the transmitter's connection no longer accepts new exchanges, release it; the socket
      // it hands back (if any) is closed after leaving the lock.
      toClose = if (transmitter.connection != null && transmitter.connection!!.noNewExchanges) {
        transmitter.releaseConnectionNoEvents()
      } else {
        null
      }

      // Take the connection from the transmitter if it still has one.
      if (transmitter.connection != null) {
        // We had an already-allocated connection and it's good.
        result = transmitter.connection
        releasedConnection = null
      }

      if (result == null) {
        // Try the pool; on success the pooled connection is assigned to the transmitter.
        // Attempt to get a connection from the pool.
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
          foundPooledConnection = true
          result = transmitter.connection
        } else if (nextRouteToTry != null) { // A route was queued up for the next attempt.
          selectedRoute = nextRouteToTry
          nextRouteToTry = null
        } else if (retryCurrentRoute()) {
          selectedRoute = transmitter.connection!!.route()
        }
      }
    }
    // Close the socket of the connection that is no longer used.
    toClose?.closeQuietly()

    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection!!)
    }
    // A pooled connection was found for reuse: notify the listener.
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result!!)
    }
    if (result != null) { // A reusable connection was found.
      // If we found an already-allocated or pooled connection, we're done.
      return result!!
    }

    // Pick routes, then look in the pool once more; return the match if there is one.
    // If we need a route selection, make one. This is a blocking operation.
    var newRouteSelection = false
    if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
      newRouteSelection = true
      routeSelection = routeSelector.next()
    }

    var routes: List<Route>? = null
    synchronized(connectionPool) {
      if (transmitter.isCanceled) throw IOException("Canceled")

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
        routes = routeSelection!!.routes
        if (connectionPool.transmitterAcquirePooledConnection(
                address, transmitter, routes, false)) {
          foundPooledConnection = true
          result = transmitter.connection
        }
      }

      if (!foundPooledConnection) { // Nothing in the pool: create a connection.
        if (selectedRoute == null) {
          selectedRoute = routeSelection!!.next()
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        result = RealConnection(connectionPool, selectedRoute!!)
        connectingConnection = result
      }
    }

    // If we found a pooled connection on the 2nd time around, we're done.
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result!!)
      return result!!
    }

    // Establish the connection.
    // Do TCP + TLS handshakes. This is a blocking operation.
    result!!.connect(
        connectTimeout,
        readTimeout,
        writeTimeout,
        pingIntervalMillis,
        connectionRetryEnabled,
        call,
        eventListener
    )
    // Report the successful route to the route database (presumably this clears it from the
    // record of failed routes; confirm against RouteDatabase, which is outside this excerpt).
    connectionPool.routeDatabase.connected(result!!.route())

    var socket: Socket? = null
    synchronized(connectionPool) {
      connectingConnection = null
      // Check the pool once more: if concurrent calls to the same address produced several
      // connections, give up the one created here.
      // Last attempt at connection coalescing, which only occurs if we attempted multiple
      // concurrent connections to the same host.
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
        // We lost the race! Close the connection we created and return the pooled connection.
        result!!.noNewExchanges = true
        socket = result!!.socket()
        result = transmitter.connection
      } else {
        // Add the new connection to the pool and assign it to the transmitter.
        connectionPool.put(result!!)
        transmitter.acquireConnectionNoEvents(result!!)
      }
    }
    // Close the socket of the discarded connection, if any.
    socket?.closeQuietly()

    eventListener.connectionAcquired(call, result!!)
    return result!!
  }
}
RealConnectionPool#transmitterAcquirePooledConnection(address: Address, transmitter: Transmitter, routes: List<Route>?, requireMultiplexed: Boolean)
class RealConnectionPool(
  /** The maximum number of idle connections for each address. */
  private val maxIdleConnections: Int,
  keepAliveDuration: Long,
  timeUnit: TimeUnit
) {
  /**
   * Looks for a pooled connection that can serve `address` and, if one exists, assigns it to
   * `transmitter`. Returns true when a connection was assigned and false otherwise.
   *
   * A non-null `routes` carries the resolved routes (ie. IP addresses) for the connection; it is
   * what allows related domains such as `square.com` and `square.ca` to be coalesced onto the same
   * HTTP/2 connection. When `requireMultiplexed` is true, connections that are not multiplexed
   * are skipped.
   *
   * The caller is expected to hold this pool's lock (asserted).
   */
  fun transmitterAcquirePooledConnection(
    address: Address,
    transmitter: Transmitter,
    routes: List<Route>?,
    requireMultiplexed: Boolean
  ): Boolean {
    assert(Thread.holdsLock(this))

    // First connection, in iteration order, that satisfies both conditions.
    val match = connections.firstOrNull { pooled ->
      (!requireMultiplexed || pooled.isMultiplexed) && pooled.isEligible(address, routes)
    } ?: return false

    transmitter.acquireConnectionNoEvents(match)
    return true
  }
}
遍历 pool 中的 connections(ArrayDeque),如果某个连接可以复用,则将它分配给 transmitter 并返回 true;都不满足则返回 false
class RealConnection(
  val connectionPool: RealConnectionPool,
  private val route: Route
) : Http2Connection.Listener(), Connection {
  /**
   * Decides whether this connection may carry a stream allocation to `address`. When non-null,
   * `routes` holds the resolved routes for the connection and enables the coalescing checks.
   *
   * @return true on an exact host match, or when every HTTP/2 coalescing requirement below holds.
   */
  internal fun isEligible(address: Address, routes: List<Route>?): Boolean {
    // If this connection is not accepting new exchanges, we're done: it has either reached its
    // allocation limit or been flagged to take no new exchanges.
    val acceptsNewExchanges = transmitters.size < allocationLimit && !noNewExchanges
    if (!acceptsNewExchanges) return false

    // If the non-host fields of the address don't overlap, we're done.
    if (!this.route.address().equalsNonHost(address)) return false

    // If the host exactly matches, we're done: this connection can carry the address.
    if (address.url.host == this.route().address().url.host) {
      return true // This connection is a perfect match.
    }

    // At this point we don't have a hostname match. But we still be able to carry the request if
    // our connection coalescing requirements are met. See also:
    // https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
    // https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/

    // 1. This connection must be HTTP/2.
    if (http2Connection == null) return false

    // 2. The routes must share an IP address.
    if (routes == null || !routeMatchesAny(routes)) return false

    // 3. This connection's server certificate's must cover the new host.
    if (address.hostnameVerifier !== OkHostnameVerifier) return false
    if (!supportsUrl(address.url)) return false

    // 4. Certificate pinning must match the host. A failed pin check means "not eligible".
    return try {
      address.certificatePinner!!.check(address.url.host, handshake()!!.peerCertificates)
      true // The caller's address can be carried by this connection.
    } catch (_: SSLPeerUnverifiedException) {
      false
    }
  }
}
class RealConnection(
  val connectionPool: RealConnectionPool,
  private val route: Route
) : Http2Connection.Listener(), Connection {
  /**
   * Creates the codec used to exchange a request and response over this connection: an
   * [Http2ExchangeCodec] when an HTTP/2 connection is present, otherwise an [Http1ExchangeCodec]
   * after applying the chain's read and write timeouts to the socket and streams.
   *
   * The socket, source and sink must already be set; they are dereferenced up front on both paths.
   */
  @Throws(SocketException::class)
  internal fun newCodec(client: OkHttpClient, chain: Interceptor.Chain): ExchangeCodec {
    val rawSocket = this.socket!!
    val input = this.source!!
    val output = this.sink!!

    // HTTP/2 path.
    val h2 = this.http2Connection
    if (h2 != null) {
      return Http2ExchangeCodec(client, this, chain, h2)
    }

    // HTTP/1 path: configure the timeouts first, then wrap the streams.
    rawSocket.soTimeout = chain.readTimeoutMillis()
    input.timeout().timeout(chain.readTimeoutMillis().toLong(), MILLISECONDS)
    output.timeout().timeout(chain.writeTimeoutMillis().toLong(), MILLISECONDS)
    return Http1ExchangeCodec(client, this, input, output)
  }
}
判断是 HTTP/1.x 还是 HTTP/2,然后按策略模式返回对应的 ExchangeCodec 实现(Http1ExchangeCodec 或 Http2ExchangeCodec)