我前段时间的一篇博客 java网络编程——多线程数据收发并行 总结了服务端与客户端之间的收发并行实践。原理很简单,就是针对单一客户端,服务端起两个线程分别负责read和write操作,然后线程保持阻塞等待读写执行。
事实上,这样的模式非常糟糕。因为每一个客户端在服务端需要占用两条线程,假如有1000个客户端,则需要2000+条线程。cpu需要花费大量的时间进行线程上下文切换,造成系统资源浪费。
想要缩减线程数量,先要解决阻塞问题。而NIO可以通过IO多路复用将read和write的阻塞给抹去。再配合线程池,即可实现用少量的线程支撑起上百万个客户端的连接。
java NIO全称java non-blocking IO。字面意思即非阻塞式IO。实际上这里的非阻塞只是宏观的说法。
关于IO模式,这里引一个别人的博客,介绍了几种IO模式的区别:
简述同步IO、异步IO、阻塞IO、非阻塞IO之间的联系与区别
本博客不再赘述这些,只是想说NIO属于其中的IO复用模型。(实验室里有一本《UNIX网络编程》疫情结束回学校一定把这部分好好看看)
多路复用IO模型中,会有一个线程去不断轮询多个socket的状态,当socket有读写事件时,才来调用IO操作。因为是一个线程来管理多个socket,系统不需要建立其它线程、维护线程,只有socket就绪时,才会使用IO资源,所以它大大降低了资源占用。
java NIO中,使用selector.select()监听多个通道是否有到达事件,没有事件就一直阻塞,有事件就调用IO进行处理。
详细介绍如下
这里以服务端读取客户端消息的流程为例,介绍NIO的使用(完整内容只有输入,暂且不管输出)。画了一个流程图,如下所示:
selector = Selector.open(); ServerSocketChannel server = ServerSocketChannel.open(); // 设置为非阻塞 server.configureBlocking(false); // 绑定本地端口 server.socket().bind(new InetSocketAddress(port)); // 注册客户端连接到达监听 server.register(selector, SelectionKey.OP_ACCEPT);
同时还要建立readSelector和writeSelector。其实线程池也是提前建立的,这里暂且不写。
readSelector = Selector.open(); writeSelector = Selector.open();
//select()方法返回已就绪的通道数 if (selector.select() == 0) { continue; } Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); // 检查当前Key的状态是否是accept的 // 客户端到达状态 if (key.isAcceptable()) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); // 非阻塞状态拿到客户端连接 SocketChannel socketChannel = serverSocketChannel.accept(); try { // 客户端构建异步线程 // 添加同步处理 //此处代码暂且忽略 } catch (IOException e) { e.printStackTrace(); System.out.println("客户端连接异常:" + e.getMessage()); } }
/** *参数分别是:channel,对应的selector,以及 *registerOps:待注册的操作集,这个在后文中有详细解析; *locker:用于标识同步代码块的状态,是锁定还是可用; *runnable:执行具体读写操作的类,送给线程池执行; *map:建立SelectionKey与Runnable映射关系的HashMap。 */ private static SelectionKey registerSelection(SocketChannel channel, Selector selector, int registerOps, AtomicBoolean locker, HashMap<SelectionKey, Runnable> map, Runnable runnable) { synchronized (locker) { // 设置锁定状态 locker.set(true); try { // 唤醒当前的selector,让selector不处于select()状态 //注册channel时一定要将selector唤醒,否则当前select中没有刚注册的channel selector.wakeup(); SelectionKey key = null; if (channel.isRegistered()) { // 查询是否已经注册过 key = channel.keyFor(selector); if (key != null) { //将新的Ops添加进去 key.interestOps(key.readyOps() | registerOps); } } if (key == null) { // 注册selector得到Key key = channel.register(selector, registerOps); // 注册回调 map.put(key, runnable); } return key; } catch (ClosedChannelException e) { return null; } finally { // 解除锁定状态 locker.set(false); try { // 通知 locker.notify(); } catch (Exception ignored) { } } } }
try { if (readSelector.select() == 0) { continue; } Set<SelectionKey> selectionKeys = readSelector.selectedKeys(); for (SelectionKey selectionKey : selectionKeys) { if (selectionKey.isValid()) { //IO处理代码,暂且忽略 } } selectionKeys.clear(); } catch (IOException e) { e.printStackTrace(); }
注意:以上都是一些代码片段,没有完全串联起来,省略了一些类对象调用、方法调用以及关键的线程池操作等等。但是基本的方法已经展示出来,剩下的后面的博客再去填坑。
光看上面的代码,对于一些NIO方法的认知还是很模糊的。下面通过阅读selector类和SelectionKey类的源码注释,来加深对部分方法的理解。
selector是NIO的核心类,下面是选择器的一些重要方法:
public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); }
注册进selector的任何一个channel都用一个SelectionKey对象来指代。
public static final int OP_READ = 1 << 0; public static final int OP_WRITE = 1 << 2; public static final int OP_CONNECT = 1 << 3; public static final int OP_ACCEPT = 1 << 4;
interest set:兴趣集;一个channel所有的操作集;可通过interestOps(int)方法来更新
ready-operation set:就绪操作集,只包含使得channel被报告就绪的操作,底层通过与或操作来更新;例如当一个channel读取就绪时,将read操作集加入到就绪集中。
终于写完了,这篇博客只能算是对NIO简单介绍,一些东西还没讲到。channel和buffer部分的方法没有分析,线程池部分没有加上,还有输出操作那一套,都没讲。总想尽可能多地详细完整一点,但是越深入,知识点就越庞大,所以只能放弃一部分内容,于是成了现在这个样子。如果详细规划一下拆开多个博客写会更好。