官网的介绍——An HTTP & HTTP/2 client for Android and Java applications。
它的优点:
implementation 'com.squareup.okhttp3:okhttp:3.14.2' 复制代码
OkHttpClient okHttpClient = new OkHttpClient(); okHttpClient.newCall( new Request.Builder() .url("https://api.github.com/users/apkcore") .build()) //enqueue是异步执行 .enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { Log.d(TAG, "onFailure: failure"); } @Override public void onResponse(Call call, Response response) throws IOException { Log.d(TAG, "onResponse: success"); } }); 复制代码
它的使用是非常简单的,我们在Android中一般使用enqueue执行异步请求。
现在我们看它的源码实现
先看newCall的源码
@Override public Call newCall(Request request) { return RealCall.newRealCall(this, request, false /* for web socket */); } 复制代码
可以看到,直接调用了realCall的方法,说明RealCall是Call的实现,继续看RealCall.newRealCall的源码。
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { // Safely publish the Call instance to the EventListener. //利用传递过来的参数,new一个RealCall RealCall call = new RealCall(client, originalRequest, forWebSocket); call.transmitter = new Transmitter(client, call); return call; } 复制代码
可以看到,先调用RealCall的构造方法,其中第三个参数是否为webSocket(可以服务端主动向客户端推送)。
然后new Transmitter这个对象,放入call中,它的构造源码如下
/** * Bridge between OkHttp's application and network layers. This class exposes high-level application layer primitives: connections, requests, responses, and streams. * ... */ public final class Transmitter { public Transmitter(OkHttpClient client, Call call) { this.client = client; this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool()); this.call = call; //主要是这句,连接的状态的时间点记录 this.eventListener = client.eventListenerFactory().create(call); this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS); } } 复制代码
可以看它的类注释可以知道,这是OkHttp的应用层和网络层之间的桥梁。中文又叫发射器,这个类执行连接,请求,响应和流的真正操作。
然后,我们继续看.enqueue方法的源码,直接点的话进入的是Call接口,刚刚我们已经发现了,RealCall才是Call的实现类,所以,在RealCall中找到.enqueue方法
@Override public void enqueue(Callback responseCallback) { synchronized (this) { //每个请求只能执行一次 if (executed) throw new IllegalStateException("Already Executed"); executed = true; } // transmitter.callStart(); client.dispatcher().enqueue(new AsyncCall(responseCallback)); } 复制代码
可以看到,每个Call只执行一次,否则会抛异常,这里先transmitter.callStart方法,绑定call,开始通知监听器组件网络请求开始了, 主要是通知 EventListener
。
然后到了关键代码 client.dispatcher().enqueue(new AsyncCall(responseCallback));
,我们分开来阅读它的源码。
final Dispatcher dispatcher; public Dispatcher dispatcher() { return dispatcher; } 复制代码
这个Dispatcher类的源码如下:
/** * Policy on when async requests are executed. * * <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your * own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number * of calls concurrently. */ public final class Dispatcher { //控制最大请求并发数 private int maxRequests = 64; //单个主机最大并发数 private int maxRequestsPerHost = 5; private @Nullable Runnable idleCallback; /** Executes calls. Created lazily. */ //线程池 private @Nullable ExecutorService executorService; /** Ready async calls in the order they'll be run. */ //已经准备好异步执行的队列 private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */ //正在执行的异步队列 private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); /** Running synchronous calls. Includes canceled calls that haven't finished yet. */ //正在执行的同步队列 private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); ... } 复制代码
可以知道, Dispatcher
本质上是异步请求的调度器,负责调度异步请求的执行,控制最大请求并发数和单个主机的最大并发数,并持有一个线程池来负面异步请求,对同步只做统计操作。
void enqueue(AsyncCall call) { synchronized (this) { //先把AsyncCall加到准备队列中 readyAsyncCalls.add(call); // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to // the same host. //更改AsyncCall使其能共享同一主机 if (!call.get().forWebSocket) { AsyncCall existingCall = findExistingCallWithHost(call.host()); if (existingCall != null) call.reuseCallsPerHostFrom(existingCall); } } //关键调用 promoteAndExecute(); } 复制代码
通过分析,我们知道,enqueue先把AsycCall加入到准备队列中,然后调用promoteAndExecute,其实看方法名我们也知道,这是执行代码
private boolean promoteAndExecute() { assert (!Thread.holdsLock(this)); List<AsyncCall> executableCalls = new ArrayList<>(); boolean isRunning; synchronized (this) { for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall asyncCall = i.next(); //如果超出最大同步线程,当然这任务就老实呆在准备队列里 if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity. //同理,超过单一主机连接数,继续遍历下一个asyncCall if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity. i.remove(); asyncCall.callsPerHost().incrementAndGet(); //添加进executableCalls executableCalls.add(asyncCall); //添加到运行队列 runningAsyncCalls.add(asyncCall); } isRunning = runningCallsCount() > 0; } for (int i = 0, size = executableCalls.size(); i < size; i++) { AsyncCall asyncCall = executableCalls.get(i); //执行线程任务 asyncCall.executeOn(executorService()); } return isRunning; } 复制代码
最后,我们看一下asyncCall.executeOn(executorService());这个代码里面的调用
//创建一个线程池 public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; } 复制代码
传入进入的参数AsyncCall
final class AsyncCall extends NamedRunnable { private final Callback responseCallback; private volatile AtomicInteger callsPerHost = new AtomicInteger(0); AsyncCall(Callback responseCallback) { super("OkHttp %s", redactedUrl()); this.responseCallback = responseCallback; } ... } 复制代码
可以看到,AsyncCall是一个runnable类,由enqueue的源码分析我们可以知道,最后调用了 asyncCall.executeOn
方法
void executeOn(ExecutorService executorService) { ... try { //执行线程池 executorService.execute(this); success = true; } catch (RejectedExecutionException e) { ... } finally { if (!success) { client.dispatcher().finished(this); // This call is no longer running! } } } 复制代码
可以看到,线程池执行时,就会调用AsyncCall的run方法,我们去找它的run方法,发现实现在父类
public abstract class NamedRunnable implements Runnable { ... @Override public final void run() { ... execute(); ... } ... } 复制代码
可以看到,就是调用了execute方法,这个方法在AsyncCall中有重写
@Override protected void execute() { boolean signalledCallback = false; transmitter.timeoutEnter(); try { Response response = getResponseWithInterceptorChain();//1 signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } } 复制代码
可以看到,真正的Respone是通过getResponseWithInterceptorChain这个方法获取的,其他的代码只是控件与回调,这也是OkHttp的最精彩的设计Interceptor(拦截器)与Chain(调用链)
上面我们分析了enqueue异步执行的源码,其实同步执行的源码也差不多
@Override public Response execute() throws IOException { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } transmitter.timeoutEnter(); transmitter.callStart(); try { //先调用Dispatcher有executed方法 client.dispatcher().executed(this); //然后调用getResponseWithInterceptorChain方法 return getResponseWithInterceptorChain方法(); } finally { client.dispatcher().finished(this); } } 复制代码
dispatcher的executed方法只是一个把任务添加到运行同步队列的方法
synchronized void executed(RealCall call) { runningSyncCalls.add(call); } 复制代码
这里对我们阅读源码没什么影响,不过可以扩展我们对网络的一些知识的了解。我们回到最开始,看 OkHttpClient
的源码中的属性
final Dispatcher dispatcher;//线程调度器,负责异步请求的执行,会平衡性能,对同步操作只做统计 final @Nullable Proxy proxy;//可配置的代理,比如翻墙访问 final List<Protocol> protocols;//给服务端列出的所支持的协议的版本 final List<ConnectionSpec> connectionSpecs;//配置的是使用的Http还是Https,以及可支持的tls等 final List<Interceptor> interceptors;//拦截器 final List<Interceptor> networkInterceptors;// final EventListener.Factory eventListenerFactory;//创建EventListener final ProxySelector proxySelector;//设置代理的用户名密码 final CookieJar cookieJar;//cookie存储器 final @Nullable Cache cache;//缓存 final @Nullable InternalCache internalCache;//内部缓存接口 final SocketFactory socketFactory;//创建tcp连接端口 final SSLSocketFactory sslSocketFactory;//ssl的factory final CertificateChainCleaner certificateChainCleaner;//整理证书链 final HostnameVerifier hostnameVerifier;//主机名验证器,给HTTPs用的 final CertificatePinner certificatePinner;//用来验证自定义证书的 final Authenticator proxyAuthenticator;//用于监听代理权限不足的回调 final Authenticator authenticator;//用于监听权限不足的回调 final ConnectionPool connectionPool;//连接池。带缓存的集合 final Dns dns;//用于设置dns,根据域名拿到ip列表 final boolean followSslRedirects;//http与https互跳时 final boolean followRedirects;//有重定向时要不要跳转 final boolean retryOnConnectionFailure;//请求失败是否重连 final int callTimeout;//call的超时时间 final int connectTimeout;//tcp超时时间 final int readTimeout;//下载超时时间 final int writeTimeout;//写入请求的超时时间 final int pingInterval;//心跳包ping的间隔 复制代码
可以看到,无论是execute还是enqueue,都是调用了Disapatch的的对应execute与enqueue方法,真正的response都是通过 getResponseWithInterceptorChain
来获取的,区别就是execute的执行是在dispatch中只做一个同步统计,而enqueue方法会在dispatch中根据当前并发数选择是否执行队列中等待的AsyncCall。
上面分析到了,网络请求的入口实际上就是在RealCall的 getResponseWithInterceptorChain
这个方法
Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. //创建完整的拦截器集合 List<Interceptor> interceptors = new ArrayList<>(); //用户定义的一系列拦截器,如日志等 interceptors.addAll(client.interceptors()); //负责失败重试和重连的拦截器 interceptors.add(new RetryAndFollowUpInterceptor(client)); //把用户构造的请求转化为发送到服务器的请求,把服务器返回的结果转化为用户友好的结果 interceptors.add(new BridgeInterceptor(client.cookieJar())); //负责缓存策略的拦截器 interceptors.add(new CacheInterceptor(client.internalCache())); //负责与服务器连接的拦截器 interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } //负责向服务器发送数据和从服务器获取数据的拦截器 interceptors.add(new CallServerInterceptor(forWebSocket)); //把这个集合转换成链 Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,originalRequest, this, client.connectTimeoutMillis(),client.readTimeoutMillis(), client.writeTimeoutMillis()); boolean calledNoMoreExchanges = false; try { //关键执行代码 Response response = chain.proceed(originalRequest); ... return response; } catch (IOException e) { ... } finally { ... } } 复制代码
从源码中可以看出,它会先把所有的拦截器,转换成一条链RealInterceptorChain,再执行RealInterceptorChain中的proceed方法
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange) throws IOException { if (index >= interceptors.size()) throw new AssertionError(); ... // Call the next interceptor in the chain. //从下一位开始的拦截器链 RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange, index + 1, request, call, connectTimeout, readTimeout, writeTimeout); Interceptor interceptor = interceptors.get(index); //调用对应拦截器的intercept方法 Response response = interceptor.intercept(next); ... return response; } 复制代码
而每一个interceptor.intercept的方法体里,都调用了realChain.proceed方法,这样chain与interceptor互相递归调用,直到世界(链)的尽头。
在这里其实是用到了责任链模式,比如,CacheInterceptor中
@Override public Response intercept(Chain chain) throws IOException { Response cacheCandidate = cache != null ? cache.get(chain.request()) ... Request networkRequest = strategy.networkRequest; ... // If we don't need the network, we're done. // 如果不需要网络直接可以返回缓存结果 if (networkRequest == null) { return cacheResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build(); } ... } 复制代码
在源码中,如果可以直接取缓存里的数据时,是直接return的,不需要走后面的网络请求流程,这也是责任链模式所带来的便利性。
使用了责任链模式,每个拦截器完成的大致流程是
可以看到,其实如果仔细看ConnectInterceptor,CallServerInterceptor等的源码,我们能发现,okhttp在内部自己完整的实现了一套TCP、TLS/SSL连接建立,HTTP的报文传输,各种http特性的支持(重试,跳转,cache,cookie等)。有兴趣的盆友可以自己跟着源码阅读一下,参考地址 dieyidezui.com/okhttp-3-4-…