/**
 * Obtains an [Exchange] for the chain's request from the transmitter, then hands the request,
 * the transmitter and that exchange to the rest of the chain.
 *
 * @param chain the interceptor chain; it is cast to [RealInterceptorChain].
 * @return the response produced by proceeding down the chain.
 */
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val request = realChain.request()
    val transmitter = realChain.transmitter()

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    // Every method other than GET asks for extensive health checks.
    val isGet = request.method == "GET"
    val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks = !isGet)

    return realChain.proceed(request, transmitter, exchange)
}
/**
 * Creates the [Exchange] that will carry a new request and its response.
 *
 * The preconditions and the final state update each run while holding the `connectionPool`
 * monitor; the codec lookup happens between the two locked sections.
 *
 * @throws IllegalStateException if `noMoreExchanges` is set, or if an exchange is already
 *     assigned (the previous response is still open).
 */
internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
    synchronized(connectionPool) {
        check(!noMoreExchanges) { "released" }
        check(exchange == null) {
            "cannot make a new request because the previous response is still open: please call response.close()"
        }
    }

    val exchangeCodec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
    val created = Exchange(this, call, eventListener, exchangeFinder!!, exchangeCodec)

    return synchronized(connectionPool) {
        // Publish the new exchange and reset both completion flags under the lock.
        this.exchange = created
        this.exchangeRequestDone = false
        this.exchangeResponseDone = false
        created
    }
}
1. 如果当前请求已经连接上,那么就直接使用。
2. 如果连接池中有可以复用的连接,那么根据 RealConnection.isEligible 来判断是否使用。
3. 如果当前没有存在的连接,那么就尝试重新建立一个新的连接。
/**
 * Looks up a healthy connection and returns a codec created on it.
 *
 * On any failure `trackFailure()` is called first. A [RouteException] is rethrown unchanged;
 * any other [IOException] is wrapped in a [RouteException].
 */
fun find(
    client: OkHttpClient,
    chain: Interceptor.Chain,
    doExtensiveHealthChecks: Boolean
): ExchangeCodec {
    // (Parameter setup code omitted in this excerpt.)

    return try {
        // Find a connection that is currently usable, then build the codec on top of it.
        findHealthyConnection(
            connectTimeout = connectTimeout,
            readTimeout = readTimeout,
            writeTimeout = writeTimeout,
            pingIntervalMillis = pingIntervalMillis,
            connectionRetryEnabled = connectionRetryEnabled,
            doExtensiveHealthChecks = doExtensiveHealthChecks
        ).newCodec(client, chain)
    } catch (routeFailure: RouteException) {
        trackFailure()
        throw routeFailure
    } catch (ioFailure: IOException) {
        trackFailure()
        throw RouteException(ioFailure)
    }
}
/**
 * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
 * until a healthy connection is found.
 */
/**
 * Returns a connection to host a new stream. This prefers the existing connection if it exists,
 * then the pool, finally building a new connection.
 *
 * The five parameters are forwarded unchanged to [RealConnection.connect] when a new connection
 * has to be built.
 *
 * @throws IOException with message "Canceled" if the transmitter is canceled when either of the
 *     first two locked sections is entered.
 */
@Throws(IOException::class)
private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
): RealConnection {
    var foundPooledConnection = false
    var result: RealConnection? = null
    var selectedRoute: Route? = null
    var releasedConnection: RealConnection?
    val toClose: Socket?
    // Hold the connectionPool monitor while inspecting and updating connection state.
    synchronized(connectionPool) {
        // Abort if the call has been canceled.
        if (transmitter.isCanceled) throw IOException("Canceled")
        hasStreamFailure = false // This is a fresh attempt.

        releasedConnection = transmitter.connection
        // If the current connection is flagged noNewExchanges, release it; the release call
        // yields the socket that is closed after leaving the lock (null otherwise).
        toClose = if (transmitter.connection != null && transmitter.connection!!.noNewExchanges) {
            transmitter.releaseConnectionNoEvents()
        } else {
            null
        }

        // NOTE(review): presumably the release above clears transmitter.connection, so this
        // branch is only taken when nothing was released — confirm against Transmitter.
        if (transmitter.connection != null) {
            // We had an already-allocated connection and it's good.
            result = transmitter.connection
            releasedConnection = null
        }

        // No usable allocated connection: try the pool, then fall back to choosing a route.
        if (result == null) {
            // Attempt to get a connection from the pool.
            if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
                foundPooledConnection = true
                result = transmitter.connection
            } else if (nextRouteToTry != null) {
                selectedRoute = nextRouteToTry
                nextRouteToTry = null
            } else if (retryCurrentRoute()) {
                selectedRoute = transmitter.connection!!.route()
            }
        }
    }
    // Close the socket of the released connection, if any, outside the lock.
    toClose?.closeQuietly()

    if (releasedConnection != null) {
        eventListener.connectionReleased(call, releasedConnection!!)
    }
    if (foundPooledConnection) {
        eventListener.connectionAcquired(call, result!!)
    }
    if (result != null) {
        // If we found an already-allocated or pooled connection, we're done.
        return result!!
    }

    // If we need a route selection, make one. This is a blocking operation.
    var newRouteSelection = false
    if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
        newRouteSelection = true
        routeSelection = routeSelector.next()
    }

    var routes: List<Route>? = null
    synchronized(connectionPool) {
        if (transmitter.isCanceled) throw IOException("Canceled")

        if (newRouteSelection) {
            // Now that we have a set of IP addresses, make another attempt at getting a connection from
            // the pool. This could match due to connection coalescing.
            routes = routeSelection!!.routes
            // Second attempt to find a usable connection in the pool.
            if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, false)) {
                foundPooledConnection = true
                result = transmitter.connection
            }
        }

        if (!foundPooledConnection) {
            if (selectedRoute == null) {
                selectedRoute = routeSelection!!.next()
            }

            // The pool had nothing for us, so build a new connection.
            // Create a connection and assign it to this allocation immediately. This makes it possible
            // for an asynchronous cancel() to interrupt the handshake we're about to do.
            result = RealConnection(connectionPool, selectedRoute!!)
            connectingConnection = result
        }
    }

    // If we found a pooled connection on the 2nd time around, we're done.
    if (foundPooledConnection) {
        eventListener.connectionAcquired(call, result!!)
        return result!!
    }

    // The freshly created connection now performs the actual network connect.
    // Do TCP + TLS handshakes. This is a blocking operation.
    result!!.connect(
        connectTimeout,
        readTimeout,
        writeTimeout,
        pingIntervalMillis,
        connectionRetryEnabled,
        call,
        eventListener
    )
    // NOTE(review): presumably marks this route as working again (removing it from the record
    // of failed routes) — confirm against RouteDatabase.
    connectionPool.routeDatabase.connected(result!!.route())

    var socket: Socket? = null
    synchronized(connectionPool) {
        connectingConnection = null
        // Last attempt at connection coalescing, which only occurs if we attempted multiple
        // concurrent connections to the same host.
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
            // We lost the race! Close the connection we created and return the pooled connection.
            result!!.noNewExchanges = true
            socket = result!!.socket()
            result = transmitter.connection

            // It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
            // that case we will retry the route we just successfully connected with.
            nextRouteToTry = selectedRoute
        } else {
            // Put the new connection into the pool so that later requests may reuse it.
            connectionPool.put(result!!)
            transmitter.acquireConnectionNoEvents(result!!)
        }
    }
    // Close the socket of the connection we discarded, if we lost the race above.
    socket?.closeQuietly()

    eventListener.connectionAcquired(call, result!!)
    return result!!
}
/**
 * Builds the [ExchangeCodec] used to talk over this connection.
 *
 * When an HTTP/2 connection is present an [Http2ExchangeCodec] is returned. Otherwise the chain's
 * read timeout is applied to the socket and the source, its write timeout to the sink, and an
 * [Http1ExchangeCodec] is returned. The socket, source and sink must all be non-null in either
 * case.
 */
@Throws(SocketException::class)
internal fun newCodec(client: OkHttpClient, chain: Interceptor.Chain): ExchangeCodec {
    // All three are asserted up front, before choosing the protocol.
    val rawSocket = this.socket!!
    val bufferedSource = this.source!!
    val bufferedSink = this.sink!!

    val h2Connection = this.http2Connection
    if (h2Connection != null) {
        return Http2ExchangeCodec(client, this, chain, h2Connection)
    }

    rawSocket.soTimeout = chain.readTimeoutMillis()
    bufferedSource.timeout().timeout(chain.readTimeoutMillis().toLong(), MILLISECONDS)
    bufferedSink.timeout().timeout(chain.writeTimeoutMillis().toLong(), MILLISECONDS)
    return Http1ExchangeCodec(client, this, bufferedSource, bufferedSink)
}
- 发送请求的headers[writeRequest]
- 打开一个水槽(直译)去写入请求体
- 开始写入请求体然后关闭这个水槽
- 读取响应头
- 申请一部分资源开始读取响应体
- 读取完成响应体后关闭资源