转载

OkHttp - CallServerInterceptor源码简析

Github: okhttp 分析版本: 930d4d0

This is the last interceptor in the chain. It makes a network call to the server

ConnectInterceptor 拦截器的功能是负责与服务器建立 Socket 连接,并且创建一个 Exchange,它包括通向服务器的输入流和输出流。而接下来的 CallServerInterceptor 拦截器的功能则是使用 Exchange 与服务器进行数据的读写操作

intercept(chain: Interceptor.Chain)

/**
 * This is the last interceptor in the chain. It makes a network call to the server.
 *
 * The exchange obtained from the chain is used to write the request headers and, when the method
 * permits one and it is present, the request body; afterwards the response headers are read and a
 * response body is attached.
 *
 * @param forWebSocket when true, a 101 response receives [EMPTY_RESPONSE] as its body instead of
 *     a body opened from the exchange.
 */
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {

  /**
   * Sends the chain's request over its exchange and returns the server's response.
   *
   * @param chain must be a [RealInterceptorChain]; it is cast unconditionally.
   * @return the response, with request, handshake and send/receive timestamps filled in.
   * @throws ProtocolException if a 204 or 205 response reports a body content length above zero.
   * @throws IOException declared via [Throws]; raised by the exchange's read and write calls.
   */
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.exchange()
    val request = realChain.request()
    val requestBody = request.body
    val sentRequestMillis = System.currentTimeMillis()

    // Write the request headers.
    exchange.writeRequestHeaders(request)

    var responseHeadersStarted = false
    var responseBuilder: Response.Builder? = null
    // Write the request body (only when the method permits one and a body is present).
    if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
        exchange.flushRequest()
        responseHeadersStarted = true
        exchange.responseHeadersStart()
        responseBuilder = exchange.readResponseHeaders(true)
      }
      // A null builder here means either no "Expect: 100-continue" was sent, or
      // readResponseHeaders(true) returned null; in both cases the body is transmitted.
      if (responseBuilder == null) {
        if (requestBody.isDuplex()) {
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest()
          val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
          requestBody.writeTo(bufferedRequestBody)
        } else {
          // Write the request body if the "Expect: 100-continue" expectation was met.
          val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
          requestBody.writeTo(bufferedRequestBody)
          bufferedRequestBody.close()
        }
      } else {
        exchange.noRequestBody()
        if (!exchange.connection()!!.isMultiplexed) {
          // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
          // from being reused. Otherwise we're still obligated to transmit the request body to
          // leave the connection in a consistent state.
          exchange.noNewExchangesOnConnection()
        }
      }
    } else {
      exchange.noRequestBody()
    }

    // Finish the request; skipped for duplex bodies.
    if (requestBody == null || !requestBody.isDuplex()) {
      exchange.finishRequest()
    }
    if (!responseHeadersStarted) {
      exchange.responseHeadersStart()
    }
    // Read the response headers unless they were already read above.
    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(false)!!
    }

    // Build the preliminary response (no body yet).
    var response = responseBuilder
        .request(request)
        .handshake(exchange.connection()!!.handshake())
        .sentRequestAtMillis(sentRequestMillis) // time captured before the headers were written
        .receivedResponseAtMillis(System.currentTimeMillis()) // time the headers were received
        .build()
    var code = response.code()
    if (code == 100) {
      // server sent a 100-continue even though we did not request one.
      // try again to read the actual response
      response = exchange.readResponseHeaders(false)!!
          .request(request)
          .handshake(exchange.connection()!!.handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
      code = response.code()
    }

    exchange.responseHeadersEnd(response)

    // Attach the response body.
    response = if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response.newBuilder()
          .body(EMPTY_RESPONSE)
          .build()
    } else {
      response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build()
    }
    // Either side sending "Connection: close" stops the connection from carrying new exchanges.
    if ("close".equals(response.request().header("Connection"), ignoreCase = true) ||
        "close".equals(response.header("Connection"), ignoreCase = true)) {
      exchange.noNewExchangesOnConnection()
    }
    // Elvis binds tighter than '>', so this compares (contentLength ?: -1L) with 0L.
    if ((code == 204 || code == 205) && response.body()?.contentLength() ?: -1L > 0L) {
      throw ProtocolException(
          "HTTP $code had non-zero Content-Length: ${response.body()?.contentLength()}")
    }
    return response
  }
}
  • 写入请求 Header
  • 如果请求头包含 Expect: 100-continue,则先只发送请求头
  • 根据返回的结果判断是否继续请求流程
  • 写入请求体,完成请求
  • 得到响应头,构建初步响应
  • 构建响应体,完成最终响应
  • 返回响应

Request Header

Exchange#writeRequestHeaders(request: Request)

class Exchange(
  internal val transmitter: Transmitter,
  internal val call: Call,
  internal val eventListener: EventListener,
  private val finder: ExchangeFinder,
  private val codec: ExchangeCodec
) {
  /**
   * Writes the headers of [request] through the codec, notifying the event listener immediately
   * before and after the write.
   *
   * @throws IOException rethrown from the write after the listener's `requestFailed` callback
   *     and `trackFailure` have been invoked with it.
   */
  @Throws(IOException::class)
  fun writeRequestHeaders(request: Request) {
    try {
      eventListener.requestHeadersStart(call)
      codec.writeRequestHeaders(request)
      eventListener.requestHeadersEnd(call, request)
    } catch (e: IOException) {
      // Report the failure first, then let it propagate unchanged to the caller.
      eventListener.requestFailed(call, e)
      trackFailure(e)
      throw e
    }
  }
}

这里调用了 ExchangeCodec#writeRequestHeaders(),这里使用了策略模式:根据是 HTTP/1 还是 HTTP/2 请求,分别由对应的 ExchangeCodec 实现类来处理

Http1ExchangeCodec#writeRequestHeaders(request: Request)

class Http1ExchangeCodec(
  /** The client that configures this stream. May be null for HTTPS proxy tunnels. */
  private val client: OkHttpClient?,
  /** The connection that carries this stream. */
  private val realConnection: RealConnection?,
  private val source: BufferedSource,
  private val sink: BufferedSink
) : ExchangeCodec {
  /**
   * Prepares the HTTP headers and sends them to the server.
   *
   * For streaming requests with a body, headers must be prepared **before** the output stream has
   * been written to. Otherwise the body would need to be buffered!
   *
   * For non-streaming requests with a body, headers must be prepared **after** the output stream
   * has been written to and closed. This ensures that the `Content-Length` header field receives
   * the proper value.
   *
   * The request line is built from [request] and the proxy type of the connection's route.
   */
  override fun writeRequestHeaders(request: Request) {
    val requestLine = RequestLine.get(
        request, realConnection!!.route().proxy().type())
    writeRequest(request.headers, requestLine)
  }

  /**
   * Writes [requestLine] followed by every entry of [headers] to the sink, then the empty line
   * that ends the header section. Each line is terminated with CRLF.
   *
   * Requires the codec to be in `STATE_IDLE`; on success the state becomes
   * `STATE_OPEN_REQUEST_BODY`.
   *
   * @throws IllegalStateException if the codec is not idle.
   */
  fun writeRequest(headers: Headers, requestLine: String) {
    check(state == STATE_IDLE) { "state: $state" }
    // HTTP/1.1 lines end with CRLF ("\r\n"), not the literal characters "/r/n".
    sink.writeUtf8(requestLine).writeUtf8("\r\n")
    for (i in 0 until headers.size) {
      sink.writeUtf8(headers.name(i))
          .writeUtf8(": ")
          .writeUtf8(headers.value(i))
          .writeUtf8("\r\n")
    }
    // An empty line separates the header section from the body.
    sink.writeUtf8("\r\n")
    state = STATE_OPEN_REQUEST_BODY
  }
}

遍历 Header,然后通过前一个拦截器建立的连接得到的 Sink 来进行写操作,同时将 state 变量赋值为 STATE_OPEN_REQUEST_BODY,运用了状态模式的思想

Request Body

Exchange#createRequestBody(request: Request, duplex: Boolean)

class Exchange(
  internal val transmitter: Transmitter,
  internal val call: Call,
  internal val eventListener: EventListener,
  private val finder: ExchangeFinder,
  private val codec: ExchangeCodec
) {
  /**
   * Creates the sink that the request body of [request] is written to.
   *
   * Records [duplex] on this exchange, notifies the event listener that the body is starting, and
   * wraps the codec's sink in a [RequestBodySink] together with the body's content length.
   *
   * @param request the request whose body will be written; its body must not be null.
   * @param duplex stored in `isDuplex` before anything else happens.
   */
  @Throws(IOException::class)
  fun createRequestBody(request: Request, duplex: Boolean): Sink {
    isDuplex = duplex
    val declaredLength = request.body!!.contentLength()
    eventListener.requestBodyStart(call)
    return RequestBodySink(codec.createRequestBody(request, declaredLength), declaredLength)
  }
}

通过 createRequestBody 创建一个 Sink 对象,本质上还是使用在 ConnectInterceptor 中创建的 ExchangeCodec 内部封装的 Sink 对象进行写操作

Http1ExchangeCodec#createRequestBody(request: Request, contentLength: Long)

class Http1ExchangeCodec(
  /** The client that configures this stream. May be null for HTTPS proxy tunnels. */
  private val client: OkHttpClient?,
  /** The connection that carries this stream. */
  private val realConnection: RealConnection?,
  private val source: BufferedSource,
  private val sink: BufferedSink
) : ExchangeCodec {
  /**
   * Returns the sink that the body of [request] should be written to.
   *
   * @param contentLength the body's length in bytes, or -1 if unknown.
   * @throws ProtocolException if the body is duplex.
   * @throws IllegalStateException if the request is not chunked and [contentLength] is -1.
   */
  override fun createRequestBody(request: Request, contentLength: Long): Sink {
    return when {
      request.body != null && request.body!!.isDuplex() -> throw ProtocolException(
          "Duplex connections are not supported for HTTP/1")
      request.isChunked() -> newChunkedSink() // Stream a request body of unknown length.
      contentLength != -1L -> newKnownLengthSink() // Stream a request body of a known length.
      else -> // Neither chunked nor a known length: the body cannot be framed.
        throw IllegalStateException(
            "Cannot stream a request body without chunked encoding or a known content length!")
    }
  }

  /** Moves from `STATE_OPEN_REQUEST_BODY` to `STATE_WRITING_REQUEST_BODY` and returns a chunked sink. */
  private fun newChunkedSink(): Sink {
    check(state == STATE_OPEN_REQUEST_BODY) { "state: $state" }
    state = STATE_WRITING_REQUEST_BODY
    return ChunkedSink()
  }

  /** Moves from `STATE_OPEN_REQUEST_BODY` to `STATE_WRITING_REQUEST_BODY` and returns a known-length sink. */
  private fun newKnownLengthSink(): Sink {
    check(state == STATE_OPEN_REQUEST_BODY) { "state: $state" }
    state = STATE_WRITING_REQUEST_BODY
    return KnownLengthSink()
  }

  /** An HTTP request body. Bytes are forwarded to the connection's sink unchanged. */
  private inner class KnownLengthSink : Sink {
    private val timeout = ForwardingTimeout(sink.timeout())
    private var closed: Boolean = false

    override fun timeout(): Timeout = timeout

    override fun write(source: Buffer, byteCount: Long) {
      check(!closed) { "closed" }
      checkOffsetAndCount(source.size, 0, byteCount)
      sink.write(source, byteCount)
    }

    override fun flush() {
      if (closed) return // Don't throw; this stream might have been closed on the caller's behalf.
      sink.flush()
    }

    /** Detaches the timeout and advances the codec state; the connection's sink is not closed. */
    override fun close() {
      if (closed) return
      closed = true
      detachTimeout(timeout)
      state = STATE_READ_RESPONSE_HEADERS
    }
  }

  /**
   * An HTTP body with alternating chunk sizes and chunk bodies. It is the caller's responsibility
   * to buffer chunks; typically by using a buffered sink with this sink.
   */
  private inner class ChunkedSink : Sink {
    private val timeout = ForwardingTimeout(sink.timeout())
    private var closed: Boolean = false

    override fun timeout(): Timeout = timeout

    /** Writes one chunk: hexadecimal size, CRLF, the data, CRLF. Empty writes emit nothing. */
    override fun write(source: Buffer, byteCount: Long) {
      check(!closed) { "closed" }
      // A zero-length chunk is the terminator, so it is only ever written by close().
      if (byteCount == 0L) return

      sink.writeHexadecimalUnsignedLong(byteCount)
      sink.writeUtf8("\r\n")
      sink.write(source, byteCount)
      sink.writeUtf8("\r\n")
    }

    @Synchronized
    override fun flush() {
      if (closed) return // Don't throw; this stream might have been closed on the caller's behalf.
      sink.flush()
    }

    /** Writes the terminating zero-length chunk, detaches the timeout and advances the state. */
    @Synchronized
    override fun close() {
      if (closed) return
      closed = true
      // Last chunk ("0" CRLF) followed by the empty line that ends the chunked body.
      sink.writeUtf8("0\r\n\r\n")
      detachTimeout(timeout)
      state = STATE_READ_RESPONSE_HEADERS
    }
  }
}

根据请求头 Transfer-Encoding 是否为 chunked 的方式,来创建不同 Sink 实现类,如果是 chunked 方式那么就创建 ChunkedSink ;如果不是 chunked 就表示内容的大小是固定的,那么就根据 content-length 创建指定大小的 KnownLengthSink ,然后又在外边包装了一层 RequestBodySink

Exchange.RequestBodySink

class Exchange(
  internal val transmitter: Transmitter,
  internal val call: Call,
  internal val eventListener: EventListener,
  private val finder: ExchangeFinder,
  private val codec: ExchangeCodec
) {
  /**
   * A request body that fires events when it completes.
   *
   * Wraps the codec's sink, counts the bytes written through it, and rejects writes or a close
   * that would disagree with a known [contentLength].
   */
  private inner class RequestBodySink internal constructor(
    delegate: Sink,
    /** The exact number of bytes to be written, or -1L if that is unknown. */
    private val contentLength: Long
  ) : ForwardingSink(delegate) {
    // Set once complete() has run, so bodyComplete is invoked at most once.
    private var completed = false
    // Number of bytes successfully written to the delegate so far.
    private var bytesReceived = 0L
    private var closed = false

    /**
     * Forwards the write to the delegate and adds [byteCount] to the running total.
     *
     * @throws ProtocolException if the write would exceed a known [contentLength]; nothing is
     *     forwarded in that case.
     * @throws IOException the delegate's failure, after being passed through [complete].
     */
    @Throws(IOException::class)
    override fun write(source: Buffer, byteCount: Long) {
      check(!closed) { "closed" }
      if (contentLength != -1L && bytesReceived + byteCount > contentLength) {
        throw ProtocolException(
            "expected $contentLength bytes but received ${bytesReceived + byteCount}")
      }
      try {
        super.write(source, byteCount)
        // Only counted after the delegate accepted the bytes.
        this.bytesReceived += byteCount
      } catch (e: IOException) {
        throw complete(e)
      }
    }

    /** Flushes the delegate; a failure is passed through [complete] before being rethrown. */
    @Throws(IOException::class)
    override fun flush() {
      try {
        super.flush()
      } catch (e: IOException) {
        throw complete(e)
      }
    }

    /**
     * Closes the delegate and reports completion. Calling it again is a no-op.
     *
     * @throws ProtocolException if [contentLength] is known and a different number of bytes was
     *     written; the delegate is not closed and [complete] is not called in that case.
     */
    @Throws(IOException::class)
    override fun close() {
      if (closed) return
      closed = true
      if (contentLength != -1L && bytesReceived != contentLength) {
        throw ProtocolException("unexpected end of stream")
      }
      try {
        super.close()
        complete(null)
      } catch (e: IOException) {
        throw complete(e)
      }
    }

    /**
     * Reports the end of the body to the enclosing exchange exactly once and returns what
     * `bodyComplete` returns; later calls return [e] unchanged.
     */
    private fun <E : IOException?> complete(e: E): E {
      if (completed) return e
      completed = true
      // NOTE(review): the two boolean flags presumably mean responseDone=false, requestDone=true;
      // confirm against bodyComplete's signature.
      return bodyComplete(bytesReceived, false, true, e)
    }
  }
}

RequestBodySink 的 write(source: Buffer, byteCount: Long) 最终调用的是传入的 delegate(即上面创建的 Sink)的 write(source: Buffer, byteCount: Long)

FormBody#writeTo(sink: BufferedSink)

class FormBody internal constructor(
  encodedNames: List<String>,
  encodedValues: List<String>
) : RequestBody() {
  /** Writes the encoded form (`name=value` pairs joined by `&`) to [sink]. */
  @Throws(IOException::class)
  override fun writeTo(sink: BufferedSink) {
    writeOrCountBytes(sink, countBytes = false)
  }

  /**
   * Shared implementation for writing the form and for measuring it, so both always agree on the
   * encoding.
   *
   * @param sink destination when writing; may be null only when [countBytes] is true.
   * @param countBytes when true the form is encoded into a scratch buffer that is measured and
   *     then cleared; when false it is written into the buffer of [sink].
   * @return the encoded size in bytes when counting, otherwise 0.
   */
  private fun writeOrCountBytes(sink: BufferedSink?, countBytes: Boolean): Long {
    val target: Buffer = when {
      countBytes -> Buffer()
      else -> sink!!.buffer
    }

    for (index in encodedNames.indices) {
      // Pairs are separated by '&'; no separator before the first one.
      if (index != 0) target.writeByte('&'.toInt())
      target.writeUtf8(encodedNames[index])
      target.writeByte('='.toInt())
      target.writeUtf8(encodedValues[index])
    }

    if (!countBytes) return 0L

    val measured = target.size
    target.clear()
    return measured
  }
}

具体是如何写入数据的,要根据传入到 Request 中的 RequestBody 的实现类来定:如果是表单类型的,则是由 FormBody 负责具体的写操作;如果是文件类型的,则是由 MultipartBody 负责具体的写操作

Request finish

Exchange#finishRequest()

class Exchange(
  internal val transmitter: Transmitter,
  internal val call: Call,
  internal val eventListener: EventListener,
  private val finder: ExchangeFinder,
  private val codec: ExchangeCodec
) {
  /**
   * Asks the codec to finish the request.
   *
   * @throws IOException rethrown from the codec after the listener's `requestFailed` callback
   *     and `trackFailure` have been invoked with it.
   */
  @Throws(IOException::class)
  fun finishRequest() {
    try {
      codec.finishRequest()
    } catch (e: IOException) {
      // Report the failure first, then let it propagate unchanged to the caller.
      eventListener.requestFailed(call, e)
      trackFailure(e)
      throw e
    }
  }
}

Http1ExchangeCodec#finishRequest()

class Http1ExchangeCodec(
  /** The client that configures this stream. May be null for HTTPS proxy tunnels. */
  private val client: OkHttpClient?,
  /** The connection that carries this stream. */
  private val realConnection: RealConnection?,
  private val source: BufferedSource,
  private val sink: BufferedSink
) : ExchangeCodec {
  /** Finishes the request by flushing everything buffered in the connection's sink. */
  override fun finishRequest() = sink.flush()
}

sink.flush() 将缓存区中数据写入到底层的 sink 中,其实就是写入到 server 中去了,相当于一个刷新缓冲区的功能

Response Header

Exchange#readResponseHeaders(expectContinue: Boolean)

class Exchange(
  internal val transmitter: Transmitter,
  internal val call: Call,
  internal val eventListener: EventListener,
  private val finder: ExchangeFinder,
  private val codec: ExchangeCodec
) {
  /**
   * Reads the response headers through the codec and binds the resulting builder to this
   * exchange.
   *
   * @param expectContinue forwarded unchanged to the codec.
   * @return the codec's builder, or null when the codec returns null.
   * @throws IOException rethrown after the listener's `responseFailed` callback and
   *     `trackFailure` have been invoked with it.
   */
  @Throws(IOException::class)
  fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
    return try {
      val builder = codec.readResponseHeaders(expectContinue)
      if (builder != null) builder.initExchange(this)
      builder
    } catch (failure: IOException) {
      eventListener.responseFailed(call, failure)
      trackFailure(failure)
      throw failure
    }
  }
}

Http1ExchangeCodec#readResponseHeaders(expectContinue: Boolean)

class Http1ExchangeCodec(
  /** The client that configures this stream. May be null for HTTPS proxy tunnels. */
  private val client: OkHttpClient?,
  /** The connection that carries this stream. */
  private val realConnection: RealConnection?,
  private val source: BufferedSource,
  private val sink: BufferedSink
) : ExchangeCodec {
  /**
   * Parses the status line and headers of the next response into a [Response.Builder].
   *
   * @param expectContinue when true and the status is `HTTP_CONTINUE`, null is returned and the
   *     codec state is left untouched.
   * @return a builder holding protocol, status code, message and headers, or null as described
   *     above.
   * @throws IllegalStateException if the codec is in neither `STATE_OPEN_REQUEST_BODY` nor
   *     `STATE_READ_RESPONSE_HEADERS`.
   * @throws IOException wrapping an [EOFException] when the stream ends before a full response
   *     head has been read.
   */
  override fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
    val readable = state == STATE_OPEN_REQUEST_BODY || state == STATE_READ_RESPONSE_HEADERS
    check(readable) { "state: $state" }

    try {
      val statusLine = StatusLine.parse(readHeaderLine())

      val builder = Response.Builder()
          .protocol(statusLine.protocol) // protocol from the status line
          .code(statusLine.code) // status code
          .message(statusLine.message) // reason phrase
          .headers(readHeaders()) // response headers

      val isContinue = statusLine.code == HTTP_CONTINUE
      // An expected "100 Continue" is swallowed: the caller goes on to send the request body.
      if (expectContinue && isContinue) return null

      // An unexpected "100 Continue" keeps the codec ready to read another response head;
      // anything else moves on to the body.
      state = if (isContinue) STATE_READ_RESPONSE_HEADERS else STATE_OPEN_RESPONSE_BODY
      return builder
    } catch (e: EOFException) {
      // Provide more context if the server ends the stream before sending a response.
      val address = realConnection?.route()?.address()?.url?.redact() ?: "unknown"
      throw IOException("unexpected end of stream on $address", e)
    }
  }
}

当客户端将请求数据发送给服务端之后,服务端处理完会将结果返回给客户端,这时客户端需要根据这些返回的数据构造出一个 Response 对象,然后返回给调用者

Response Body

Exchange#openResponseBody(response: Response)

class Exchange(
  internal val transmitter: Transmitter,
  internal val call: Call,
  internal val eventListener: EventListener,
  private val finder: ExchangeFinder,
  private val codec: ExchangeCodec
) {
  /**
   * Opens the body of [response].
   *
   * Notifies the event listener that the body is starting, asks the codec for the reported
   * content length and the raw body source, and wraps that source in a [ResponseBodySource].
   *
   * @return a body carrying the response's `Content-Type` header value, the reported content
   *     length and the buffered, wrapped source.
   * @throws IOException rethrown after the listener's `responseFailed` callback and
   *     `trackFailure` have been invoked with it.
   */
  @Throws(IOException::class)
  fun openResponseBody(response: Response): ResponseBody {
    try {
      eventListener.responseBodyStart(call)
      val contentType = response.header("Content-Type")
      val contentLength = codec.reportedContentLength(response)
      val rawSource = codec.openResponseBodySource(response)
      val source = ResponseBodySource(rawSource, contentLength)
      return RealResponseBody(contentType, contentLength, source.buffer())
    } catch (e: IOException) {
      eventListener.responseFailed(call, e)
      trackFailure(e)
      throw e
    }
  }
  
  /**
   * A response body that fires events when it completes.
   *
   * Counts the bytes read from the delegate and reports completion as soon as [contentLength]
   * bytes have arrived, the delegate is exhausted, the source is closed, or an I/O error occurs.
   * A [contentLength] of -1 disables the length checks.
   */
  internal inner class ResponseBodySource(
    delegate: Source,
    private val contentLength: Long
  ) : ForwardingSource(delegate) {
    // Number of bytes read from the delegate so far.
    private var bytesReceived = 0L
    // Set once complete() has run, so bodyComplete is invoked at most once.
    private var completed = false
    private var closed = false

    init {
      // An empty body is complete immediately; no read is needed to find that out.
      if (contentLength == 0L) {
        complete(null)
      }
    }

    /**
     * Reads from the delegate and keeps the running byte count up to date.
     *
     * @return the number of bytes read, or -1 when the delegate is exhausted.
     * @throws ProtocolException if more bytes arrive than a known [contentLength] allows.
     * @throws IOException any failure, after being passed through [complete].
     */
    @Throws(IOException::class)
    override fun read(sink: Buffer, byteCount: Long): Long {
      check(!closed) { "closed" }
      try {
        val read = delegate.read(sink, byteCount)
        if (read == -1L) {
          complete(null)
          return -1L
        }

        val newBytesReceived = bytesReceived + read
        if (contentLength != -1L && newBytesReceived > contentLength) {
          throw ProtocolException("expected $contentLength bytes but received $newBytesReceived")
        }

        bytesReceived = newBytesReceived
        // The last promised byte has arrived: report completion without waiting for a -1 read.
        if (newBytesReceived == contentLength) {
          complete(null)
        }

        return read
      } catch (e: IOException) {
        // Also reached for the ProtocolException thrown above.
        throw complete(e)
      }
    }

    /** Closes the delegate and reports completion. Calling it again is a no-op. */
    @Throws(IOException::class)
    override fun close() {
      if (closed) return
      closed = true
      try {
        super.close()
        complete(null)
      } catch (e: IOException) {
        throw complete(e)
      }
    }

    /**
     * Reports the end of the body to the enclosing exchange exactly once and returns what
     * `bodyComplete` returns; later calls return [e] unchanged.
     */
    fun <E : IOException?> complete(e: E): E {
      if (completed) return e
      completed = true
      // NOTE(review): the two boolean flags presumably mean responseDone=true, requestDone=false;
      // confirm against bodyComplete's signature.
      return bodyComplete(bytesReceived, true, false, e)
    }
  }
}

返回一个 ResponseBody 对象,该对象封装了连接服务端输入流的 Source 对象

Http1ExchangeCodec#openResponseBodySource(response: Response)

class Http1ExchangeCodec(
  /** The client that configures this stream. May be null for HTTPS proxy tunnels. */
  private val client: OkHttpClient?,
  /** The connection that carries this stream. */
  private val realConnection: RealConnection?,
  private val source: BufferedSource,
  private val sink: BufferedSink
) : ExchangeCodec {
  /**
   * Returns the source that the body of [response] should be read from.
   *
   * A response that promises no body gets a zero-length source, a chunked response a chunked
   * source, a response with a content length a fixed-length source, and anything else a source
   * that reads until the stream ends.
   */
  override fun openResponseBodySource(response: Response): Source {
    return when {
      !response.promisesBody() -> newFixedLengthSource(0)
      response.isChunked() -> newChunkedSource(response.request().url)
      else -> {
        val contentLength = response.headersContentLength()
        if (contentLength != -1L) {
          newFixedLengthSource(contentLength)
        } else {
          newUnknownLengthSource()
        }
      }
    }
  }

  /** Moves from `STATE_OPEN_RESPONSE_BODY` to `STATE_READING_RESPONSE_BODY` and returns a source of [length] bytes. */
  private fun newFixedLengthSource(length: Long): Source {
    check(state == STATE_OPEN_RESPONSE_BODY) { "state: $state" }
    state = STATE_READING_RESPONSE_BODY
    return FixedLengthSource(length)
  }

  /** Moves from `STATE_OPEN_RESPONSE_BODY` to `STATE_READING_RESPONSE_BODY` and returns a chunked source. */
  private fun newChunkedSource(url: HttpUrl): Source {
    check(state == STATE_OPEN_RESPONSE_BODY) { "state: $state" }
    state = STATE_READING_RESPONSE_BODY
    return ChunkedSource(url)
  }

  /**
   * Moves from `STATE_OPEN_RESPONSE_BODY` to `STATE_READING_RESPONSE_BODY` and returns a source
   * that reads until the stream ends. The connection is also barred from new exchanges.
   */
  private fun newUnknownLengthSource(): Source {
    check(state == STATE_OPEN_RESPONSE_BODY) { "state: $state" }
    state = STATE_READING_RESPONSE_BODY
    // Presumably because such a body only ends when the stream does, so the connection cannot
    // be reused afterwards.
    realConnection!!.noNewExchanges()
    return UnknownLengthSource()
  }

  /** An HTTP body with a fixed length specified in advance. */
  private inner class FixedLengthSource internal constructor(private var bytesRemaining: Long) :
      AbstractSource() {

    init {
      // A zero-length body is complete before anything is read.
      if (bytesRemaining == 0L) {
        responseBodyComplete()
      }
    }

    /**
     * Reads at most the bytes still owed by the body.
     *
     * @return the number of bytes read, or -1 once the whole body has been consumed.
     * @throws ProtocolException if the stream ends before the promised length was delivered.
     */
    override fun read(sink: Buffer, byteCount: Long): Long {
      require(byteCount >= 0L) { "byteCount < 0: $byteCount" }
      check(!closed) { "closed" }
      if (bytesRemaining == 0L) return -1

      val read = super.read(sink, minOf(bytesRemaining, byteCount))
      if (read == -1L) {
        realConnection!!.noNewExchanges() // The server didn't supply the promised content length.
        val e = ProtocolException("unexpected end of stream")
        responseBodyComplete()
        throw e
      }

      bytesRemaining -= read
      if (bytesRemaining == 0L) {
        responseBodyComplete()
      }
      return read
    }

    /** Discards any unread remainder; if that fails the connection is barred from new exchanges. */
    override fun close() {
      if (closed) return

      if (bytesRemaining != 0L &&
          !discard(ExchangeCodec.DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) {
        realConnection!!.noNewExchanges() // Unread bytes remain on the stream.
        responseBodyComplete()
      }

      closed = true
    }
  }

  /** An HTTP body with alternating chunk sizes and chunk bodies. */
  private inner class ChunkedSource internal constructor(private val url: HttpUrl) :
      AbstractSource() {
    private var bytesRemainingInChunk = NO_CHUNK_YET
    private var hasMoreChunks = true

    /**
     * Reads at most the bytes left in the current chunk, fetching the next chunk header first
     * when the current chunk is exhausted.
     *
     * @return the number of bytes read, or -1 once the terminating chunk has been seen.
     * @throws ProtocolException if the stream ends inside a chunk or a chunk header is malformed.
     */
    override fun read(sink: Buffer, byteCount: Long): Long {
      require(byteCount >= 0L) { "byteCount < 0: $byteCount" }
      check(!closed) { "closed" }
      if (!hasMoreChunks) return -1

      if (bytesRemainingInChunk == 0L || bytesRemainingInChunk == NO_CHUNK_YET) {
        readChunkSize()
        if (!hasMoreChunks) return -1
      }

      val read = super.read(sink, minOf(byteCount, bytesRemainingInChunk))
      if (read == -1L) {
        realConnection!!.noNewExchanges() // The server didn't supply the promised chunk length.
        val e = ProtocolException("unexpected end of stream")
        responseBodyComplete()
        throw e
      }
      bytesRemainingInChunk -= read
      return read
    }

    /**
     * Reads the next chunk header into [bytesRemainingInChunk]. A size of zero marks the last
     * chunk: the trailers are read, handed to the client's cookie jar, and the body is completed.
     */
    private fun readChunkSize() {
      // Read the suffix of the previous chunk.
      if (bytesRemainingInChunk != NO_CHUNK_YET) {
        source.readUtf8LineStrict()
      }
      try {
        bytesRemainingInChunk = source.readHexadecimalUnsignedLong()
        val extensions = source.readUtf8LineStrict().trim()
        // Anything after the size must be a chunk extension, which starts with ';'.
        if (bytesRemainingInChunk < 0L || extensions.isNotEmpty() && !extensions.startsWith(";")) {
          throw ProtocolException("expected chunk size and optional extensions" +
              " but was \"$bytesRemainingInChunk$extensions\"")
        }
      } catch (e: NumberFormatException) {
        throw ProtocolException(e.message)
      }

      if (bytesRemainingInChunk == 0L) {
        hasMoreChunks = false
        trailers = readHeaders()
        client!!.cookieJar().receiveHeaders(url, trailers!!)
        responseBodyComplete()
      }
    }

    /** Discards any unread chunks; if that fails the connection is barred from new exchanges. */
    override fun close() {
      if (closed) return
      if (hasMoreChunks &&
          !discard(ExchangeCodec.DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) {
        realConnection!!.noNewExchanges() // Unread bytes remain on the stream.
        responseBodyComplete()
      }
      closed = true
    }
  }

  /** An HTTP message body terminated by the end of the underlying stream. */
  private inner class UnknownLengthSource : AbstractSource() {
    private var inputExhausted: Boolean = false

    /** Reads until the underlying stream reports -1, which completes the body. */
    override fun read(sink: Buffer, byteCount: Long): Long {
      require(byteCount >= 0L) { "byteCount < 0: $byteCount" }
      check(!closed) { "closed" }
      if (inputExhausted) return -1

      val read = super.read(sink, byteCount)
      if (read == -1L) {
        inputExhausted = true
        responseBodyComplete()
        return -1
      }
      return read
    }

    /** Completes the body if the stream was not read to its end. */
    override fun close() {
      if (closed) return
      if (!inputExhausted) {
        responseBodyComplete()
      }
      closed = true
    }
  }
}

根据响应的不同情况(没有响应体、chunked 编码、已知 Content-Length、长度未知)创建不同的 Source 对象

原文  http://yydcdut.com/2019/07/12/okhttp-call-server-interceptor-analyse/
正文到此结束
Loading...