OkHttp 是当前 Android 开发中主流的 HTTP 网络请求框架。由 Square 公司设计研发并 开源 。
OkHttp 的 4.x 版本和 3.x 前的版本相比,最大的变动就是改为 Kotiln 来编写代码。截止当前,OkHttp 最新版本为 4.7.2。因此我们使用 4.7.2 的源码来分析网络请求的流程。
首先看一个最基本的 GET 请求:
val okHttpClient = OkHttpClient() val request: Request = Request.Builder().url("https://blog.csdn.net/QasimCyrus").build() okHttpClient.newCall(request).enqueue(object : Callback { override fun onResponse(call: Call, response: Response) { // 网络响应 } override fun onFailure(call: Call, e: IOException) { // 网络错误 } }) 复制代码
这是一个异步请求,我们通过这个请求来看看 OkHttp 的网络请求流程。
在生成 OkHttpClient
和 Request
之后,调用 OkHttpClient 的 newCall(Request)
方法准备请求:
/** Prepares the [request] to be executed at some point in the future. */ override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false) 复制代码
可以看到内部是生成了一个实现了 Call
接口的 RealCall
。
紧接着调用 Call 的 enqueue(Callback)
方法:
override fun enqueue(responseCallback: Callback) { // 一个 call 只能被执行一次,再次执行将抛出 IllegalStateException 异常 synchronized(this) { check(!executed) { "Already Executed" } executed = true } ··· client.dispatcher.enqueue(AsyncCall(responseCallback)) } 复制代码
RealCall 内部生成一个 AsyncCall
对象,并交由 Dispatcher.enqueue(AsyncCall)
去调度请求,并在未来某个时间执行请求:
internal fun enqueue(call: AsyncCall) { synchronized(this) { // 先放入准备请求队列中 readyAsyncCalls.add(call) ··· } // 调度执行 promoteAndExecute() } 复制代码
Dispatcher 是 OkHttpClient 的调度器,是一种外观模式(门面模式)。主要用来实现执行、取消异步请求操作。本质上是内部维护了一个线程池去执行异步操作,并且在 Dispatcher 内部根据一定的策略,保证最大并发个数、同一 host 主机允许执行请求的线程个数等。
我们再来看看 Dispatcher 的 promoteAndExecute()
方法:
private fun promoteAndExecute(): Boolean { ··· val executableCalls = mutableListOf<AsyncCall>() val isRunning: Boolean synchronized(this) { val i = readyAsyncCalls.iterator() while (i.hasNext()) { val asyncCall = i.next() // 当前正在执行的异步请求数超过了最大请求数,直接结束遍历 if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity. // 当前 asyncCall 对应的主机执行请求数已经超过每个主机最大请求数,则跳过当前请求。 if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity. i.remove() asyncCall.callsPerHost.incrementAndGet() executableCalls.add(asyncCall) runningAsyncCalls.add(asyncCall) } isRunning = runningCallsCount() > 0 } for (i in 0 until executableCalls.size) { val asyncCall = executableCalls[i] asyncCall.executeOn(executorService) } return isRunning } 复制代码
该方法遍历了 readyAsyncCalls
队列拿到每一个 asyncCall
,符合执行条件则从 readyAsyncCalls 中移除,放入 runningAsyncCalls
中,同时放入该次可执行队列 executableCalls
中。
放在可执行队列 executableCalls
中的每个 AsyncCall 都会调用 executeOn(ExecutorService)
将自己加入到调度器的线程中:
fun executeOn(executorService: ExecutorService) { ··· var success = false try { executorService.execute(this) success = true } catch (e: RejectedExecutionException) { ··· responseCallback.onFailure(this@RealCall, ioException) } finally { if (!success) { client.dispatcher.finished(this) // This call is no longer running! } } } 复制代码
AsyncCall 实现了 Runnable 接口,因此执行请求的逻辑都放在了 Runnable.run()
:
override fun run() { threadName("OkHttp ${redactedUrl()}") { var signalledCallback = false try { val response = getResponseWithInterceptorChain() signalledCallback = true responseCallback.onResponse(this@RealCall, response) } catch (e: IOException) { if (signalledCallback) { ··· } else { responseCallback.onFailure(this@RealCall, e) } } catch (t: Throwable) { cancel() if (!signalledCallback) { val canceledException = IOException("canceled due to $t") ··· responseCallback.onFailure(this@RealCall, canceledException) } throw t } finally { client.dispatcher.finished(this) } } } 复制代码
真正获取请求数据是调用了 AsyncCall 的 getResponseWithInterceptorChain()
方法,其内部是一个拦截器的责任链模式:
@Throws(IOException::class) internal fun getResponseWithInterceptorChain(): Response { // Build a full stack of interceptors. val interceptors = mutableListOf<Interceptor>() // 用户自定义的拦截器 interceptors += client.interceptors // 重试和重定向拦截器:建立连接、读取内容失败的重试 和 完整读取请求返回后的重定向。 interceptors += RetryAndFollowUpInterceptor(client) // 桥接拦截器:主要对 Request 中的 Head 设置默认值,比如 Content-Type、Keep-Alive、Cookie 等;同时对返回的 Response 进行了处理。 interceptors += BridgeInterceptor(client.cookieJar) // 缓存拦截器:负责 HTTP 请求的缓存处理。 interceptors += CacheInterceptor(client.cache) // 连接拦截器:负责建立与服务器地址之间的连接,也就是 TCP 链接。 interceptors += ConnectInterceptor if (!forWebSocket) { // 网络拦截器:用于监听单个请求和响应。 interceptors += client.networkInterceptors } // 请求服务拦截器:负责向服务器发送请求,并从服务器拿到远端数据结果。 interceptors += CallServerInterceptor(forWebSocket) val chain = RealInterceptorChain( call = this, interceptors = interceptors, index = 0, exchange = null, request = originalRequest, connectTimeoutMillis = client.connectTimeoutMillis, readTimeoutMillis = client.readTimeoutMillis, writeTimeoutMillis = client.writeTimeoutMillis ) ··· try { val response = chain.proceed(originalRequest) ··· return response } catch (e: IOException) { ··· } finally { ··· } } 复制代码
CallServerInterceptor
是责任链里面最后一个拦截器,也是发送请求、接收网络数据的核心部分。
@Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain val exchange = realChain.exchange!! val request = realChain.request val requestBody = request.body ··· // 写请求头部 exchange.writeRequestHeaders(request) ··· var responseBuilder: Response.Builder? = null // 根据是否有请求体执行不同的请求方法 ··· // 结束请求 if (requestBody == null || !requestBody.isDuplex()) { exchange.finishRequest() } if (responseBuilder == null) { // 读取响应头部数据 responseBuilder = exchange.readResponseHeaders(expectContinue = false)!! ··· } // 构建响应 var response = responseBuilder .request(request) .handshake(exchange.connection.handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build() ··· exchange.responseHeadersEnd(response) response = if (forWebSocket && code == 101) { // Connection is upgrading, but we need to ensure interceptors see a non-null response body. response.newBuilder() .body(EMPTY_RESPONSE) .build() } else { response.newBuilder() .body(exchange.openResponseBody(response)) .build() } // 错误和异常处理 ··· return response } 复制代码
这里主要分析流程,因此网络请求的细节不做展开。