上次提到要改进我们的RPC框架,这周花时间研究一下JDK提供给我们的原生NIO非阻塞式网络编程思想。NIO 库是在 JDK 1.4 中引入的。NIO 弥补了原来的 I/O 的不足,它在标准 Java 代码中提供了高速的、面向块的 I/O。
BIO与NIO的主要区别
java NIO和BIO之间第一个最大的区别是,BIO是面向流的,NIO是面向缓冲区的。 Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。 Java NIO的缓冲导向方法略有不同。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但是,还需要检查是否该缓冲区中包含所有需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。
2. 阻塞与非阻塞
Java BIO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。
Java NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。
3. NIO特有的Selector选择器机制
Java NIO的选择器允许一个单独的线程来监视多个输入通道,你可以注册多个通道使用一个选择器,然后使用一个单独的线程来“选择”通道:这些通道里已经有可以处理的输入,或者选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道。
今天我们就基于以上的理解,实现一个端对端的非阻塞式IO的网络编程。
/** * @author andychen https://blog.51cto.com/14815984 * @description:NIO客户端核心处理器 */ public class NioClientHandler implements Runnable { //服务端主机 private final String host; //服务端口 private final int port; /**定义NIO选择器:用于注册和监听事件 * 选择监听的事件类型: OP_READ 读事件 / OP_WRITE 写事件 * OP_CONNECT 客户端连接事件 / OP_ACCEPT 服务端接收通道连接事件 */ private Selector selector = null; //定义客户端连接通道 private SocketChannel channel = null; //运行状态是否被激活 private volatile boolean activated=false; public NioClientHandler(String host, int port) { this.port = port; this.host = host; this.init(); } /** * 处理器初始化 * 负责建立连接准备工作 */ private void init(){ try { //创建并打开选择器 this.selector = Selector.open(); //建立并打开监听通道 this.channel = SocketChannel.open(); /** * 设置通道通讯模式为非阻塞,NIO默认为阻塞式的 */ this.channel.configureBlocking(false); //激活运行状态 this.activated = true; } catch (IOException e) { e.printStackTrace(); this.stop(); } } /** * 连接服务器 */ private void connect(){ try { /** * 连接服务端:因为之前设置了通讯模式为非阻塞 * 这里会立即返回TCP握手是否已建立 */ if(this.channel.connect(new InetSocketAddress(this.host, this.port))){ //连接建立后,在通道上注册读事件关注,客户端一接收到数据立即触发处理 this.channel.register(this.selector, SelectionKey.OP_READ); } else{ //若连接握手未建立,则在通道上继续关注连接事件,一旦连接建立继续进行后续的处理逻辑 this.channel.register(this.selector, SelectionKey.OP_CONNECT); } } catch (IOException e) { e.printStackTrace(); this.stop(); } } /** * 选择器事件迭代处理 * @param keys 选择器事件KEY */ private void eventIterator(Set<SelectionKey> keys){ SelectionKey key = null; //这里采用迭代器,因为需要迭代时对key进行移除操作 Iterator<SelectionKey> it = keys.iterator(); while (it.hasNext()){ key = it.next(); //这里先移除事件key,避免多次处理 it.remove(); //处理迭代事件 this.proccessEvent(key); } } /** * 处理发生的事件 * @param key 选择器事件KEY */ private void proccessEvent(SelectionKey key){ //只对有效的事件类型进行处理 if(key.isValid()){ try { //在事件通道上处理 SocketChannel socketChannel = (SocketChannel) key.channel(); /**处理连接就绪事件 * */ if(key.isConnectable()){ //检测连接是否完成,避免发生导致NotYetConnectedException异常 if(socketChannel.finishConnect()){ System.out.println("Has completed connection with server.."); /** * 在通道上关注读事件,NO的写事件一般不特别关注, * 原因:写缓冲区大部分时间被认为是空闲的,会频繁被选择器选择(会浪费CPU资源), * 所以不应该频繁被注册; * 只有在写的数据超过写缓冲区可用空间时,把一部分数据刷出缓冲区后, * 有空间时再通知应用程序进行写; * 且应用程序写完后,应立即关闭写事件 */ socketChannel.register(this.selector, SelectionKey.OP_READ); }else{//这里若连接仍未建立一般视为网络或其他原因,暂时退出 this.stop(); } } /** * 处理读事件 */ if(key.isReadable()){ //开辟内存缓冲区,这里用JVM堆内存 ByteBuffer buffer = ByteBuffer.allocate(Constant.BUF_SIZE); //将通道中的数据读到缓冲区 int length = socketChannel.read(buffer); if(0 < length){ /** * 进行读写转换,NIO固定范式 */ buffer.flip(); //获取buffer可用空间 int size = buffer.remaining(); byte[] bytes = new byte[size]; //读Buffer buffer.get(bytes); //获取缓冲区数据 String result = new String(bytes,"utf-8"); System.out.println("Recevied server message: "+result); }else if(0 > length){ //取消关注当前事件,关闭通道 key.cancel(); socketChannel.close(); } } } catch (Exception e) { key.cancel(); if(null != key.channel()){ try { key.channel().close(); } catch (IOException ex) { ex.printStackTrace(); } } e.printStackTrace(); } } } /** * 写数据到对端 * @param data */ public void write(String data){ try { byte[] bytes = data.getBytes(); ByteBuffer buffer = ByteBuffer.allocate(bytes.length); //将数据放入写缓冲区 buffer.put(bytes); buffer.flip(); this.channel.write(buffer); } catch (IOException e) { e.printStackTrace(); } } /** * 停止运行 */ public void stop(){ this.activated = false; System.exit(-1); } /** * 客户端通讯业务核实现 */ @Override public void run() { //建立服务器连接 this.connect(); //持续监听各种事件的发生 while (this.activated){ try { //监听事件是否发生,若发生直接返回;反之阻塞至事件发生 this.selector.select(); } catch (IOException e) { e.printStackTrace(); this.stop(); } //获取发生事件的类型 Set<SelectionKey> keys = this.selector.selectedKeys(); //迭代处理事件 this.eventIterator(keys); } //关闭选择器 if(null != this.selector){ try { this.selector.close(); } catch (IOException e) { e.printStackTrace(); } } this.stop(); } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:NIO客户端启动器 */ public class NioClientStarter { private static NioClientHandler clientHandler = null; /*启动运行客户端*/ public static void main(String[] args) { try { clientHandler = new NioClientHandler(Constant.SERV_HOST, Constant.SERV_PORT); new Thread(clientHandler).start(); } catch (Exception e) { e.printStackTrace(); } /** * 在控制台发实时数据到对端 */ Scanner scanner = new Scanner(System.in); while (true){ String data = scanner.next(); if(null != data && !"".equals(data)){ clientHandler.write(data); } } } }
服务端部分
/** * @author andychen https://blog.51cto.com/14815984 * @description:NIO服务端核心处理器 */ public class NioServerHandler implements Runnable{ private final int port; //定义选择器 private Selector selector = null; /** * 定义服务端通道: 与客户端类似的思路 */ private ServerSocketChannel channel = null; //服务器运行是否被激活 private volatile boolean activated = false; public NioServerHandler(int port) { this.port = port; this.init(); } /** * 初始化处理器 * 负责做好运行监听和接收之前的准备 */ private void init(){ try { //创建并打开选择器 this.selector = Selector.open(); //创建并打开监听通道 this.channel = ServerSocketChannel.open(); /** * 设置通道通讯模式为非阻塞(NIO默认为阻塞) */ this.channel.configureBlocking(false); //绑定监听的服务端口 this.channel.socket().bind(new InetSocketAddress(this.port)); /** * 注册在服务端通道上,首先关注的事件 */ this.channel.register(this.selector, SelectionKey.OP_ACCEPT); //设置运行状态激活 this.activated = true; } catch (IOException e) { e.printStackTrace(); this.stop(); } } /** * 停止服务 */ public void stop(){ this.activated = false; try { //关闭选择器 if(null != this.selector){ if(this.selector.isOpen()){ this.selector.close(); } this.selector = null; } //关闭通道 if(null != this.channel){ if(this.channel.isOpen()){ this.channel.close(); } this.channel = null; } } catch (IOException e) { e.printStackTrace(); } System.exit(-1); } /** * 在迭代处理发生的事件 * @param keys 发生的事件类型 */ private void eventIterator(Set<SelectionKey> keys){ //SelectionKey key = null; Iterator<SelectionKey> it = keys.iterator(); while (it.hasNext()){ SelectionKey key = it.next(); /** * 这里先从迭代器移除,避免后面重复执行 */ it.remove(); //处理事件 this.proccessEvent(key); } } /** * * @param key 选择执行的事件KEY */ private void proccessEvent(SelectionKey key){ //只对有效的事件KEY执行处理 if(key.isValid()){ try { /** * 处理通道接收数据事件 */ if(key.isAcceptable()){ /** * 注意这里接收事件的通道是服务端通道 */ ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); //接收客户端Socket SocketChannel channel = serverChannel.accept(); //设置其为非阻塞 channel.configureBlocking(false); //然后注册此通道的读事件 channel.register(this.selector, SelectionKey.OP_READ); System.out.println("Build connection with client.."); } /** * 处理读事件 */ if(key.isReadable()){ System.out.println("Reading client data..."); SocketChannel channel = (SocketChannel) key.channel(); //开辟内存空间,接收数据 ByteBuffer buffer = ByteBuffer.allocate(Constant.BUF_SIZE); //将数据读入缓冲区 int length = channel.read(buffer); if(0 < length){ //读写切换 buffer.flip(); //更具缓冲区数据建立转换的字节数组 byte[] bytes = new byte[buffer.remaining()]; //从缓冲区读取字节数据 buffer.get(bytes); //解码数据 String data = new String(bytes, "utf-8"); System.out.println("Recevied data: "+data); //向对端发送接收应答 String answer = "Server has recevied data:"+data; this.reply(channel, answer); }else if(0 > length){ //取消处理的事件 key.cancel(); channel.close(); } } /** * 处理写事件 */ if(key.isWritable()){ SocketChannel channel = (SocketChannel) key.channel(); //拿到写事件的buffer ByteBuffer buffer = (ByteBuffer) key.attachment(); //若buffer中有数据,则刷到对端 if(buffer.hasRemaining()){ int length = channel.write(buffer); System.out.println("Write data "+length+" byte to client."); }else{ //若没有数据,则继续监听读事件 key.interestOps(SelectionKey.OP_READ); } } } catch (IOException e) { key.cancel(); e.printStackTrace(); } } } /** * 应答对端 * @param msg 应答消息 */ private void reply(SocketChannel channel, String msg){ //消息编码 byte[] bytes = msg.getBytes(); //开启写缓冲区 ByteBuffer buffer = ByteBuffer.allocate(Constant.BUF_SIZE); //将数据写入缓冲区 buffer.put(bytes); //切换到读事件 buffer.flip(); /** * 这里为了不出现写空或写溢出缓冲区情况,建立写事件监听同时保留之前的读监听 * 作为监听的附件传入写操作的buffer */ try { channel.register(this.selector, SelectionKey.OP_WRITE |SelectionKey.OP_READ, buffer); } catch (ClosedChannelException e) { e.printStackTrace(); } } /** * 服务端监听运行核心业务实现 */ @Override public void run() { while (this.activated){ try { /** * 运行到此方法阻塞,直到有事件发生再返回 * */ this.selector.select(); //获取被监听的事件 Set<SelectionKey> keys = this.selector.selectedKeys(); //在迭代器中,处理不同的事件 this.eventIterator(keys); } catch (IOException e) { e.printStackTrace(); this.stop(); } } } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:NIO网络编程服务端启动类 */ public class NioServerStart { /** * 运行服务端监听 * @param args */ public static void main(String[] args) { String serverTag = "server: "+Constant.SERV_PORT; NioServerHandler serverHandler = null; try { serverHandler = new NioServerHandler(Constant.SERV_PORT); new Thread(serverHandler, serverTag).start(); System.out.println("Starting "+serverTag+" listening..."); } catch (Exception e) { e.printStackTrace(); if(null != serverHandler){ serverHandler.stop(); } } } }
多次验证结果
通过以上的实战,我们看到NIO网络编程实现比BIO稍微要复杂一些。面向缓冲的机制确实比面向流的机制要灵活很多;服务运行的体验也比阻塞式IO更加流畅;独有的选择器机制也让NIO可以支撑较大并发数,但学习和开发的成本稍微高一些,项目当中可以有选择地使用。
目前网络编程这块用得比较多的优秀IO框架非Netty莫属了,很多优秀的RPC框架的底层也基于Netty扩展和开发。下次我们就顺带给大家展示一下Netty的网络编程之美。