同步任务线程调度比较简单
try { client.dispatcher.executed(this) return getResponseWithInterceptorChain() } finally { client.dispatcher.finished(this) } 复制代码
finished方法中有个idleCallback,不太明白其功能。
最后顺便调用了一下promoteAndExecute()方法,个人理解为查询是否存在异步任务,如果有,那就再处理一下吧。
入口位于realcall的异步调用方法enqueue,首先将RealCall包装为实现Runnable的AsyncCall
override fun enqueue(responseCallback: Callback) { synchronized(this) { check(!executed) { "Already Executed" } executed = true } transmitter.callStart() client.dispatcher.enqueue(AsyncCall(responseCallback)) } 复制代码
然后到Dispatcher中enqueue方法, 将call添加到readyAsyncCalls队列中
internal fun enqueue(call: AsyncCall) { synchronized(this) { readyAsyncCalls.add(call) } promoteAndExecute() } 复制代码
然后遍历readyAsyncCalls将可以执行的任务转移到runningAsyncCalls队列,然后在线程池executorService中执行
private fun promoteAndExecute(): Boolean { assert(!Thread.holdsLock(this)) val executableCalls = mutableListOf<AsyncCall>() val isRunning: Boolean synchronized(this) { val i = readyAsyncCalls.iterator() while (i.hasNext()) { val asyncCall = i.next() if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity. if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity. i.remove() asyncCall.callsPerHost().incrementAndGet() executableCalls.add(asyncCall) runningAsyncCalls.add(asyncCall) } isRunning = runningCallsCount() > 0 } for (i in 0 until executableCalls.size) { val asyncCall = executableCalls[i] asyncCall.executeOn(executorService) } return isRunning } 复制代码
接下来看executorService线程池,语法有点陌生,理解起来就是一个Client会创建一个Dispather, 进而都会初始化一个线程池ThreadPoolExecutor
private var executorServiceOrNull: ExecutorService? = null @get:Synchronized @get:JvmName("executorService") val executorService: ExecutorService get() { if (executorServiceOrNull == null) { executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS, SynchronousQueue(), threadFactory("OkHttp Dispatcher", false)) } return executorServiceOrNull!! } 复制代码
异步线程AsyncCall将在这个线程池里执行,call的执行调用run方法,执行内容在前一章节已经描述