我们先看一下怎么使用OKhtttp完成WebSocket的请求:
//设置连接超时时间 mOkHttpClient = new OkHttpClient.Builder().connectTimeout(9 * 10, TimeUnit.SECONDS).build(); Request request = new Request.Builder().url(BASE_URL).build(); mWebSocket = mOkHttpClient.newWebSocket(request, this); 复制代码
重点在这里,打开OkHttpClient.class查找newWebSocket()方法:
/** * Uses {@code request} to connect a new web socket. */ @Override public WebSocket newWebSocket(Request request, WebSocketListener listener) { RealWebSocket webSocket = new RealWebSocket(request, listener, new Random()); webSocket.connect(this); return webSocket; } 复制代码
这里传入request对象和websocket的专用监听WebSocketListener,WebSocketListener 对象稍后再做赘述,主流程还是看RealWebSocket.class的connect()方法: 步骤1:
client = client.newBuilder() .protocols(ONLY_HTTP1) .build(); 复制代码
我们都知道普通的请求时client是需要被bulid的,这里拿到OkHttpClient又重新创建了一遍,一开始就创建好了干嘛还要创建创建呢?看这个方法:protocols(ONLY_HTTP1),
private static final List<Protocol> ONLY_HTTP1 = Collections.singletonList(Protocol.HTTP_1_1); 复制代码
步骤2:
final Request request = originalRequest.newBuilder() .header("Upgrade", "websocket") .header("Connection", "Upgrade") .header("Sec-WebSocket-Key", key) .header("Sec-WebSocket-Version", "13") .build(); 复制代码
对request对象的头部加工,
步骤3:
call = Internal.instance.newWebSocketCall(client, request); 复制代码
从OkHttpClient中 获取WebSocket的call对象(回调使用),这个Internal.instance虽然是接口方法,其实现是在OkHttpClient中,直接看对应方法:
@Override public Call newWebSocketCall(OkHttpClient client, Request originalRequest) { return new RealCall(client, originalRequest, true); } 复制代码
步骤4:搜嘎 原来enqueue()方法是使用RealCall.class的enqueue()方法,这是一个入队的方法,而且是个异步的方法。这就说明webSocket建立连接后才响应回调。而且如果是长连接那么这个线程就一直在线程池里不会被释放掉。
call.enqueue(new Callback() { @Override public void onResponse(Call call, Response response) { try { checkResponse(response); } catch (ProtocolException e) { failWebSocket(e, response); closeQuietly(response); return; } 复制代码
照现在的进度已经到了设置好的回调要开始执行了,那就转战RealCall
@Override public void enqueue(Callback responseCallback) { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); client.dispatcher().enqueue(new AsyncCall(responseCallback)); } 复制代码
其实我对okhttp同步请求有几点疑惑: 1一开始我没有创建线程,那么这个请求就是在主线程中吗? 2如果是同步请求那么如果同时多次请求是不是如果前面的请求在执行后面的请求在进入等待的状态了呢? 其实这些问题就需要从dispatcher()的线程池入手了。
这个dispatcher在一开始介绍ok的时候已经介绍过了,我们来看dispatcher中的enqueue()方法: 嘿嘿嘿,又到了OkHttp请求里了 而且 这时候realCall内部创建了AsyncCall(异步的Call),其实看方法名就应该知道的,ok的webSocket都是使用异步的,而且我们要明白现在只是一个最初的socket,之后的通信,都会在该线程池的一个线程中进行。
问题1:ok的websocket是异步的,并不会阻塞主线程,而且也不需要单独开辟一个子线程来创建连接。 问题2:会不会阻塞首先我们再次看看这个executorService的线程池结构。虽然在同步篇对dispatcher的线程池做过介绍,但是在我看来还是很解释不够清晰的地方: 首先 这个是dispatcher线程池的结构
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); 复制代码
我在这里做一个详细的说明:首先,SynchronousQueue是一个无缓存的阻塞的队列,什么意思呢?我们可以理解为当这个队列中有元素的时候,这个元素没有被取走(take方法)之前是不允许继续对之后的内容进行操作。
注意1:它一种阻塞队列,其中每个 put 必须等待一个 take,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。 注意2:它是线程安全的,是阻塞的。 注意3:不允许使用 null 元素。 注意4:公平排序策略是指调用put的线程之间,或take的线程之间。公平排序策略可以查考ArrayBlockingQueue中的公平策略。 所以这又解决了一个困扰我多年的难题: okhttp的能同时执行多少个请求? 这个线程池的配置其实就是Executors提供的线程池配置方案之一,构造一个缓冲功能的线程池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一个无容量的阻塞队列 SynchronousQueue,因此任务提交之后,将会创建新的线程执行;线程空闲超过60s将会销毁:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 复制代码
用一个形象的比喻就是一个传球手,当从主线程传进了任务,就创建一个runnable来接收。
这里是Dispatcher的异步启动方法:
synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else { readyAsyncCalls.add(call); } } 复制代码
在这里专门用runningAsyncCalls来记录在执行的Call,每次执行都会记录,当向executor添加call的时候,根据2,将任务放入SynchronousQueue中等待前面的request被取出才能执行之后的request,这里maxRequests 被定为64.超出64的将会被放入readyAsyncCalls。 ready和running之间怎么传递呢? 这就需要我们对比分析下RealCall这个类: 同步的时候是调用RealCall的:@Override public Response execute() throws IOException 异步的时候是调用AsyncCall的:
@Override protected void execute() { boolean signalledCallback = false; try { Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } } 复制代码
事件的回调已经具备了,回收需要看这里.finished(this)方法,最终会调用这个:
private void promoteCalls() { if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity. if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote. for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall call = i.next(); if (runningCallsForHost(call) < maxRequestsPerHost) { i.remove(); runningAsyncCalls.add(call); executorService().execute(call); } if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity. } } 复制代码
那么问题又来了, 请对比分析Ok与Volley的优缺点。
此前我先声明一点,一个websocket链接的建立是在一个子线程当中,如果链接不关闭这个子线程一直存在, 在链接前 我们创建了一个RealWebSocket.class我们进它的构造里看看也许有个惊喜:
public RealWebSocket(Request request, WebSocketListener listener, Random random) { //省略部分代码 this.writerRunnable = new Runnable() { @Override public void run() { try { while (writeOneFrame()) { } } catch (IOException e) { failWebSocket(e, null); } } }; } 复制代码
在这里创建了一个写的线程,writerRunnable 再看connect()方法:这次只需要看call的回调就可以。根据现在的流程,链接成功,走了成功的回调,Call的onResponse方法:
try { listener.onOpen(RealWebSocket.this, response); String name = "OkHttp WebSocket " + request.url().redact(); initReaderAndWriter(name, pingIntervalMillis, streams); streamAllocation.connection().socket().setSoTimeout(0); loopReader(); } catch (Exception e) { failWebSocket(e, null); } } 复制代码
核心代码在这里: 1.initReaderAndWriter()初始化读写者。这是为同服务器交互进行准备?
this.writer = new WebSocketWriter(streams.client, streams.sink, random); this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false)); 复制代码
准备了Writer,准备了定时任务(心跳链接ping——pong) runWriter();方法都做了什么呢? private void runWriter() { assert (Thread.holdsLock(this));
if (executor != null) { executor.execute(writerRunnable); } } 复制代码
哈哈 原来是为心跳链接做准备啊,定时进行通知服务器 我还在哈。
2.loopReader()开始轮训读取消息(随时准备接受来自服务器的消息)
public void loopReader() throws IOException { while (receivedCloseCode == -1) { // This method call results in one or more onRead* methods being called on this thread. reader.processNextFrame(); } } 复制代码
这不,一直循环调用reader.processNextFrame();
/** * Process the next protocol frame. * * <ul> * <li>If it is a control frame this will result in a single call to {@link FrameCallback}. * <li>If it is a message frame this will result in a single call to {@link * FrameCallback#onReadMessage}. If the message spans multiple frames, each interleaved * control frame will result in a corresponding call to {@link FrameCallback}. * </ul> */ void processNextFrame() throws IOException { readHeader(); if (isControlFrame) { readControlFrame(); } else { readMessageFrame(); } } 复制代码
没办法 注释写的太好了,我忍不住都粘贴了进来: 1如果是控制帧将会有一个单一的callback:FrameCallback 2如果是消息帧也会有一个单一的callback:FrameCallback#onReadMessage
看到这里websocket基本上已经完了,剩下的就是调用监听了。 ~~~~~~~~~~~~~~ 补充部分 ~~~~~~~~~~~~~~~
感谢网友朋友细心指导,因为写这篇文章比较早(细节忘了很多,尴尬)还原问题: “框架会自动发送ping包吗? 怎么设置发送间隔时间呢?”
真的会,而且在而且OkHttpClient也支持设置心跳间隔:
// Promote the HTTP streams into web socket streams. StreamAllocation streamAllocation = Internal.instance.streamAllocation(call); 复制代码
还对 ping pong的次数进行了记录:至于怎么发送ping 需要看这个:
initReaderAndWriter(name, pingIntervalMillis, streams); 复制代码
没错 又追踪到了初始化读写者,在初始化读写者的时候有这样一句(多看一句就能回答 读者的问题了 甚是惭愧):
if (pingIntervalMillis != 0) { executor.scheduleAtFixedRate( new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS); } 复制代码
由此可见: 1 如果pingIntervalMillis 设置为0的时候 心跳executor是不会执行的。 2 executor 原来也负责心跳包的定时任务
让我们看看 pingrunnable里都做了什么吧:
private final class PingRunnable implements Runnable { PingRunnable() { } @Override public void run() { writePingFrame(); } } void writePingFrame() { WebSocketWriter writer; synchronized (this) { if (failed) return; writer = this.writer; } try { writer.writePing(ByteString.EMPTY); } catch (IOException e) { failWebSocket(e, null); } } 复制代码
果然简单实用: 一个runnable 调用writer的writePing方法。想一想还是很合理啊,毕竟发送消息就是需要 writer来做,所以 writer有这些方法也不足为其。具体writer怎么写 我们看下:
/** Send a ping with the supplied {@code payload}. */ void writePing(ByteString payload) throws IOException { synchronized (this) { writeControlFrameSynchronized(OPCODE_CONTROL_PING, payload); } } /** Send a pong with the supplied {@code payload}. */ void writePong(ByteString payload) throws IOException { synchronized (this) { writeControlFrameSynchronized(OPCODE_CONTROL_PONG, payload); } } 复制代码
顺便一瞅 就在下边有个pong的发送方法,分析一下: 1 入参payload 是ByteString.EMPTY 就是一个空的字节, 2 最终都是相同的方法writeControlFrameSynchronized, 3 对于消息的区分:依靠writeControlFrameSynchronized的第一个入参opcode, 4 writeControlFrameSynchronized这个方法虽然没有注释 但是 即然写消息都需要调用这个方法,相比这个方法才是writer的实力担当:
private void writeControlFrameSynchronized(int opcode, ByteString payload) throws IOException { assert Thread.holdsLock(this); if (writerClosed) throw new IOException("closed"); int length = payload.size(); if (length > PAYLOAD_BYTE_MAX) { throw new IllegalArgumentException( "Payload size must be less than or equal to " + PAYLOAD_BYTE_MAX); } int b0 = B0_FLAG_FIN | opcode; sink.writeByte(b0); int b1 = length; if (isClient) { b1 |= B1_FLAG_MASK; sink.writeByte(b1); random.nextBytes(maskKey); sink.write(maskKey); byte[] bytes = payload.toByteArray(); toggleMask(bytes, bytes.length, maskKey, 0); sink.write(bytes); } else { sink.writeByte(b1); sink.write(payload); } sink.flush(); } 复制代码
操作太6 ,表示职能看懂个大概 , 都被写入这个sink中了!!!
问题来了:sink是什么东西?
/** Writes must be guarded(被守护的) by synchronizing on 'this'. */ final BufferedSink sink; 复制代码
没有交代,但是有这样一个提醒,对sink写的时候必须是被synchronizing保护的 这样我算是明白为嘛ping和pong的方法都会加锁了(他说咋做就咋做 嘻嘻 稍后看)。
我们先从单词上理解这个变量的意义吧:sink,水槽,洗涤池,什么鬼?看不懂。。。我还是看BufferedSink吧:
都说了是个小型的缓冲池,因此在写的时候会对大小进行限制: static final long PAYLOAD_BYTE_MAX = 125L;
虽然是个接口但是已经给了我们足够多的有效信息,让我们看看在创建的时候是怎么实现这个BufferedSink,回到最初writer创建的地方:
this.writer = new WebSocketWriter(streams.client, streams.sink, random); 复制代码
哦?在初始化的时候从Stream中获取的。在向上找当初的stream是怎么创建的: 当链接成功后就会 返回一个Call:
@Override public void onResponse(Call call, Response response) // Promote the HTTP streams into web socket streams. // 促进 http流初始化这个socket流 StreamAllocation streamAllocation = Internal.instance.streamAllocation(call); // Prevent connection pooling! // 防止连接共用 streamAllocation.noNewStreams(); //创建 Stream Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation); 复制代码
看来一切的谜底都在 RealConnection的newWebSockerStreams里:
public RealWebSocket.Streams newWebSocketStreams(final StreamAllocation streamAllocation) { return new RealWebSocket.Streams(true, source, sink) { @Override public void close() throws IOException { streamAllocation.streamFinished(true, streamAllocation.codec()); } }; } 复制代码
呵呵,看到真相我有点想放弃, new RealWebSocket.Streams(true, source, sink) sink就是这样被赋予的,让我回想一下,RealConnection还是挺熟悉的,是在什么时候创建的呢? 今天先研究到这里我容我仔细研究一番。。。