NIO是面向缓冲区进行通信的,不是面向流的。我们都知道,既然是缓冲区,那它一定存在一个固定大小。这样一来通常会遇到两个问题:
介绍这个问题之前,务必要提一下我代码整体架构。
代码参见GitHub仓库
https://github.com/CuriousLei/smyl-im
在这个项目中,我的NIO核心库设计思路流程图如下所示
介绍:
光这样实现,必然会有粘包、半包问题。要重现这两个问题也很简单。
这个问题实质上是消息体与缓冲区数据不一一对应导致的。那么,如何解决呢?
可以采用固定头部方案来解决,头部设置四个字节,存储一个int值,记录后面数据的长度。以此来标记一个消息体。
我的工程项目中,客户端和服务端共用一个nio核心包,即niohdl,可保证收发数据格式一致。
要实现以上设想,必须在connector和ioArgs之间加一层Dispatcher类,用于处理 消息体 与 缓冲区 之间的转化关系(消息体取个名字:Packet)。根据输入和输出的不同,分别叫ReceiveDispatcher和SendDispatcher。即通过它们来操作Packet与ioArgs之间的转化。
定义这个消息体,继承关系如下图所示:
Packet是基类,代码如下:
package cn.buptleida.niohdl.core; import java.io.Closeable; import java.io.IOException; /** * 公共的数据封装 * 提供了类型以及基本的长度的定义 */ public class Packet implements Closeable { protected byte type; protected int length; public byte type(){ return type; } public int length(){ return length; } @Override public void close() throws IOException { } }
SendPacket和ReceivePacket分别代表发送消息体和接收消息体。StringReceivePacket和StringSendPacket代表字符串类的消息,因为本次实践只限于字符串消息的收发,今后可能有文件之类的,有待扩展。
代码中必然会涉及到字节数组的操作,所以,以StringSendPacket为例,需要提供将String转化为byte[]的方法。代码如下所示:
package cn.buptleida.niohdl.box; import cn.buptleida.niohdl.core.SendPacket; public class StringSendPacket extends SendPacket { private final byte[] bytes; public StringSendPacket(String msg) { this.bytes = msg.getBytes(); this.length = bytes.length;//父类中的实例域 } @Override public byte[] bytes() { return bytes; } }
在connector对象的实例域中会引用一个SendDispatcher对象。发送数据时,会通过SendDispatcher中的方法对数据进行封装和处理。其大致的关系图如下所示:
queue,需要发送消息时,connector将消息写入sendPacket,并存入队列queue,执行出队。用packetTemp变量引用出队的元素,将四字节的长度信息和packetTemp写入ioArgs的缓冲区中,发送完毕之后,再判断packetTemp是否完整写出(使用position和total指针标记、判断),决定继续输出packetTemp的内容,还是开始下一轮出队。
这个过程的程序框图如下所示:
在代码中,SendDispatcher实际上是一个接口,我用AsyncSendDispatcher实现此接口,代码如下:
package cn.buptleida.niohdl.impl.async; import cn.buptleida.niohdl.core.*; import cn.buptleida.utils.CloseUtil; import java.io.IOException; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicBoolean; public class AsyncSendDispatcher implements SendDispatcher { private final AtomicBoolean isClosed = new AtomicBoolean(false); private Sender sender; private Queue<SendPacket> queue = new ConcurrentLinkedDeque<>(); private AtomicBoolean isSending = new AtomicBoolean(); private ioArgs ioArgs = new ioArgs(); private SendPacket packetTemp; //当前发送的packet大小以及进度 private int total; private int position; public AsyncSendDispatcher(Sender sender) { this.sender = sender; } /** * connector将数据封装进packet后,调用这个方法 * @param packet */ @Override public void send(SendPacket packet) { queue.offer(packet);//将数据放进队列中 if (isSending.compareAndSet(false, true)) { sendNextPacket(); } } @Override public void cancel(SendPacket packet) { } /** * 从队列中取数据 * @return */ private SendPacket takePacket() { SendPacket packet = queue.poll(); if (packet != null && packet.isCanceled()) { //已经取消不用发送 return takePacket(); } return packet; } private void sendNextPacket() { SendPacket temp = packetTemp; if (temp != null) { CloseUtil.close(temp); } SendPacket packet = packetTemp = takePacket(); if (packet == null) { //队列为空,取消发送状态 isSending.set(false); return; } total = packet.length(); position = 0; sendCurrentPacket(); } private void sendCurrentPacket() { ioArgs args = ioArgs; args.startWriting();//将ioArgs缓冲区中的指针设置好 if (position >= total) { sendNextPacket(); return; } else if (position == 0) { //首包,需要携带长度信息 args.writeLength(total); } byte[] bytes = packetTemp.bytes(); //把bytes的数据写入到IoArgs中 int count = args.readFrom(bytes, position); position += count; //完成封装 args.finishWriting();//flip()操作 //向通道注册OP_write,将Args附加到runnable中;selector线程监听到就绪即可触发线程池进行消息发送 try { sender.sendAsync(args, ioArgsEventListener); } catch (IOException e) { closeAndNotify(); } } private void closeAndNotify() { CloseUtil.close(this); } @Override public void close(){ if (isClosed.compareAndSet(false, true)) { isSending.set(false); SendPacket packet = packetTemp; if (packet != null) { packetTemp = null; CloseUtil.close(packet); } } } /** * 接收回调,来自writeHandler输出线程 */ private ioArgs.IoArgsEventListener ioArgsEventListener = new ioArgs.IoArgsEventListener() { @Override public void onStarted(ioArgs args) { } @Override public void onCompleted(ioArgs args) { //继续发送当前包packetTemp,因为可能一个包没发完 sendCurrentPacket(); } }; }
同样,ReceiveDispatcher也是一个接口,代码中用AsyncReceiveDispatcher实现。在connector对象的实例域中会引用一个AsyncReceiveDispatcher对象。接收数据时,会通过ReceiveDispatcher中的方法对接收到的数据进行拆包处理。其大致的关系图如下所示:
每一个消息体的首部会有一个四字节的int字段,代表消息的长度值,按照这个长度来进行读取。如若一个ioArgs未满足这个长度,就读取下一个ioArgs,保证数据包的完整性。这个流程就不画程序框图了,偷个懒hhhh。其实看下面代码注释已经很清晰了,容易理解。
AsyncReceiveDispatcher的代码如下所示:
package cn.buptleida.niohdl.impl.async; import cn.buptleida.niohdl.box.StringReceivePacket; import cn.buptleida.niohdl.core.ReceiveDispatcher; import cn.buptleida.niohdl.core.ReceivePacket; import cn.buptleida.niohdl.core.Receiver; import cn.buptleida.niohdl.core.ioArgs; import cn.buptleida.utils.CloseUtil; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; public class AsyncReceiveDispatcher implements ReceiveDispatcher { private final AtomicBoolean isClosed = new AtomicBoolean(false); private final Receiver receiver; private final ReceivePacketCallback callback; private ioArgs args = new ioArgs(); private ReceivePacket packetTemp; private byte[] buffer; private int total; private int position; public AsyncReceiveDispatcher(Receiver receiver, ReceivePacketCallback callback) { this.receiver = receiver; this.receiver.setReceiveListener(ioArgsEventListener); this.callback = callback; } /** * connector中调用该方法进行 */ @Override public void start() { registerReceive(); } private void registerReceive() { try { receiver.receiveAsync(args); } catch (IOException e) { closeAndNotify(); } } private void closeAndNotify() { CloseUtil.close(this); } @Override public void stop() { } @Override public void close() throws IOException { if(isClosed.compareAndSet(false,true)){ ReceivePacket packet = packetTemp; if(packet!=null){ packetTemp = null; CloseUtil.close(packet); } } } /** * 回调方法,从readHandler输入线程中回调 */ private ioArgs.IoArgsEventListener ioArgsEventListener = new ioArgs.IoArgsEventListener() { @Override public void onStarted(ioArgs args) { int receiveSize; if (packetTemp == null) { receiveSize = 4; } else { receiveSize = Math.min(total - position, args.capacity()); } //设置接受数据大小 args.setLimit(receiveSize); } @Override public void onCompleted(ioArgs args) { assemblePacket(args); //继续接受下一条数据,因为可能同一个消息可能分隔在两份IoArgs中 registerReceive(); } }; /** * 解析数据到packet * @param args */ private void assemblePacket(ioArgs args) { if (packetTemp == null) { int length = args.readLength(); packetTemp = new StringReceivePacket(length); buffer = new byte[length]; total = length; position = 0; } //将args中的数据写进外面buffer中 int count = args.writeTo(buffer,0); if(count>0){ //将数据存进StringReceivePacket的buffer当中 packetTemp.save(buffer,count); position+=count; if(position == total){ completePacket(); packetTemp = null; } } } private void completePacket() { ReceivePacket packet = this.packetTemp; CloseUtil.close(packet); callback.onReceivePacketCompleted(packet); } }
其实粘包、半包的解决方案并没有什么奥秘,单纯地复杂而已。方法核心就是自定义一个消息体Packet,完成Packet中的byte数组与缓冲区数组之间的复制转化即可。当然,position、limit等等指针的辅助很重要。
总结这个博客,也是将目前为止的工作进行梳理和记录。我将通过 smyl-im 这个项目来持续学习+实践。因为之前学习过程中有很多零碎的知识点,都躺在我的有道云笔记里,感觉没必要总结成博客。本次博客讲的内容刚好是一个成体系的东西,正好可以将这个项目背景带出来,后续的博客就可以在这基础上衍生拓展了。