本文分为六个部分:
异步请求
同步请求
Okhttp请求流程图
ArrayQueue
各种过滤器
自定义过滤器
OkHttpClient okHttpClient = new OkHttpClient(); Request request = new Request.Builder().url("www.google.com").build(); okHttpClient.newCall(request).enqueue(new Callback() { //异步 @Override public void onFailure(Call call, IOException e) { } @Override public void onResponse(Call call, Response response) throws IOException { } }); 复制代码
我们构建为Request之后,就需要构建一个Call,一般都是
Call call = okHttpClient.newCall(request) 复制代码
返回OkhttpClient可以看到
@Override public Call newCall(Request request) { return RealCall.newRealCall(this, request, false /* for web socket */); } //Okhttp实现Call.Factory接口 public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory { //Call接口之中 interface Factory { Call newCall(Request request); } 复制代码
从接口的源码我们可以看到,只是定义一个newCall用于创建Call的方法,这里其实用到了 工厂模式 的思想, 将构建的细节交给具体实现,顶层只需要拿到Call对象即可 。
下面是RealCall的实现细节:
RealCall: static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { // Safely publish the Call instance to the EventListener. RealCall call = new RealCall(client, originalRequest, forWebSocket); call.eventListener = client.eventListenerFactory().create(call); return call; } RealCall构造方法: private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { this.client = client; this.originalRequest = originalRequest; this.forWebSocket = forWebSocket; this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket); this.timeout = new AsyncTimeout() { @Override protected void timedOut() { cancel(); } }; this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS); } 复制代码
Call创建完之后,一般就到最后一个步骤了,将请求加入调度。
call.enqueue(new Callback() { //异步 @Override public void onFailure(Call call, IOException e) { } @Override public void onResponse(Call call, Response response) throws IOException { } }); 复制代码
这里的call的真正实现是RealCall方法,我们来看下RealCall里面的enqueue方法。
@Override public void enqueue(Callback responseCallback) { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); eventListener.callStart(this); client.dispatcher().enqueue(new AsyncCall(responseCallback)); } 复制代码
private void captureCallStackTrace() { Object callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()"); retryAndFollowUpInterceptor.setCallStackTrace(callStackTrace); } 复制代码
//OkhttpClient public Dispatcher dispatcher() { return dispatcher; } //Dispatcher void enqueue(AsyncCall call) { synchronized (this) { readyAsyncCalls.add(call); } promoteAndExecute(); } 复制代码
这里先对Dispatcher的成员变量做个初步的认识
private int maxRequests = 64; private int maxRequestsPerHost = 5; private @Nullable Runnable idleCallback; /** Executes calls. Created lazily. */ private @Nullable ExecutorService executorService; /** Ready async calls in the order they'll be run. */ private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */ private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); /** Running synchronous calls. Includes canceled calls that haven't finished yet. */ private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); 复制代码
可以看到这里用了三个队列ArrayDeque用于保存Call对象,分为三种状态:
关于ArrayDeque的介绍见文章末尾。
下面来看Dispather的enqueue()方法,先是加入到异步等待队列,然后执行promoteAndExecute()方法
private boolean promoteAndExecute() { assert (!Thread.holdsLock(this)); List<AsyncCall> executableCalls = new ArrayList<>(); boolean isRunning; synchronized (this) { for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall asyncCall = i.next(); if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity. if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity. i.remove(); executableCalls.add(asyncCall); runningAsyncCalls.add(asyncCall); } isRunning = runningCallsCount() > 0; } for (int i = 0, size = executableCalls.size(); i < size; i++) { AsyncCall asyncCall = executableCalls.get(i); asyncCall.executeOn(executorService()); } return isRunning; } 复制代码
该方法中遍历异步等待队列readyAsyncCalls, 如果当前正在执行的同步running队列个数大于maxRequest(64),跳出该循环,如果取出某一个的请求(AsyncCall)请求同一个主机的个数大于maxRequestsPerHost(5)时,跳过本地循环,继续下一次循环 。最后再加入线程池执行。
public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; } 复制代码
下面来分析一下AsyncCall的源码:
final class AsyncCall extends NamedRunnable { ··· } public abstract class NamedRunnable implements Runnable { protected final String name; public NamedRunnable(String format, Object... args) { this.name = Util.format(format, args); } @Override public final void run() { String oldName = Thread.currentThread().getName(); Thread.currentThread().setName(name); try { execute(); } finally { Thread.currentThread().setName(oldName); } } protected abstract void execute(); } 复制代码
实现了一个Runnable接口,线程名字为我们在构造器中传入的名字,最后执行execute()方法,具体实现在AsyncCall中
//异步最终走的方法,RealCall类中的内部类AsyncCall @Override protected void execute() { boolean signalledCallback = false; timeout.enter(); try { Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { e = timeoutExit(e); if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { eventListener.callFailed(RealCall.this, e); responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } } //同步走的方法,在RealCall中 @Override public Response execute() throws IOException { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); timeout.enter(); eventListener.callStart(this); try { client.dispatcher().executed(this); Response result = getResponseWithInterceptorChain(); if (result == null) throw new IOException("Canceled"); return result; } catch (IOException e) { e = timeoutExit(e); eventListener.callFailed(this, e); throw e; } finally { client.dispatcher().finished(this); } } 复制代码
上面为异步和同步最终走的方法,可以看到同样的代码,终于看到了我们关注的Response了
Response result = getResponseWithInterceptorChain(); 复制代码
方法具体实现为:
Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. List<Interceptor> interceptors = new ArrayList<>(); interceptors.addAll(client.interceptors()); //失败和重定向过滤器 interceptors.add(retryAndFollowUpInterceptor); //封装request和response过滤器 interceptors.add(new BridgeInterceptor(client.cookieJar())); //缓存相关的过滤器,负责读取缓存直接返回,更新缓存 interceptors.add(new CacheInterceptor(client.internalCache())); //负责和服务器建立连接 interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { //配置 OkHttpClient 时设置的 networkInterceptors interceptors.addAll(client.networkInterceptors()); } //负责向服务器发送网络请求数据,从服务器读取响应数据(实际网络请求) interceptors.add(new CallServerInterceptor(forWebSocket)); Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0, originalRequest, this, eventListener, client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis()); return chain.proceed(originalRequest); } 复制代码
Okhttp默认会给我们实现这些过滤器,也可以添加自己想要实现的过滤器。
这里可以对比一下Volley源码中的思想,Volley的处理是将缓存,网络请求等一系列操作揉在一起写,导致用户对于Volley的修改只能通过修改源码方式,而修改就必须要充分阅读理解volley整个的流程,可能一部分的修改会影响全局的流程,而这里,将不同的职责的过滤器分别单独出来,用户只需要对关注的某一个功能项进行理解,并可以进行扩充修改,一对比,okHttp在这方面的优势立马体现出来了。这里大概先描述一下几个过滤器的功能:
retryAndFollowUpInterceptor——失败和重定向过滤器 BridgeInterceptor——封装request和response过滤器 CacheInterceptor——缓存相关的过滤器,负责读取缓存直接返回、更新缓存 ConnectInterceptor——负责和服务器建立连接,连接池等 networkInterceptors——配置 OkHttpClient 时设置的 networkInterceptors CallServerInterceptor——负责向服务器发送请求数据、从服务器读取响应数据(实际网络请求) 复制代码
添加完过滤器,就是执行过滤器了
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0, originalRequest, this, eventListener, client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis()); return chain.proceed(originalRequest); 复制代码
进入proceed方法
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException { if (index >= interceptors.size()) throw new AssertionError(); calls++; ··· ··· // Call the next interceptor in the chain. RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec, connection, index + 1, request, call, eventListener, connectTimeout, readTimeout, writeTimeout); Interceptor interceptor = interceptors.get(index); Response response = interceptor.intercept(next); ··· ··· return response; } 复制代码
index传入的时候为0,后面new一个RealInterceptorChain并且将参数传递,index+1,接着获取index的interceptor,并调用intercept方法,传入新new的next对象,这里采用递归的思想来完成遍历,完成一个个interceptor的变量,随便找个interceptor来看看:
/** Opens a connection to the target server and proceeds to the next interceptor. */ public final class ConnectInterceptor implements Interceptor { public final OkHttpClient client; public ConnectInterceptor(OkHttpClient client) { this.client = client; } @Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); StreamAllocation streamAllocation = realChain.streamAllocation(); // We need the network to satisfy this request. Possibly for validating a conditional GET. boolean doExtensiveHealthChecks = !request.method().equals("GET"); HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks); RealConnection connection = streamAllocation.connection(); return realChain.proceed(request, streamAllocation, httpCodec, connection); } } 复制代码
在ConnectInterceptor中可以看到,得到chain后,进行相应的处理后,继续调用proceed方法,接着刚才的逻辑,index+1,获取下一个interceptor,重复操作,利用递归循环,也就是Okhttp最经典的 责任链模式 。
//同步走的方法,在RealCall中 @Override public Response execute() throws IOException { synchronized (this) { //检查是否运行过 if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); timeout.enter(); eventListener.callStart(this); try { //加入到同步队列中 client.dispatcher().executed(this); //创建过滤责任链,得到response Response result = getResponseWithInterceptorChain(); if (result == null) throw new IOException("Canceled"); return result; } catch (IOException e) { e = timeoutExit(e); eventListener.callFailed(this, e); throw e; } finally { client.dispatcher().finished(this); } } 复制代码
Deque 接口继承自 Queue接口,但 Deque 支持同时从两端添加或移除元素,因此又被成为双端队列。鉴于此,Deque 接口的实现可以被当作 FIFO队列使用,也可以当作LIFO队列(栈)来使用。官方也是推荐使用 Deque 的实现来替代 Stack。
ArrayDeque 可以作为栈来使用,效率要高于 Stack;ArrayDeque 也可以作为队列来使用,效率相较于基于双向链表的 LinkedList 也要更好一些。
ArrayDeque 是 Deque 接口的一种具体实现,是依赖于可变数组来实现的。ArrayDeque 没有容量限制,可根据需求自动进行扩容。ArrayDeque不支持值为 null 的元素。
RetryAndFollowUpInterceptor 重试和重定向
看看该过滤器中的intercept方法,简单处理一下:
@Override public Response intercept(Chain chain) throws IOException { ··· while (true) { ··· Response response; boolean releaseConnection = true; try { response = realChain.proceed(request, streamAllocation, null, null); releaseConnection = false; } ··· ··· if (followUp == null) { //满足条件,返回response streamAllocation.release(); return response; } ··· //不满足条件,重来 request = followUp; priorResponse = response; } } 复制代码
先梳理一下大致流程,在while(true)中执行的是proceed(),执行这个方法后,就会交给下一个过滤器执行,所以可以简单的理解为这个过滤器其实没做什么。
但是当出现一些异常导致条件不满足的时候,就要重新进行一系列操作,重新复制request,重新请求,也就是while的功能,对应也就是这个过滤器的主要功能:重试和重定向。
@Override public Response intercept(Chain chain) throws IOException { Request request = chain.request(); RealInterceptorChain realChain = (RealInterceptorChain) chain; Call call = realChain.call(); EventListener eventListener = realChain.eventListener(); //streamAllocation的创建位置 streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(request.url()), call, eventListener, callStackTrace); int followUpCount = 0; Response priorResponse = null; while (true) { //取消 if (canceled) { streamAllocation.release(); throw new IOException("Canceled"); } Response response; boolean releaseConnection = true; try { response = realChain.proceed(request, streamAllocation, null, null); releaseConnection = false; } catch (RouteException e) { // The attempt to connect via a route failed. The request will not have been sent. //尝试连接一个路由失败,这个请求还没有发出 if (!recover(e.getLastConnectException(), false, request)) { throw e.getLastConnectException(); } releaseConnection = false; //重试。。。 continue; } catch (IOException e) { // An attempt to communicate with a server failed. The request may have been sent. //先判断当前请求是否已经发送了 boolean requestSendStarted = !(e instanceof ConnectionShutdownException); //同样的重试判断 if (!recover(e, requestSendStarted, request)) throw e; releaseConnection = false; //重试。。。 continue; } finally { // We're throwing an unchecked exception. Release any resources. //没有捕获到的异常,最终要释放 if (releaseConnection) { streamAllocation.streamFailed(null); streamAllocation.release(); } } // Attach the prior response if it exists. Such responses never have a body. //这里基本上都没有讲,priorResponse是用来保存前一个Resposne的,这里可以看到将前一个Response和当前的Resposne //结合在一起了,对应的场景是,当获得Resposne后,发现需要重定向,则将当前Resposne设置给priorResponse,再执行一遍流程, //直到不需要重定向了,则将priorResponse和Resposne结合起来。 if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build(); } //判断是否需要重定向,如果需要重定向则返回一个重定向的Request,没有则为null Request followUp = followUpRequest(response); if (followUp == null) { //不需要重定向 if (!forWebSocket) { //是WebSocket,释放 streamAllocation.release(); } //返回response return response; } //需要重定向,关闭响应流 closeQuietly(response.body()); //重定向次数++,并且小于最大重定向次数MAX_FOLLOW_UPS(20) if (++followUpCount > MAX_FOLLOW_UPS) { streamAllocation.release(); throw new ProtocolException("Too many follow-up requests: " + followUpCount); } //是UnrepeatableRequestBody, 刚才看过也就是是流类型,没有被缓存,不能重定向 if (followUp.body() instanceof UnrepeatableRequestBody) { streamAllocation.release(); throw new HttpRetryException("Cannot retry streamed HTTP body", response.code()); } //判断是否相同,不然重新创建一个streamConnection if (!sameConnection(response, followUp.url())) { streamAllocation.release(); streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(followUp.url()), call, eventListener, callStackTrace); } else if (streamAllocation.codec() != null) { throw new IllegalStateException("Closing the body of " + response + " didn't close its backing stream. Bad interceptor?"); } //赋值再来! request = followUp; priorResponse = response; } } 复制代码
默认的Okhttp是配置失败重连的 retryOnConnectionFailure = true
小结:
1.如果我们在配置OkHttpClient中配置retryOnConnectionFailure属性为false,表明拒绝失败重连,那么这 里返回false
2.如果请求已经发送,并且这个请求体是一个UnrepeatableRequestBody类型,则不能重试
3.如果是一些严重的问题(协议,安全...),拒绝重试
4.没有更多的可以使用的路由,则不要重试了
BridgeInterceptor:主要处理请求头,Gzip,以及cookie的处理
CacheInterceptor
1.通过Request尝试到Cache中拿缓存(里面非常多流程),当然前提是OkHttpClient中配置了缓存,默认是不支持的。 2.根据response,time,request创建一个缓存策略,用于判断怎样使用缓存。 3.如果缓存策略中设置禁止使用网络,并且缓存又为空,则构建一个Resposne直接返回,注意返回码=504 4.缓存策略中设置不使用网络,但是又缓存,直接返回缓存 5.接着走后续过滤器的流程,chain.proceed(networkRequest) 6.当缓存存在的时候,如果网络返回的Resposne为304,则使用缓存的Resposne。 7.构建网络请求的Resposne 8.当在OKHttpClient中配置了缓存,则将这个Resposne缓存起来。 9.缓存起来的步骤也是先缓存header,再缓存body。 10.返回Resposne。
Okhttp只能缓存Get请求,作者觉得技术上讲可以缓存head,post请求,但是这样做复杂度高而且收益低。
ConnectInterceptor(与服务器建立连接的过滤器)
CallServerInterceptor 数据交换(实际网络请求)
class NetworkErrorCodeInterceptor : Interceptor { override fun intercept(chain: Interceptor.Chain): Response { val request = chain.request() val response = chain.proceed(request) if (response.code() == 401) { //do something } return response } } 复制代码
class RequestHeaderInterceptor : Interceptor { override fun intercept(chain: Interceptor.Chain): Response { val original = chain.request() val hmacBefore = getHMACString(original) // LogUtil.e("hmac=before=$hmacBefore") val hmacAfter = AlgorithmUtil.hmacSHA256(hmacBefore) // LogUtil.e("hmac=after=$hmacAfter") val request = original.newBuilder() .header("accept", "application/json") .header("content-type", "application/json") .header("clientVersion", BuildConfig.VERSION_NAME) .header("x-client-id", BuildConfig.CLIENT_ID) .header("x-session-token", SessionManager.getSessionToken()) .header("x-hmac", hmacAfter) .header("x-anonymous", false.toString()) .header("platform", APP_PLATFORM) .build() LogUtil.d("header=${request.headers()}") return chain.proceed(request) } 复制代码
添加日志拦截器:HttpLoggingInterceptor
添加浏览器查看请求:StethoInterceptor
不算是拦截器OkHttpEventListener(继承于EventListener),是用于记录网络请求各个流程的时间,流量等等。
Okhttp在返回码 大于等于200并且小于300 的时候,才视为获取成功,否则会将返回的body放到错误的返回结果中,如下所示:
//Response public boolean isSuccessful() { return code >= 200 && code < 300; } /** Create an error response from {@code rawResponse} with {@code body} as the error body. */ public static <T> Response<T> error(ResponseBody body, okhttp3.Response rawResponse) { checkNotNull(body, "body == null"); checkNotNull(rawResponse, "rawResponse == null"); if (rawResponse.isSuccessful()) { throw new IllegalArgumentException("rawResponse should not be successful response"); } return new Response<>(rawResponse, null, body); } 复制代码