转载

OkHttp3 4.x 流程解析

OkHttp 是当前 Android 开发中主流的 HTTP 网络请求框架。由 Square 公司设计研发并 开源 。

OkHttp 的 4.x 版本和 3.x 前的版本相比,最大的变动就是改为 Kotiln 来编写代码。截止当前,OkHttp 最新版本为 4.7.2。因此我们使用 4.7.2 的源码来分析网络请求的流程。

OkHttp 基本使用

首先看一个最基本的 GET 请求:

val okHttpClient = OkHttpClient()
val request: Request = Request.Builder().url("https://blog.csdn.net/QasimCyrus").build()
okHttpClient.newCall(request).enqueue(object : Callback {
    override fun onResponse(call: Call, response: Response) {
        // 网络响应
    }
    
    override fun onFailure(call: Call, e: IOException) {
        // 网络错误
    }
})
复制代码

这是一个异步请求,我们通过这个请求来看看 OkHttp 的网络请求流程。

OkHttp 流程分析

OkHttpClient.newCall(Request)

在生成 OkHttpClientRequest 之后,调用 OkHttpClient 的 newCall(Request) 方法准备请求:

/** Prepares the [request] to be executed at some point in the future. */
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
复制代码

可以看到内部是生成了一个实现了 Call 接口的 RealCall

Call.enqueue(Callback)

紧接着调用 Call 的 enqueue(Callback) 方法:

override fun enqueue(responseCallback: Callback) {
  // 一个 call 只能被执行一次,再次执行将抛出 IllegalStateException 异常
  synchronized(this) {
    check(!executed) { "Already Executed" }
    executed = true
  }
  ···
  client.dispatcher.enqueue(AsyncCall(responseCallback))
}
复制代码

Dispatcher.enqueue(AsyncCall)

RealCall 内部生成一个 AsyncCall 对象,并交由 Dispatcher.enqueue(AsyncCall) 去调度请求,并在未来某个时间执行请求:

internal fun enqueue(call: AsyncCall) {
  synchronized(this) {
    // 先放入准备请求队列中
    readyAsyncCalls.add(call)
    ···
  }
  // 调度执行
  promoteAndExecute()
}
复制代码

Dispatcher 是 OkHttpClient 的调度器,是一种外观模式(门面模式)。主要用来实现执行、取消异步请求操作。本质上是内部维护了一个线程池去执行异步操作,并且在 Dispatcher 内部根据一定的策略,保证最大并发个数、同一 host 主机允许执行请求的线程个数等。

Dispatcher.promoteAndExecute()

我们再来看看 Dispatcher 的 promoteAndExecute() 方法:

private fun promoteAndExecute(): Boolean {
  ···
  
  val executableCalls = mutableListOf<AsyncCall>()
  val isRunning: Boolean
  synchronized(this) {
    val i = readyAsyncCalls.iterator()
    while (i.hasNext()) {
      val asyncCall = i.next()
      
      // 当前正在执行的异步请求数超过了最大请求数,直接结束遍历
      if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
      // 当前 asyncCall 对应的主机执行请求数已经超过每个主机最大请求数,则跳过当前请求。
      if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
      
      i.remove()
      asyncCall.callsPerHost.incrementAndGet()
      executableCalls.add(asyncCall)
      runningAsyncCalls.add(asyncCall)
    }
    isRunning = runningCallsCount() > 0
  }
  
  for (i in 0 until executableCalls.size) {
    val asyncCall = executableCalls[i]
    asyncCall.executeOn(executorService)
  }
  
  return isRunning
}
复制代码

该方法遍历了 readyAsyncCalls 队列拿到每一个 asyncCall ,符合执行条件则从 readyAsyncCalls 中移除,放入 runningAsyncCalls 中,同时放入该次可执行队列 executableCalls 中。

AsyncCall.executeOn(ExecutorService)

放在可执行队列 executableCalls 中的每个 AsyncCall 都会调用 executeOn(ExecutorService) 将自己加入到调度器的线程中:

fun executeOn(executorService: ExecutorService) {
  ···
  
  var success = false
  try {
    executorService.execute(this)
    success = true
  } catch (e: RejectedExecutionException) {
    ···
    responseCallback.onFailure(this@RealCall, ioException)
  } finally {
    if (!success) {
      client.dispatcher.finished(this) // This call is no longer running!
    }
  }
}
复制代码

AsyncCall.run()

AsyncCall 实现了 Runnable 接口,因此执行请求的逻辑都放在了 Runnable.run()

override fun run() {
  threadName("OkHttp ${redactedUrl()}") {
    var signalledCallback = false
    try {
      val response = getResponseWithInterceptorChain()
      signalledCallback = true
      responseCallback.onResponse(this@RealCall, response)
    } catch (e: IOException) {
      if (signalledCallback) {
        ···
      } else {
        responseCallback.onFailure(this@RealCall, e)
      }
    } catch (t: Throwable) {
      cancel()
      if (!signalledCallback) {
        val canceledException = IOException("canceled due to $t")
        ···
        responseCallback.onFailure(this@RealCall, canceledException)
      }
      throw t
    } finally {
      client.dispatcher.finished(this)
    }
  }
}
复制代码

AsyncCall.getResponseWithInterceptorChain()

真正获取请求数据是调用了 AsyncCall 的 getResponseWithInterceptorChain() 方法,其内部是一个拦截器的责任链模式:

@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
  // Build a full stack of interceptors.
  val interceptors = mutableListOf<Interceptor>()
  // 用户自定义的拦截器
  interceptors += client.interceptors
  // 重试和重定向拦截器:建立连接、读取内容失败的重试 和 完整读取请求返回后的重定向。
  interceptors += RetryAndFollowUpInterceptor(client)
  // 桥接拦截器:主要对 Request 中的 Head 设置默认值,比如 Content-Type、Keep-Alive、Cookie 等;同时对返回的 Response 进行了处理。
  interceptors += BridgeInterceptor(client.cookieJar)
  // 缓存拦截器:负责 HTTP 请求的缓存处理。
  interceptors += CacheInterceptor(client.cache)
  // 连接拦截器:负责建立与服务器地址之间的连接,也就是 TCP 链接。
  interceptors += ConnectInterceptor
  if (!forWebSocket) {
    // 网络拦截器:用于监听单个请求和响应。
    interceptors += client.networkInterceptors
  }
  // 请求服务拦截器:负责向服务器发送请求,并从服务器拿到远端数据结果。
  interceptors += CallServerInterceptor(forWebSocket)
  
  val chain = RealInterceptorChain(
      call = this,
      interceptors = interceptors,
      index = 0,
      exchange = null,
      request = originalRequest,
      connectTimeoutMillis = client.connectTimeoutMillis,
      readTimeoutMillis = client.readTimeoutMillis,
      writeTimeoutMillis = client.writeTimeoutMillis
  )
  
  ···
  try {
    val response = chain.proceed(originalRequest)
    ···
    return response
  } catch (e: IOException) {
    ···
  } finally {
    ···
  }
}
复制代码

CallServerInterceptor.intercept

CallServerInterceptor 是责任链里面最后一个拦截器,也是发送请求、接收网络数据的核心部分。

@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
  val realChain = chain as RealInterceptorChain
  val exchange = realChain.exchange!!
  val request = realChain.request
  val requestBody = request.body
  ···
  
  // 写请求头部
  exchange.writeRequestHeaders(request)
  
  ···
  var responseBuilder: Response.Builder? = null
  // 根据是否有请求体执行不同的请求方法
  ···
  
  // 结束请求
  if (requestBody == null || !requestBody.isDuplex()) {
    exchange.finishRequest()
  }
  
  if (responseBuilder == null) {
    // 读取响应头部数据
    responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
    ···
  }
  // 构建响应
  var response = responseBuilder
      .request(request)
      .handshake(exchange.connection.handshake())
      .sentRequestAtMillis(sentRequestMillis)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build()
  ···
  
  exchange.responseHeadersEnd(response)
  
  response = if (forWebSocket && code == 101) {
    // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
    response.newBuilder()
        .body(EMPTY_RESPONSE)
        .build()
  } else {
    response.newBuilder()
        .body(exchange.openResponseBody(response))
        .build()
  }
  
  // 错误和异常处理
  ···
  
  return response
}
复制代码

这里主要分析流程,因此网络请求的细节不做展开。

原文  https://juejin.im/post/5efcb1ab6fb9a07eb65a502c
正文到此结束
Loading...