Github: okhttp 分析版本: 930d4d0
An HTTP client for Android, Kotlin, and Java.
OkHttp is an HTTP client that’s efficient by default:
/**
 * Minimal synchronous GET example: builds a {@code Request} for a URL, executes it through a
 * single {@code OkHttpClient}, and returns the response body as a string.
 */
public class GetExample {
  OkHttpClient client = new OkHttpClient();

  /**
   * Executes a GET request for {@code url} and returns the response body text.
   * The response is opened in try-with-resources so it is closed when the method returns.
   *
   * @param url the address to request
   * @return the response body as a string
   * @throws IOException declared by the underlying call
   */
  String run(String url) throws IOException {
    Request request = new Request.Builder()
        .url(url)
        .build();

    try (Response response = client.newCall(request).execute()) {
      return response.body().string();
    }
  }

  /** Fetches the OkHttp README and prints it to standard output. */
  public static void main(String[] args) throws IOException {
    GetExample example = new GetExample();
    String response = example.run("https://raw.github.com/square/okhttp/master/README.md");
    System.out.println(response);
  }
}
public class PostExample { public static final MediaType JSON = MediaType.get("application/json; charset=utf-8"); OkHttpClient client = new OkHttpClient(); String post(String url, String json) throws IOException { RequestBody body = RequestBody.create(json, JSON); Request request = new Request.Builder() .url(url) .post(body) .build(); try (Response response = client.newCall(request).execute()) { return response.body().string(); } } String bowlingJson(String player1, String player2) { return "{'winCondition':'HIGH_SCORE'," + "'name':'Bowling'," + "'round':4," + "'lastSaved':1367702411696," + "'dateStarted':1367702378785," + "'players':[" + "{'name':'" + player1 + "','history':[10,8,6,7,8],'color':-13388315,'total':39}," + "{'name':'" + player2 + "','history':[6,10,5,10,10],'color':-48060,'total':41}" + "]}"; } public static void main(String[] args) throws IOException { PostExample example = new PostExample(); String json = example.bowlingJson("Jesse", "Jake"); String response = example.post("http://www.roundsapp.com/post", json); System.out.println(response); } }
okhttp
OkHttpClient client = new OkHttpClient();
根据名字我们就能看出,OkHttpClient 为 OkHttp 的客户端,在使用的时候首先要做的就是要创建这样一个客户端
/**
 * Excerpt of the client class. The primary constructor is internal and takes a [Builder];
 * the public no-argument constructor delegates to it with a freshly created `Builder()`.
 */
open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {

  /** Creates a client from a default-configured [Builder]. */
  constructor() : this(Builder())
}
默认构造方法使用的是默认配置的 Builder:
/**
 * Mutable configuration holder for an [OkHttpClient] (excerpt).
 *
 * Every property is initialised to the default shown here; [build] passes this builder to the
 * [OkHttpClient] constructor.
 */
class Builder constructor() {
  internal var dispatcher: Dispatcher = Dispatcher() // dispatcher
  internal var proxy: Proxy? = null // proxy
  internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS // protocols
  internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS // transport-layer versions and connection specs
  internal val interceptors: MutableList<Interceptor> = mutableListOf() // interceptors
  internal val networkInterceptors: MutableList<Interceptor> = mutableListOf() // network interceptors
  internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
  internal var proxySelector: ProxySelector = ProxySelector.getDefault() ?: NullProxySelector() // proxy selector
  internal var cookieJar: CookieJar = CookieJar.NO_COOKIES // cookies
  internal var cache: Cache? = null // cache
  internal var internalCache: InternalCache? = null // internal cache
  internal var socketFactory: SocketFactory = SocketFactory.getDefault() // socket factory
  internal var sslSocketFactory: SSLSocketFactory? = null // socket factory used for HTTPS
  internal var certificateChainCleaner: CertificateChainCleaner? = null // certificate chain handling for HTTPS connections
  internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier // hostname verification
  internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT // certificate pinner
  internal var proxyAuthenticator: Authenticator = Authenticator.NONE // proxy authentication
  internal var authenticator: Authenticator = Authenticator.NONE // authentication
  internal var connectionPool: ConnectionPool = ConnectionPool() // connection reuse pool
  internal var dns: Dns = Dns.SYSTEM // DNS
  internal var followSslRedirects: Boolean = true // follow SSL redirects
  internal var followRedirects: Boolean = true // follow redirects
  internal var retryOnConnectionFailure: Boolean = true // retry on connection failure
  internal var callTimeout: Int = 0 // call timeout
  internal var connectTimeout: Int = 10000 // connect timeout (presumably milliseconds)
  internal var readTimeout: Int = 10000 // read timeout (presumably milliseconds)
  internal var writeTimeout: Int = 10000 // write timeout (presumably milliseconds)
  internal var pingInterval: Int = 0 // Web socket and HTTP/2 ping interval

  // ...

  /** Creates an [OkHttpClient] from the current state of this builder. */
  fun build(): OkHttpClient = OkHttpClient(this)
}
OkHttp 的最佳实践是只创建一个 OkHttpClient 实例,并将其复用于所有的 HTTP 请求调用;之所以所有请求共用一个 OkHttpClient,是因为每个 OkHttpClient 都有自己的连接池和线程池,复用连接和线程可以减少延迟并节省内存
Request request = new Request.Builder() .url(url) .build();
发送一个 HTTP 请求需要先构建一个 Request 对象
/**
 * An HTTP request (excerpt). Instances are created through [Builder]; the primary constructor
 * is internal.
 */
class Request internal constructor(
  @get:JvmName("url") val url: HttpUrl, // request URL
  @get:JvmName("method") val method: String, // request method [GET/POST/PUT/PATCH/...]
  @get:JvmName("headers") val headers: Headers, // request headers
  @get:JvmName("body") val body: RequestBody?, // request body
  internal val tags: Map<Class<*>, Any> // request tags
) {
  // ...

  /** Builder for [Request]. The no-argument constructor defaults the method to "GET". */
  open class Builder {
    internal var url: HttpUrl? = null
    internal var method: String
    internal var headers: Headers.Builder
    internal var body: RequestBody? = null
    // ...

    constructor() {
      this.method = "GET"
      this.headers = Headers.Builder()
    }

    /**
     * Sets the URL target of this request.
     *
     * @throws IllegalArgumentException if [url] is not a valid HTTP or HTTPS URL. Avoid this
     *     exception by calling [HttpUrl.parse]; it returns null for invalid URLs.
     */
    open fun url(url: String): Builder {
      // Silently replace web socket URLs with HTTP URLs.
      val finalUrl: String = when {
        url.startsWith("ws:", ignoreCase = true) -> {
          "http:${url.substring(3)}"
        }
        url.startsWith("wss:", ignoreCase = true) -> {
          "https:${url.substring(4)}"
        }
        else -> url
      }

      return url(finalUrl.toHttpUrl())
    }

    // ...

    /**
     * Creates the [Request] from the current builder state.
     * Fails via [checkNotNull] with "url == null" when no URL has been set.
     */
    open fun build(): Request {
      return Request(
          checkNotNull(url) { "url == null" },
          method,
          headers.build(),
          body,
          tags.toImmutableMap()
      )
    }
  }
}
Request 也是通过 Builder 形式来创建的
Call call = client.newCall(request);
Call 即调用,是一个准备好去执行的请求 Request
/**
 * Contract for a single prepared request (excerpt): it exposes the request, synchronous and
 * asynchronous execution entry points, cancellation, and state queries.
 */
interface Call : Cloneable {
  /** Returns the request this call was created for. */
  fun request(): Request

  /** Synchronous entry point; returns the [Response] and is declared to throw [IOException]. */
  @Throws(IOException::class)
  fun execute(): Response

  /** Asynchronous entry point; the outcome is delivered to [responseCallback]. */
  fun enqueue(responseCallback: Callback)

  fun cancel()

  fun isExecuted(): Boolean

  fun isCanceled(): Boolean

  fun timeout(): Timeout

  override fun clone(): Call

  /** Factory abstraction for creating a [Call] from a [Request]. */
  interface Factory {
    fun newCall(request: Request): Call
  }
}
request() execute() enqueue(responseCallback: Callback) cancel() isExecuted() isCanceled() timeout()
OkHttpClient 实现了 Call.Factory,使用工厂模式将构建的细节交给具体实现,顶层只需要拿到 Call 对象即可
/** Excerpt of [OkHttpClient] showing its [Call.Factory] implementation. */
open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
  // ...

  /** Prepares the [request] to be executed at some point in the future. */
  override fun newCall(request: Request): Call {
    // Delegates to RealCall's factory; the third argument is the forWebSocket flag.
    return RealCall.newRealCall(this, request, false /* for web socket */)
  }

  // ...
}
继续看 RealCall 中的 newRealCall 方法:
/**
 * Concrete [Call] implementation (excerpt). The constructor is private; instances are obtained
 * through [newRealCall], which also wires up the [Transmitter].
 */
internal class RealCall private constructor(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  /**
   * There is a cycle between the [Call] and [Transmitter] that makes this awkward.
   * This is set after immediately after creating the call instance.
   */
  private lateinit var transmitter: Transmitter

  // ...

  companion object {
    /** Creates a [RealCall] and assigns its [Transmitter] before returning it. */
    fun newRealCall(
      client: OkHttpClient,
      originalRequest: Request,
      forWebSocket: Boolean
    ): RealCall {
      // Safely publish the Call instance to the EventListener.
      return RealCall(client, originalRequest, forWebSocket).apply {
        transmitter = Transmitter(client, this)
      }
    }
  }
}
RealCall 为具体产品,实现了 Call 接口;其中 Transmitter 是 OkHttp 的应用层和网络层的一个桥梁类,包含了连接,请求,响应和流
/**
 * Per-call state holder (excerpt). On construction it obtains the client's connection pool
 * delegate, creates an [EventListener] for the call from the client's factory, and configures
 * an [AsyncTimeout] from the client's call timeout.
 */
class Transmitter(
  private val client: OkHttpClient,
  private val call: Call
) {
  private val connectionPool: RealConnectionPool = client.connectionPool().delegate
  private val eventListener: EventListener = client.eventListenerFactory().create(call)

  // When the timeout fires the call is cancelled; the duration is the client's call timeout
  // in milliseconds.
  private val timeout = object : AsyncTimeout() {
    override fun timedOut() {
      cancel()
    }
  }.apply {
    timeout(client.callTimeoutMillis().toLong(), MILLISECONDS)
  }

  // ...
}
在创建 Transmitter 对象的时候设置了相关指标的监听器和 ConnectionPool
/** Excerpt of [RealCall] showing the synchronous execution path. */
internal class RealCall private constructor(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  // ...

  /**
   * Runs this call on the calling thread and returns the response produced by the interceptor
   * chain.
   *
   * A call may be executed only once: a second invocation fails the `check` with
   * "Already Executed". The dispatcher is notified via `finished` in all cases.
   */
  override fun execute(): Response {
    synchronized(this) {
      check(!executed) { "Already Executed" }
      executed = true
    }
    transmitter.timeoutEnter()
    transmitter.callStart()
    try {
      client.dispatcher().executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher().finished(this)
    }
  }

  // ...
}
AsyncTimeout
类中的 enter()
方法 getStackTraceForCloseable()
/** Excerpt of [RealCall] showing how the interceptor chain is assembled and started. */
internal class RealCall private constructor(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  // ...

  /**
   * Builds the ordered interceptor list, wraps it in a [RealInterceptorChain] starting at
   * index 0 with no exchange, and proceeds with the original request.
   *
   * `transmitter.noMoreExchanges` is invoked exactly once on every path: with the exception
   * when an [IOException] is caught, otherwise with `null` in the `finally` block.
   *
   * @throws IOException if the chain fails or the call was canceled ("Canceled").
   */
  @Throws(IOException::class)
  fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = ArrayList<Interceptor>()
    interceptors.addAll(client.interceptors())
    // Retries on failure and follows redirects.
    interceptors.add(RetryAndFollowUpInterceptor(client))
    // Converts the user's request into the one sent to the server, and the server's response
    // into a user-friendly one.
    interceptors.add(BridgeInterceptor(client.cookieJar()))
    // Serves responses straight from the cache and updates the cache.
    interceptors.add(CacheInterceptor(client.internalCache()))
    // Establishes the connection to the server.
    interceptors.add(ConnectInterceptor(client))
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors())
    }
    // Sends the request data to the server and reads the response data back.
    interceptors.add(CallServerInterceptor(forWebSocket))

    val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
        client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis())

    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (transmitter.isCanceled) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw transmitter.noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null)
      }
    }
  }

  // ...
}
proceed()
,开启链式调用请求,并最终返回响应 response
Transmitter
对象的 noMoreExchanges()
,释放请求连接 Interceptor 使用的是责任链模式
/** Excerpt of [RealCall] showing the asynchronous execution path. */
internal class RealCall private constructor(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  // ...

  /**
   * Schedules this call for asynchronous execution by handing an [AsyncCall] wrapping
   * [responseCallback] to the client's dispatcher.
   *
   * A call may be enqueued only once: a second invocation fails the `check` with
   * "Already Executed".
   */
  override fun enqueue(responseCallback: Callback) {
    synchronized(this) {
      check(!executed) { "Already Executed" }
      executed = true
    }
    transmitter.callStart()
    client.dispatcher().enqueue(AsyncCall(responseCallback))
  }

  // ...
}
最终调用的是 Dispatcher
中的 enqueue()
/** Excerpt of [Dispatcher] showing the bookkeeping for synchronous calls. */
class Dispatcher constructor() {
  // ...

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningSyncCalls = ArrayDeque<RealCall>()

  // ...

  /** Used by `Call#execute` to signal it is in-flight. */
  @Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
  }

  // ...
}
直接将 call 添加到正在执行的请求队列中去,runningSyncCalls 为正在请求的同步队列
/** Excerpt of [Dispatcher] showing how asynchronous calls are queued. */
class Dispatcher constructor() {
  // ...

  /** Ready async calls in the order they'll be run. */
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()

  // ...

  /**
   * Appends [call] to the ready queue under the dispatcher lock, then calls
   * `promoteAndExecute()` after the lock has been released.
   */
  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.get().forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host())
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }

  // ...
}
封装到一个 AsyncCall 中传递进来,添加到正在等待的异步队列 readyAsyncCalls 中去,接着继续调用 promoteAndExecute()
方法执行相关操作
/** Excerpt of [Dispatcher] showing promotion of ready calls and the backing executor. */
class Dispatcher constructor() {
  // ...

  /**
   * Promotes eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and runs them on the
   * executor service. Must not be called with synchronization because executing calls can call
   * into user code.
   *
   * @return true if the dispatcher is currently running calls.
   */
  private fun promoteAndExecute(): Boolean {
    assert(!Thread.holdsLock(this))

    val executableCalls = ArrayList<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost().incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    // Submission happens outside the synchronized block (see the KDoc above).
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService())
    }

    return isRunning
  }

  /**
   * Returns the executor, creating it on first use: a [ThreadPoolExecutor] with 0 core threads,
   * up to [Int.MAX_VALUE] threads, a 60-second keep-alive, and a [SynchronousQueue].
   */
  @Synchronized fun executorService(): ExecutorService {
    if (executorService == null) {
      executorService = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
          SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
    }
    return executorService!!
  }

  // ...
}
maxRequests maxRequestsPerHost
/** Excerpt of [RealCall] showing the [Runnable] wrapper used for asynchronous execution. */
internal class RealCall private constructor(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {

  /** Runnable that executes the enclosing call and reports the outcome to [responseCallback]. */
  internal inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {

    /**
     * Attempt to enqueue this async call on [executorService]. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
     */
    fun executeOn(executorService: ExecutorService) {
      assert(!Thread.holdsLock(client.dispatcher()))
      var success = false
      try {
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        transmitter.noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher().finished(this) // This call is no longer running!
        }
      }
    }

    /**
     * Runs the interceptor chain and delivers the result to the callback. An [IOException]
     * thrown after `onResponse` has been signalled is logged rather than reported through
     * `onFailure`. The dispatcher is notified via `finished` in all cases.
     */
    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        transmitter.timeoutEnter()
        try {
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } finally {
          client.dispatcher().finished(this)
        }
      }
    }
  }
}
又回到了 getResponseWithInterceptorChain()
中
所有的 interceptor 都整合到了 RealInterceptorChain
中,执行拦截器链方法 proceed()
/**
 * Chain implementation that walks the interceptor list (excerpt). Each instance is bound to one
 * [index]; [proceed] creates the chain for `index + 1` and passes it to the interceptor at
 * [index].
 */
class RealInterceptorChain(
  private val interceptors: List<Interceptor>,
  private val transmitter: Transmitter,
  private val exchange: Exchange?,
  private val index: Int,
  private val request: Request,
  private val call: Call,
  private val connectTimeout: Int,
  private val readTimeout: Int,
  private val writeTimeout: Int
) : Interceptor.Chain {
  // ...

  /** Proceeds with this chain's own transmitter and exchange. */
  override fun proceed(request: Request): Response {
    return proceed(request, transmitter, exchange)
  }

  /**
   * Invokes the interceptor at [index] with a chain positioned at the next index and returns
   * its response after validating the `proceed()` contract.
   *
   * @throws AssertionError if [index] is past the end of the interceptor list.
   * @throws IllegalStateException if one of the `check` conditions below fails.
   */
  @Throws(IOException::class)
  fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
    if (index >= interceptors.size) throw AssertionError()

    calls++

    // If we already have a stream, confirm that the incoming request will use it.
    check(this.exchange == null || this.exchange.connection()!!.supportsUrl(request.url)) {
      "network interceptor ${interceptors[index - 1]} must retain the same host and port"
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    check(this.exchange == null || calls <= 1) {
      "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
    }

    // Call the next interceptor in the chain.
    val next = RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
    val interceptor = interceptors[index]

    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    // Confirm that the next interceptor made its required call to chain.proceed().
    check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) {
      "network interceptor $interceptor must call proceed() exactly once"
    }

    check(response.body() != null) { "interceptor $interceptor returned a response with no body" }

    return response
  }
}
在初始化的时候,会将所有拦截器组成的集合传递过来,同时将请求 Request
和 Call
也会传递过来, index
参数,最开始传入的是 0, exchange
参数,如果是应用拦截器,exchange 必须是 null;如果是网络拦截器,exchange 必须不为 null