Github: okhttp 分析版本: 930d4d0
This is the last interceptor in the chain. It makes a network call to the server
ConnectInterceptor
拦截器的功能就是负责与服务器建立 Socket 连接,并且创建了一个 Exchange
它包括通向服务器的输入流和输出流。而接下来的 CallServerInterceptor
拦截器的功能使用 Exchange
与服务器进行数据的读写操作的
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain val exchange = realChain.exchange() val request = realChain.request() val requestBody = request.body val sentRequestMillis = System.currentTimeMillis() // 写入请求头信息 exchange.writeRequestHeaders(request) var responseHeadersStarted = false var responseBuilder: Response.Builder? = null // 写入请求体信息(有请求体的情况) if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) { // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100 // Continue" response before transmitting the request body. If we don't get that, return // what we did get (such as a 4xx response) without ever transmitting the request body. if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) { exchange.flushRequest() responseHeadersStarted = true exchange.responseHeadersStart() responseBuilder = exchange.readResponseHeaders(true) } if (responseBuilder == null) { if (requestBody.isDuplex()) { // Prepare a duplex body so that the application can send a request body later. exchange.flushRequest() val bufferedRequestBody = exchange.createRequestBody(request, true).buffer() requestBody.writeTo(bufferedRequestBody) } else { // Write the request body if the "Expect: 100-continue" expectation was met. val bufferedRequestBody = exchange.createRequestBody(request, false).buffer() requestBody.writeTo(bufferedRequestBody) bufferedRequestBody.close() } } else { exchange.noRequestBody() if (!exchange.connection()!!.isMultiplexed) { // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection // from being reused. Otherwise we're still obligated to transmit the request body to // leave the connection in a consistent state. exchange.noNewExchangesOnConnection() } } } else { exchange.noRequestBody() } // 结束请求 if (requestBody == null || !requestBody.isDuplex()) { exchange.finishRequest() } if (!responseHeadersStarted) { exchange.responseHeadersStart() } // 得到响应头 if (responseBuilder == null) { responseBuilder = exchange.readResponseHeaders(false)!! } // 构建初步响应 var response = responseBuilder .request(request) .handshake(exchange.connection()!!.handshake()) .sentRequestAtMillis(sentRequestMillis) // 发送请求的时间 .receivedResponseAtMillis(System.currentTimeMillis()) // 接收到响应的时间 .build() var code = response.code() if (code == 100) { // server sent a 100-continue even though we did not request one. // try again to read the actual response response = exchange.readResponseHeaders(false)!! .request(request) .handshake(exchange.connection()!!.handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build() code = response.code() } exchange.responseHeadersEnd(response) // 构建响应体 response = if (forWebSocket && code == 101) { // Connection is upgrading, but we need to ensure interceptors see a non-null response body. response.newBuilder() .body(EMPTY_RESPONSE) .build() } else { response.newBuilder() .body(exchange.openResponseBody(response)) .build() } if ("close".equals(response.request().header("Connection"), ignoreCase = true) || "close".equals(response.header("Connection"), ignoreCase = true)) { exchange.noNewExchangesOnConnection() } if ((code == 204 || code == 205) && response.body()?.contentLength() ?: -1L > 0L) { throw ProtocolException( "HTTP $code had non-zero Content-Length: ${response.body()?.contentLength()}") } return response } }
class Exchange( internal val transmitter: Transmitter, internal val call: Call, internal val eventListener: EventListener, private val finder: ExchangeFinder, private val codec: ExchangeCodec ) { @Throws(IOException::class) fun writeRequestHeaders(request: Request) { try { eventListener.requestHeadersStart(call) codec.writeRequestHeaders(request) eventListener.requestHeadersEnd(call, request) } catch (e: IOException) { eventListener.requestFailed(call, e) trackFailure(e) throw e } } }
这里调用了 ExchangeCodec
的 writeRequestHeaders()
,对应的使用策略模式分别根据是 Http 还是 Http/2 请求
class Http1ExchangeCodec( /** The client that configures this stream. May be null for HTTPS proxy tunnels. */ private val client: OkHttpClient?, /** The connection that carries this stream. */ private val realConnection: RealConnection?, private val source: BufferedSource, private val sink: BufferedSink ) : ExchangeCodec { /** * Prepares the HTTP headers and sends them to the server. * * For streaming requests with a body, headers must be prepared **before** the output stream has * been written to. Otherwise the body would need to be buffered! * * For non-streaming requests with a body, headers must be prepared **after** the output stream * has been written to and closed. This ensures that the `Content-Length` header field receives * the proper value. */ override fun writeRequestHeaders(request: Request) { val requestLine = RequestLine.get( request, realConnection!!.route().proxy().type()) writeRequest(request.headers, requestLine) } /** Returns bytes of a request header for sending on an HTTP transport. */ fun writeRequest(headers: Headers, requestLine: String) { check(state == STATE_IDLE) { "state: $state" } sink.writeUtf8(requestLine).writeUtf8("/r/n") for (i in 0 until headers.size) { sink.writeUtf8(headers.name(i)) .writeUtf8(": ") .writeUtf8(headers.value(i)) .writeUtf8("/r/n") } sink.writeUtf8("/r/n") state = STATE_OPEN_REQUEST_BODY } }
遍历 Header,然后通过前一个拦截器建立的链接得到的 Sink
来进行写操作,同时将 state 变量赋值为了 STATE_OPEN_REQUEST_BODY
,运用了状态模式的思想
class Exchange( internal val transmitter: Transmitter, internal val call: Call, internal val eventListener: EventListener, private val finder: ExchangeFinder, private val codec: ExchangeCodec ) { @Throws(IOException::class) fun createRequestBody(request: Request, duplex: Boolean): Sink { this.isDuplex = duplex val contentLength = request.body!!.contentLength() eventListener.requestBodyStart(call) val rawRequestBody = codec.createRequestBody(request, contentLength) return RequestBodySink(rawRequestBody, contentLength) } }
通过 createRequestBody 创建一个 Sink 对象,本质还是使用在 ConnectIntercept
创建的 ExchangeCodec
内部封装 Sink 对象进行写操作的
class Http1ExchangeCodec( /** The client that configures this stream. May be null for HTTPS proxy tunnels. */ private val client: OkHttpClient?, /** The connection that carries this stream. */ private val realConnection: RealConnection?, private val source: BufferedSource, private val sink: BufferedSink ) : ExchangeCodec { override fun createRequestBody(request: Request, contentLength: Long): Sink { return when { request.body != null && request.body!!.isDuplex() -> throw ProtocolException( "Duplex connections are not supported for HTTP/1") request.isChunked() -> newChunkedSink() // Stream a request body of unknown length. contentLength != -1L -> newKnownLengthSink() // Stream a request body of a known length. else -> // Stream a request body of a known length. throw IllegalStateException( "Cannot stream a request body without chunked encoding or a known content length!") } } private fun newChunkedSink(): Sink { check(state == STATE_OPEN_REQUEST_BODY) { "state: $state" } state = STATE_WRITING_REQUEST_BODY return ChunkedSink() } private fun newKnownLengthSink(): Sink { check(state == STATE_OPEN_REQUEST_BODY) { "state: $state" } state = STATE_WRITING_REQUEST_BODY return KnownLengthSink() } /** An HTTP request body. */ private inner class KnownLengthSink : Sink { private val timeout = ForwardingTimeout(sink.timeout()) private var closed: Boolean = false override fun timeout(): Timeout = timeout override fun write(source: Buffer, byteCount: Long) { check(!closed) { "closed" } checkOffsetAndCount(source.size, 0, byteCount) sink.write(source, byteCount) } override fun flush() { if (closed) return // Don't throw; this stream might have been closed on the caller's behalf. sink.flush() } override fun close() { if (closed) return closed = true detachTimeout(timeout) state = STATE_READ_RESPONSE_HEADERS } } /** * An HTTP body with alternating chunk sizes and chunk bodies. It is the caller's responsibility * to buffer chunks; typically by using a buffered sink with this sink. */ private inner class ChunkedSink : Sink { private val timeout = ForwardingTimeout(sink.timeout()) private var closed: Boolean = false override fun timeout(): Timeout = timeout override fun write(source: Buffer, byteCount: Long) { check(!closed) { "closed" } if (byteCount == 0L) return sink.writeHexadecimalUnsignedLong(byteCount) sink.writeUtf8("/r/n") sink.write(source, byteCount) sink.writeUtf8("/r/n") } @Synchronized override fun flush() { if (closed) return // Don't throw; this stream might have been closed on the caller's behalf. sink.flush() } @Synchronized override fun close() { if (closed) return closed = true sink.writeUtf8("0/r/n/r/n") detachTimeout(timeout) state = STATE_READ_RESPONSE_HEADERS } } }
根据请求头 Transfer-Encoding 是否为 chunked 的方式,来创建不同 Sink 实现类,如果是 chunked 方式那么就创建 ChunkedSink
;如果不是 chunked 就表示内容的大小是固定的,那么就根据 content-length 创建指定大小的 KnownLengthSink
,然后又在外边包装了一层 RequestBodySink
class Exchange( internal val transmitter: Transmitter, internal val call: Call, internal val eventListener: EventListener, private val finder: ExchangeFinder, private val codec: ExchangeCodec ) { /** A request body that fires events when it completes. */ private inner class RequestBodySink internal constructor( delegate: Sink, /** The exact number of bytes to be written, or -1L if that is unknown. */ private val contentLength: Long ) : ForwardingSink(delegate) { private var completed = false private var bytesReceived = 0L private var closed = false @Throws(IOException::class) override fun write(source: Buffer, byteCount: Long) { check(!closed) { "closed" } if (contentLength != -1L && bytesReceived + byteCount > contentLength) { throw ProtocolException( "expected $contentLength bytes but received ${bytesReceived + byteCount}") } try { super.write(source, byteCount) this.bytesReceived += byteCount } catch (e: IOException) { throw complete(e) } } @Throws(IOException::class) override fun flush() { try { super.flush() } catch (e: IOException) { throw complete(e) } } @Throws(IOException::class) override fun close() { if (closed) return closed = true if (contentLength != -1L && bytesReceived != contentLength) { throw ProtocolException("unexpected end of stream") } try { super.close() complete(null) } catch (e: IOException) { throw complete(e) } } private fun <E : IOException?> complete(e: E): E { if (completed) return e completed = true return bodyComplete(bytesReceived, false, true, e) } } }
最终调用的 write(source: Buffer, byteCount: Long)
是调用的传入的 sink 的 write(source: Buffer, byteCount: Long)
class FormBody internal constructor( encodedNames: List<String>, encodedValues: List<String> ) : RequestBody() { @Throws(IOException::class) override fun writeTo(sink: BufferedSink) { writeOrCountBytes(sink, false) } /** * Either writes this request to `sink` or measures its content length. We have one method * do double-duty to make sure the counting and content are consistent, particularly when it comes * to awkward operations like measuring the encoded length of header strings, or the * length-in-digits of an encoded integer. */ private fun writeOrCountBytes(sink: BufferedSink?, countBytes: Boolean): Long { var byteCount = 0L val buffer: Buffer = if (countBytes) Buffer() else sink!!.buffer for (i in 0 until encodedNames.size) { if (i > 0) buffer.writeByte('&'.toInt()) buffer.writeUtf8(encodedNames[i]) buffer.writeByte('='.toInt()) buffer.writeUtf8(encodedValues[i]) } if (countBytes) { byteCount = buffer.size buffer.clear() } return byteCount } }
具体是如何写入数据的,要根据传入到 Request 中的 RequestBody 的实现类来定,如果是表单类型的则是有 FormBody 负责具体的写操作,如果是文件类型的则是由 MutilPartBody 负责具体的写操作
class Exchange( internal val transmitter: Transmitter, internal val call: Call, internal val eventListener: EventListener, private val finder: ExchangeFinder, private val codec: ExchangeCodec ) { @Throws(IOException::class) fun finishRequest() { try { codec.finishRequest() } catch (e: IOException) { eventListener.requestFailed(call, e) trackFailure(e) throw e } } }
class Http1ExchangeCodec( /** The client that configures this stream. May be null for HTTPS proxy tunnels. */ private val client: OkHttpClient?, /** The connection that carries this stream. */ private val realConnection: RealConnection?, private val source: BufferedSource, private val sink: BufferedSink ) : ExchangeCodec { override fun finishRequest() { sink.flush() } }
sink.flush()
将缓存区中数据写入到底层的 sink 中,其实就是写入到 server 中去了,相当于一个刷新缓冲区的功能
class Exchange( internal val transmitter: Transmitter, internal val call: Call, internal val eventListener: EventListener, private val finder: ExchangeFinder, private val codec: ExchangeCodec ) { @Throws(IOException::class) fun readResponseHeaders(expectContinue: Boolean): Response.Builder? { try { val result = codec.readResponseHeaders(expectContinue) result?.initExchange(this) return result } catch (e: IOException) { eventListener.responseFailed(call, e) trackFailure(e) throw e } } }
class Http1ExchangeCodec( /** The client that configures this stream. May be null for HTTPS proxy tunnels. */ private val client: OkHttpClient?, /** The connection that carries this stream. */ private val realConnection: RealConnection?, private val source: BufferedSource, private val sink: BufferedSink ) : ExchangeCodec { override fun readResponseHeaders(expectContinue: Boolean): Response.Builder? { check(state == STATE_OPEN_REQUEST_BODY || state == STATE_READ_RESPONSE_HEADERS) { "state: $state" } try { val statusLine = StatusLine.parse(readHeaderLine()) val responseBuilder = Response.Builder() .protocol(statusLine.protocol) // 协议,也就是 http 的版本例如 http1/2 /spdy .code(statusLine.code) // 响应码 .message(statusLine.message) // 响应消息 .headers(readHeaders()) // 响应头 return when { expectContinue && statusLine.code == HTTP_CONTINUE -> { null } statusLine.code == HTTP_CONTINUE -> { state = STATE_READ_RESPONSE_HEADERS responseBuilder } else -> { state = STATE_OPEN_RESPONSE_BODY responseBuilder } } } catch (e: EOFException) { // Provide more context if the server ends the stream before sending a response. val address = realConnection?.route()?.address()?.url?.redact() ?: "unknown" throw IOException("unexpected end of stream on $address", e) } } }
当客户端将请求数据发送给服务端之后,服务端做了处理之后会将结果返回给客户端,这是客户端需要根据这些返回的数据构造出一个 Response 对象出来然后返回给调用者
class Exchange( internal val transmitter: Transmitter, internal val call: Call, internal val eventListener: EventListener, private val finder: ExchangeFinder, private val codec: ExchangeCodec ) { @Throws(IOException::class) fun openResponseBody(response: Response): ResponseBody { try { eventListener.responseBodyStart(call) val contentType = response.header("Content-Type") val contentLength = codec.reportedContentLength(response) val rawSource = codec.openResponseBodySource(response) val source = ResponseBodySource(rawSource, contentLength) return RealResponseBody(contentType, contentLength, source.buffer()) } catch (e: IOException) { eventListener.responseFailed(call, e) trackFailure(e) throw e } } /** A response body that fires events when it completes. */ internal inner class ResponseBodySource( delegate: Source, private val contentLength: Long ) : ForwardingSource(delegate) { private var bytesReceived = 0L private var completed = false private var closed = false init { if (contentLength == 0L) { complete(null) } } @Throws(IOException::class) override fun read(sink: Buffer, byteCount: Long): Long { check(!closed) { "closed" } try { val read = delegate.read(sink, byteCount) if (read == -1L) { complete(null) return -1L } val newBytesReceived = bytesReceived + read if (contentLength != -1L && newBytesReceived > contentLength) { throw ProtocolException("expected $contentLength bytes but received $newBytesReceived") } bytesReceived = newBytesReceived if (newBytesReceived == contentLength) { complete(null) } return read } catch (e: IOException) { throw complete(e) } } @Throws(IOException::class) override fun close() { if (closed) return closed = true try { super.close() complete(null) } catch (e: IOException) { throw complete(e) } } fun <E : IOException?> complete(e: E): E { if (completed) return e completed = true return bodyComplete(bytesReceived, true, false, e) } } }
返回一个 ResponseBody 对象,该对象封装了连接服务端的输入流对 Source 对象
class Http1ExchangeCodec( /** The client that configures this stream. May be null for HTTPS proxy tunnels. */ private val client: OkHttpClient?, /** The connection that carries this stream. */ private val realConnection: RealConnection?, private val source: BufferedSource, private val sink: BufferedSink ) : ExchangeCodec { override fun openResponseBodySource(response: Response): Source { return when { !response.promisesBody() -> newFixedLengthSource(0) response.isChunked() -> newChunkedSource(response.request().url) else -> { val contentLength = response.headersContentLength() if (contentLength != -1L) { newFixedLengthSource(contentLength) } else { newUnknownLengthSource() } } } } private fun newFixedLengthSource(length: Long): Source { check(state == STATE_OPEN_RESPONSE_BODY) { "state: $state" } state = STATE_READING_RESPONSE_BODY return FixedLengthSource(length) } private fun newChunkedSource(url: HttpUrl): Source { check(state == STATE_OPEN_RESPONSE_BODY) { "state: $state" } state = STATE_READING_RESPONSE_BODY return ChunkedSource(url) } private fun newUnknownLengthSource(): Source { check(state == STATE_OPEN_RESPONSE_BODY) { "state: $state" } state = STATE_READING_RESPONSE_BODY realConnection!!.noNewExchanges() return UnknownLengthSource() } /** An HTTP body with a fixed length specified in advance. */ private inner class FixedLengthSource internal constructor(private var bytesRemaining: Long) : AbstractSource() { init { if (bytesRemaining == 0L) { responseBodyComplete() } } override fun read(sink: Buffer, byteCount: Long): Long { require(byteCount >= 0L) { "byteCount < 0: $byteCount" } check(!closed) { "closed" } if (bytesRemaining == 0L) return -1 val read = super.read(sink, minOf(bytesRemaining, byteCount)) if (read == -1L) { realConnection!!.noNewExchanges() // The server didn't supply the promised content length. val e = ProtocolException("unexpected end of stream") responseBodyComplete() throw e } bytesRemaining -= read if (bytesRemaining == 0L) { responseBodyComplete() } return read } override fun close() { if (closed) return if (bytesRemaining != 0L && !discard(ExchangeCodec.DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) { realConnection!!.noNewExchanges() // Unread bytes remain on the stream. responseBodyComplete() } closed = true } } /** An HTTP body with alternating chunk sizes and chunk bodies. */ private inner class ChunkedSource internal constructor(private val url: HttpUrl) : AbstractSource() { private var bytesRemainingInChunk = NO_CHUNK_YET private var hasMoreChunks = true override fun read(sink: Buffer, byteCount: Long): Long { require(byteCount >= 0L) { "byteCount < 0: $byteCount" } check(!closed) { "closed" } if (!hasMoreChunks) return -1 if (bytesRemainingInChunk == 0L || bytesRemainingInChunk == NO_CHUNK_YET) { readChunkSize() if (!hasMoreChunks) return -1 } val read = super.read(sink, minOf(byteCount, bytesRemainingInChunk)) if (read == -1L) { realConnection!!.noNewExchanges() // The server didn't supply the promised chunk length. val e = ProtocolException("unexpected end of stream") responseBodyComplete() throw e } bytesRemainingInChunk -= read return read } private fun readChunkSize() { // Read the suffix of the previous chunk. if (bytesRemainingInChunk != NO_CHUNK_YET) { source.readUtf8LineStrict() } try { bytesRemainingInChunk = source.readHexadecimalUnsignedLong() val extensions = source.readUtf8LineStrict().trim() if (bytesRemainingInChunk < 0L || extensions.isNotEmpty() && !extensions.startsWith(";")) { throw ProtocolException("expected chunk size and optional extensions" + " but was /"$bytesRemainingInChunk$extensions/"") } } catch (e: NumberFormatException) { throw ProtocolException(e.message) } if (bytesRemainingInChunk == 0L) { hasMoreChunks = false trailers = readHeaders() client!!.cookieJar().receiveHeaders(url, trailers!!) responseBodyComplete() } } override fun close() { if (closed) return if (hasMoreChunks && !discard(ExchangeCodec.DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) { realConnection!!.noNewExchanges() // Unread bytes remain on the stream. responseBodyComplete() } closed = true } } /** An HTTP message body terminated by the end of the underlying stream. */ private inner class UnknownLengthSource : AbstractSource() { private var inputExhausted: Boolean = false override fun read(sink: Buffer, byteCount: Long): Long { require(byteCount >= 0L) { "byteCount < 0: $byteCount" } check(!closed) { "closed" } if (inputExhausted) return -1 val read = super.read(sink, byteCount) if (read == -1L) { inputExhausted = true responseBodyComplete() return -1 } return read } override fun close() { if (closed) return if (!inputExhausted) { responseBodyComplete() } closed = true } } }
根据响应的不同请求创建不同的 Source 对象