介绍了jdk实现nio的关键Selector以及SelectableChannel,了解了它的原理,就明白了netty为什么是事件驱动模型:(netty极简教程(四): Selector事件驱动以及SocketChannel 的使用 ,接下来将它的使用更深入一步, nio reactor模型演进以及聊天室的实现;
示例源码: github.com/jsbintask22…
对于io消耗而言,我们知道提升效率的关键在于服务端对于io的使用;而nio压榨cpu的关键在于使用 Selector
实现的 reactor
事件模型以及多线程的加入时机:
省略Selector以及ServerSocketChannel的获取注册; 将所有的操作至于reactor主线程
while (true) { // 1 if (selector.select(1000) == 0) { // 2 continue; } Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); // 3 while (selectedKeys.hasNext()) { SelectionKey selectionKey = selectedKeys.next(); SelectableChannel channel = selectionKey.channel(); if (selectionKey.isAcceptable()) { // 4 ServerSocketChannel server = (ServerSocketChannel) channel; SocketChannel client = server.accept(); client.configureBlocking(false); client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(CLIENT_BUFFER_SIZE)); String serverGlobalInfo = "系统消息:用户[" + client.getRemoteAddress() + "]上线了"; System.err.println(serverGlobalInfo); forwardClientMsg(serverGlobalInfo, client); // 5 } else if (selectionKey.isReadable()) { SocketChannel client = (SocketChannel) channel; SocketAddress remoteAddress = null; try { remoteAddress = client.getRemoteAddress(); String clientMsg = retrieveClientMsg(selectionKey); if (clientMsg.equals("")) { return; } System.err.println("收到用户[" + remoteAddress + "]消息:" + clientMsg); forwardClientMsg("[" + remoteAddress + "]:" + clientMsg, client); // 6 } catch (Exception e) { String msg = "系统消息:" + remoteAddress + "下线了"; forwardClientMsg(msg, client); System.err.println(msg); selectionKey.cancel(); // 7 try { client.close(); } catch (IOException ex) { ex.printStackTrace(); } } } selectedKeys.remove(); } } 复制代码
SelectionKey Accept Read
读取以及广播消息方法如下:
SocketChannel client = (SocketChannel) selectionKey.channel(); ByteBuffer buffer = (ByteBuffer) selectionKey.attachment(); int len = client.read(buffer); if (len == 0) { return ""; } buffer.flip(); byte[] data = new byte[buffer.remaining()]; int index = 0; while (len != index) { data[index++] = buffer.get(); } buffer.clear(); return new String(data, StandardCharsets.UTF_8); 复制代码
Set<SelectionKey> allClient = selector.keys(); allClient.forEach(selectionKey -> { SelectableChannel channel = selectionKey.channel(); if (!(channel instanceof ServerSocketChannel) && channel != client) { // 1 SocketChannel otherClient = (SocketChannel) channel; try { otherClient.write(ByteBuffer.wrap(clientMsg.getBytes(StandardCharsets.UTF_8))); } catch (IOException e) { e.printStackTrace(); } } }); 复制代码
从Selector上获取所有注册的Channel然后遍历,如果不是ServerSocketChannel或者当前消息的Channel,就将消息发送出去.
以上,所有代码放在同一线程中,对于单核cpu而言,相比于bio的 Socket
编程,我们主要有一个方面的改进
accept read
而对于多核cpu而言,Selector虽然能够有效规避accept和read的无用等待时间,可是它依然存在一些问题;
select
基于上面的单线程问题考虑,我们可以将io操作放入线程池中处理:
if (selectionKey.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) channel; SocketChannel client = server.accept(); client.configureBlocking(false); client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(CLIENT_BUFFER_SIZE)); String serverGlobalInfo = "系统消息:用户[" + client.getRemoteAddress() + "]上线了"; System.err.println(serverGlobalInfo); executorService.submit(() -> { // 1 forwardClientMsg(serverGlobalInfo, client); }); } else if (selectionKey.isReadable()) { executorService.submit(() -> { // 2 SocketChannel client = (SocketChannel) channel; SocketAddress remoteAddress = null; try { remoteAddress = client.getRemoteAddress(); String clientMsg = retrieveClientMsg(selectionKey); if (clientMsg.equals("")) { return; } System.err.println("收到用户[" + remoteAddress + "]消息:" + clientMsg); forwardClientMsg("[" + remoteAddress + "]:" + clientMsg, client); } catch (Exception e) { String msg = "系统消息:" + remoteAddress + "下线了"; forwardClientMsg(msg, client); System.err.println(msg); selectionKey.cancel(); try { client.close(); } catch (IOException ex) { ex.printStackTrace(); } } }); } selectedKeys.remove(); } 复制代码
在 1与2处,我们加入了线程池处理,不再在reactor主线程中做任何io操作。 这便是reactor多线程模型
虽然模型2有效利用了多核cpu优势,可是依然能够找到瓶颈
基于以上问题,我们可以考虑引入多个 Selector
,这样主Selector只负责读取accept操作,而其他的io操作均有子Selector负责,这便是多Reactor多线程模型
基于上面的思考,我们要在单Reactor多线程模型上主要需要以下操作
子Selector
基于以上,会增加一个子Selector列表,并且将原来的accept以及读取广播分开; private List<Selector> subSelector = new ArrayList<>(8);
定义一个包含8个子selector的列表并进行初始化
如图,分别开启了一个reactor主线程,以及8个子selector子线程,其中,主线程现在只进行accept然后添加至子selector
while (true) { if (mainSelector.select(1000) == 0) { continue; } Iterator<SelectionKey> selectedKeys = mainSelector.selectedKeys().iterator(); while (selectedKeys.hasNext()) { SelectionKey selectionKey = selectedKeys.next(); SelectableChannel channel = selectionKey.channel(); if (selectionKey.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) channel; SocketChannel client = server.accept(); client.configureBlocking(false); client.register(subSelector.get(index++), SelectionKey.OP_READ, // 1 ByteBuffer.allocate(CLIENT_BUFFER_SIZE)); if (index == 8) { // 2 index = 0; } String serverGlobalInfo = "系统消息:用户[" + client.getRemoteAddress() + "]上线了"; System.err.println(serverGlobalInfo); forwardClientMsg(serverGlobalInfo, client); } } selectedKeys.remove(); } 复制代码
所有的从Selector只进行io操作,并且本身已经在异步线程中运行
while (true) { if (subSelector.select(1000) == 0) { continue; } Iterator<SelectionKey> selectedKeys = subSelector.selectedKeys().iterator(); while (selectedKeys.hasNext()) { SelectionKey selectionKey = selectedKeys.next(); SelectableChannel channel = selectionKey.channel(); if (selectionKey.isReadable()) { SocketChannel client = (SocketChannel) channel; SocketAddress remoteAddress = null; try { remoteAddress = client.getRemoteAddress(); String clientMsg = retrieveClientMsg(selectionKey); // 1 if (clientMsg.equals("")) { return; } System.err.println("收到用户[" + remoteAddress + "]消息:" + clientMsg); forwardClientMsg("[" + remoteAddress + "]:" + clientMsg, client); // 2 } catch (Exception e) { String msg = "系统消息:" + remoteAddress + "下线了"; forwardClientMsg(msg, client); System.err.println(msg); selectionKey.cancel(); try { client.close(); } catch (IOException ex) { ex.printStackTrace(); } } } selectedKeys.remove(); } 复制代码
事实上,在netty的线程模型中,与上方的 多Reactor多线程模型类似
,一个改进版的多路复用多Reactor模型; Reactor主从线程模型
基于以上思考,我们将在后面在netty源码中进行一一验证。