文章首发在个人博客 https://www.nullobject.cn ,公众号 NullObject 同步更新。
文章基于 OkHttp3.14.3 版本
上一篇 OkHttp源码分析(一)请求和响应过程简单分析 中我们简单分析了OkHttp从请求到响应的过程,这篇就来深入学习下其中涉及到的比较关键的类:
OkHttpClient类主要应用了和两种设计模式来设计,结合外观模式的思想,将许多对应OkHttp中各个功能模块的对象包含到类中,并为这些功能对象的配置提供了共同的对外接口。同时使用建造者模式,提供一个Builder类为这些众多的功能模块提供链式的配置方式,使得繁杂的功能模块配置变得简洁。首先,创建一个OkHttpClient.builder对象,接着按需设置builder各个参数:
OkHttpClient.Builder builder = new OkHttpClient.Builder();
OkHttpClient中超时时间参数有以下几种:
// 设置整个请求过程最大超时时间为60s builder.callTimeout(Duratino.ofSeconds(60));
// 设置连接建立超时时间 builder.connectTimeout(Duration.ofSeconds(10));
// 设置读超时时间 builder.readTimeout(Duration.ofSeconds(10));
// 设置写超时时间 builder.writeTimeout(Duration.ofSeconds(10));
builder.retryOnConnectionFailure(true);
OkHttpClient支持添加多个HTTP/HTTPS请求拦截器和WebSocket拦截器:
builder.addInterceptor(chain -> chain.proceed(chain.request()));
builder.addNetworkInterceptor(chain -> chain.proceed(chain.request()));
OkHttp支持自定义缓存的路径和大小,以及Cookie的缓存处理:
// 设置缓存文件,用于将HTTP/HTTPS响应缓存到文件系统从而达到重用的目的以节省时间和网络带宽 builder.cache(new Cache(new File("cache_path"), 1024 * 1024))
// 将cookie缓存到内存中 builder.cookieJar(new CookieJar() { private final HashMap<String, List<Cookie>> cookieStore = new HashMap<>(); @Override public void saveFromResponse(final HttpUrl url, final List<Cookie> cookies) { cookieStore.put(url.host(), cookies); } @Override public List<Cookie> loadForRequest(final HttpUrl url) { List<Cookie> cookies = cookieStore.get(url.host()); return null != cookies ? cookies : new ArrayList<Cookie>(); } })
// 自定义dns解析,屏蔽百度用域名解析并使用系统提供的DNS解析服务解析其他域名 builder.dns(hostname -> { // 屏蔽百度链接 if (hostname.contains("baidu.com")) { List<InetAddress> addresses = new ArrayList<>(); addresses.add(InetAddress.getByAddress(new byte[]{(byte) 127, (byte) 0, (byte) 0, (byte) 1})); return addresses; } return Dns.SYSTEM.lookup(hostname); })
builder.followRedirects(true) .followSslRedirects(true);
builder.pingInterval(Duration.ofSeconds(59));
builder.protocols(Util.immutableList(Protocol.HTTP_2, Protocol.HTTP_1_1));
// 手动配置连接池,其中ConnectionPool第一个参数表示池内容纳的最大连接 builder.connectionPool(new ConnectionPool(5, 5, TimeUnit.MINUTES));
builder.dispatcher(new Dispatcher()); //手动配置执行请求任务的线程池 builder.dispatcher(new Dispatcher(Executors.newFixedThreadPool(64)));
OkHttp中的 EventListener ,对于每次请求,犹如"上帝视角"般的存在:如果为OkHttpClient设置了EventListener,则一个请求从发起到结束的所有步骤都会被EventListener“看”到,请求的完整生命周期事件都会通过EventListener对应的接口回调给上层,因此,在开发debug阶段,或想要了解一个请求需要经历哪些流程时,也可以通过设置EventListener来获取相应信息。
// EventListener.NONE不监听任何事件 builder.eventListenerFactory(call -> EventListener.NONE);
public Builder eventListener(EventListener eventListener) { if (eventListener == null) throw new NullPointerException("eventListener == null"); this.eventListenerFactory = EventListener.factory(eventListener); return this; }
可以通过以下三个选项设置请求代理:
builder.proxySelector(ProxySelector.getDefault());
builder.proxyAuthenticator(Authenticator.NONE);
builder.proxy(Proxy.NO_PROXY);
builder.socketFactory(SocketFactory.getDefault());
// 设置默认固定证书 builder.certificatePinner(CertificatePinner.DEFAULT);
builder.connectionSpecs(Util.immutableList(ConnectionSpec.MODERN_TLS, ConnectionSpec.CLEARTEXT));
builder.sslSocketFactory((SSLSocketFactory) SSLSocketFactory.getDefault(), Util.platformTrustManager());
最后,生成OkHttpClient对象:
OkHttpClient client = builder.build();
接下来看看Request类。
Request封装了请求的内容,包括链接、请求体参数、请求头等,以及请求tag,比较简单。Request也是通过Builder构建:
// Step 2. 构建一个Request用于封装请求地址、请求类型、参数等信息 Request request = new Request.Builder().get() .url("https://www.baidu.com") .build();
创建好 OkHttpClient 和 Request 之后,就可以生成请求任务,发起请求了,接下来看 Call 和其实现类 RealCall 。
当前版本的OkHttp中,接口 Call 只有 RealCall 这一个实现类。Call表示一个已经准备好,可以执行的请求任务,Call执行时可以取消,但一个Call只能被执行一次。Call除了封装分别用于执行 同步 和 异步 请求的 execute() 、 enqueue(callback) 两个接口外,还封装了其他几个请求相关的接口:
接下来看看 RealCall 类源码。
我们在发起Http请求时会通过OkHttpClient和Request对象来创建Call:
// 创建一个新的请求任务Call Call call = httpClient.newCall(request);
而httpClient.newCall()方法内部通过RealCall的静态方法newCall创建并返回一个RealCall对象,所以在执行 同步 / 异步 请求时实际调用的是RealCall中的实现方法:
// OkHttpClient.newCall @Override public Call newCall(Request request) { return RealCall.newRealCall(this, request, false /* for web socket */); }
打开RealCall源码可以看到,RealCall内部持有本次请求的Request对象和OkHttpClient对象,同时还发现,RealCall还声明了一个 Transmitter 类型的对象并随着RealCall的创建而创建。Transmitter作为OkHttp的应用层和网络层的连接,负责对外暴露OkHttp中的高级应用程序层原语,包括连接、请求、响应和流等。结合RealCall源码可以发现,RealCall负责的是请求发起和执行,Transmitter则负责请求任务的状态、超时时间、生命周期事件的更新以及请求任务背后的连接、连接池的维护管理等。
/*RealCall.java:通过transmitter控制超时时间的计算、生命周期步骤更新、取消请求等*/ @Override public Response execute() throws IOException { // ... 忽略其他代码 transmitter.timeoutEnter(); transmitter.callStart(); // ... 忽略其他代码 } @Override public void enqueue(Callback responseCallback) { // ... 忽略其他代码 transmitter.callStart(); // ... 忽略其他代码 } @Override public void cancel() { transmitter.cancel(); } @Override public Timeout timeout() { return transmitter.timeout(); }
再看看clone()方法实现:
@SuppressWarnings("CloneDoesntCallSuperClone") // We are a final type & this saves clearing state. @Override public RealCall clone() { return RealCall.newRealCall(client, originalRequest, forWebSocket); }
可以看到,clone内部创建了一个新的Call,相当于调用了 (RealCall)client.newCall(request)
,因此可以用这个克隆的Call继续发起请求。
RealCall还封装了个内部类 AsyncCall 用于执行异步请求,AsyncCall声明了CallBack变量用于回调通知异步请求结果,以及一个线程安全的AtomicInteger类型变量callsPerHost用于计量同一主机的请求数。通过上一篇的分析知道,AsyncCall是一个Runnable并且最终通过 AsyncCall.execute() 方法执行网络请求。那RealCall内部是怎样执行到这个这个方法的呢?我们之前是通过快速跳转实现的方式找到了这个方法的,而AsyncCall封装的异步请求任务是在RealCall.enqueue执行时被添加到Dispatch中的请求队列:
// RealCall.enqueue() @Override public void enqueue(Callback responseCallback) { // ... 忽略无关代码 client.dispatcher().enqueue(new AsyncCall(responseCallback)); }
那么只要搞清楚Dispatcher.enqueue()背后的队列中的任务何时何地执行的,上面的问题就有答案了。
Dispatcher是OkHttp中的请求任务调度器,内部维护了一个线程池和相关的请求队列用于实现高并发的异步请求:
public final class Dispatcher { // 最大并发请求数,默认为64个 private int maxRequests = 64; // 相同服务器主机的最大并发请求数,默认为5个 private int maxRequestsPerHost = 5; // 空闲回调,如果设置,则当该调度器空闲时(正在执行的任务数变为0)时回调通知idleCallback.run() private @Nullable Runnable idleCallback; // 执行异步任务的线程池 private @Nullable ExecutorService executorService; // 存放已经准备就绪可以执行,但尚未执行的异步请求任务的双向队列 private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); // 存放正在执行的异步请求任务,包括已经取消但未完全结束的请求的双向队列 private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); // 存放正在执行的同步请求任务,包括已经取消但未完全结束的请求的双向队列 private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); //...其他代码 }
其中线程池对象通过懒加载方式创建:
public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; }
可以看到,executorService实际创建的是一个无边界、核心线程数为 0 的线程池。其中池内线程空闲等待时长为 60 s,超过空闲时间自动结束;工作队列workQueue为 SynchronousQueue 类型的队列,该队列是一个同步队列,保证并发的任务顺序执行,且该类型队列内部不存储值,只作传递,由于executorService创建的线程数无限制,不会有队列等待,所以使用SynchronousQueue(参考Executors.newCachedThread()创建缓存线程池);
:
SynchronousQueue每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。因此队列内部其实没有任何一个元素,或者说容量为0,严格说并不是一种容器,由于队列没有容量,因此不能调用peek等操作,因此只有移除元素才有元素,显然这是一种快速传递元素的方式,也就是说在这种情况下元素总是以最快的方式从插入者(生产者)传递给移除者(消费者),这在多任务队列中最快的处理任务方式。对于高频请求场景,无疑是最合适的。
在OKHttp中,创建了一个阀值是Integer.MAX_VALUE的线程池,它不保留任何最小线程,随时创建更多的线程数,而且如果线程空闲后,只能多活60秒。所以也就说如果收到20个并发请求,线程池会创建20个线程,当完成后的60秒后会自动关闭所有20个线程。他这样设计成不设上限的线程,以保证I/O任务中高阻塞低占用的过程,不会长时间卡在阻塞上。
接着第3节RealCall分析最后的问题,来看看Dispatcher.enqueue()方法的实现:
// Dispatcher.enqueue() void enqueue(AsyncCall call) { synchronized (this) { // 将新的异步请求任务添加到readyAsyncCalls队列 readyAsyncCalls.add(call); // 修改call,使其共享runningAsyncCalls或readyAsyncCalls中现有的请求相同主机的call.callsPerHost变量 if (!call.get().forWebSocket) { AsyncCall existingCall = findExistingCallWithHost(call.host()); if (existingCall != null) call.reuseCallsPerHostFrom(existingCall); } } // 执行AsyncCall请求调度策略,发起call请求 promoteAndExecute(); }
对于新请求入列的异步请求任务 call ,首先将其添加到readyAsyncCalls队列,以表示这个call准备就绪,可以执行请求;接着修改这个call的 callsPerHost 属性为与先前添加的相同主机请求任务的call共享,从而实现对相同主机的请求计数,对相同主机的最大并发请求数进行限制。接着调用 promoteAndExecute() 方法,将readyAsyncCalls队列中的任务提升到runningAsyncCalls,并执行请求:
private boolean promoteAndExecute() { assert (!Thread.holdsLock(this)); // 创建可执行任务列表,用于筛选出readyAsyncCalls中可执行的任务 List<AsyncCall> executableCalls = new ArrayList<>(); boolean isRunning; synchronized (this) { // 遍历筛选readyAsyncCalls for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall asyncCall = i.next(); // 如果runningAsyncCalls中的请求任务数超过最大并发请求数限制maxRequests则任务继续放在readyAsyncCalls中等待执行 if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity. // 如果与asyncCall相同主机的请求数超过最大并发同主机请求数则,则不执行该请求 if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity. // 从readyAsyncCalls中移除符合条件的请求任务 i.remove(); // 将asyncCall关联的主机请求数增1 asyncCall.callsPerHost().incrementAndGet(); // 加入可执行请求任务列表 executableCalls.add(asyncCall); // 加入runningAsyncCalls任务队列 runningAsyncCalls.add(asyncCall); } isRunning = runningCallsCount() > 0; } // 遍历执行请求任务 for (int i = 0, size = executableCalls.size(); i < size; i++) { AsyncCall asyncCall = executableCalls.get(i); asyncCall.executeOn(executorService()); } return isRunning; }
可以看到,Dispatcher请求调度器最终是在promoteAndExecute()方法中实现最大并发请求数量和最大并发同主机请求数量限制的。在方法的最后遍历执行请求任务,调用了每一个AsyncCall的executeOn()方法并将当前Dispatcher的线程池作为参数传入,在看看这个AsyncCall.executeOn(executorService)方法的实现:
// RealCall.AsyncCall.executeOn() void executeOn(ExecutorService executorService) { assert (!Thread.holdsLock(client.dispatcher())); boolean success = false; try { // 通过executorService线程池执行请求任务 executorService.execute(this); success = true; } catch (RejectedExecutionException e) { InterruptedIOException ioException = new InterruptedIOException("executor rejected"); ioException.initCause(e); // 发生异常时关闭连接 transmitter.noMoreExchanges(ioException); responseCallback.onFailure(RealCall.this, ioException); } finally { // 最后如果请求任务执行失败则结束任务,从runningAsyncCalls中将任务移除 if (!success) { client.dispatcher().finished(this); // This call is no longer running! } } }
AsyncCall.executeOn()方法内部调用线程池的execute方法执行本次任务,由此触发AsyncCall父类的run方法,并执行到AsyncCall的execute()方法,完成了本次请求!因此,AsyncCall.execute()调用过程大致如下:
最后,调用Dispatcher.finished(call)方法结束本次请求:
@Override protected void execute() { // ... 忽略无关代码 try { // ... 忽略无关代码 } catch (IOException e) { // ... 忽略无关代码 } finally { // 结束本次请求 client.dispatcher().finished(this); } }
// Dispatcher.finished(call) void finished(AsyncCall call) { // 将call同主机并发请求数减1 call.callsPerHost().decrementAndGet(); // 结束本次请求任务,主动将call从runningAsyncCalls队列移除 finished(runningAsyncCalls, call); }
以上从Dispatcher.enqueue()开始到Dispatcher.finished(call)结束就是Dispatcher调度异步请求的过程。Dispatcher对同步请求的调度执行就简单多了,单线程任务,直接从RealCall.execute跟进分析即可。不管是同步还是异步请求,最终都是通过RealCall的getResponseWithInterceptorChain()方法完成请求和获取响应结果的,那getResponseWithInterceptorChain()方法内部是如何通过拦截器链完成请求的呢?下一篇就来分析分析OkHttp中的拦截器Interceptor。
欢迎关注个人公众号: