public class NIO_Demo2 { public static void main(String[] args) throws IOException, InterruptedException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress("0.0.0.0", 3333), 1000); serverSocketChannel.configureBlocking(false); final Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()]; for (int i = 0; i < selectors.length; i++) { final Selector selector = Selector.open(); selectors[i] = selector; new Thread(new ClientProcessor(selector)).start(); } AtomicInteger id = new AtomicInteger(); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { iterator.next(); SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selectors[id.getAndIncrement() % selectors.length], SelectionKey.OP_READ); iterator.remove(); } } } /** * 客户端消息处理器 */ static class ClientProcessor implements Runnable { private Selector selector; public ClientProcessor(Selector selector) { this.selector = selector; } @Override public void run() { while (true) { try { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey key : selectionKeys) { if (!key.isValid()) { continue; } if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer readBuff = (ByteBuffer) key.attachment(); int read = socketChannel.read(readBuff); if (read == -1) { //通道连接关闭,可以取消这个注册键,后续不在触发。 key.cancel(); socketChannel.close(); } else { //翻转buffer,从写入状态切换到读取状态 readBuff.flip(); int position = readBuff.position(); int limit = readBuff.limit(); List<ByteBuffer> buffers = new ArrayList<>(); // 按照协议从流中分割出消息 /**从readBuffer确认每一个字节,发现分割符则切分出一个消息**/ for (int i = position; i < limit; i++) { //读取到消息结束符 if (readBuff.get() == '/r') { ByteBuffer message = ByteBuffer.allocate(i - readBuff.position()); readBuff.limit(i); message.put(readBuff); readBuff.limit(limit); message.flip(); buffers.add(message); } } /**从readBuffer确认每一个字节,发现分割符则切分出一个消息**/ /**将所有得到的消息发送出去**/ for (ByteBuffer buffer : buffers) { while (buffer.hasRemaining()) { socketChannel.write(buffer); } } /**将所有得到的消息发送出去**/ // 压缩readBuffer,压缩完毕后进入写入状态。并且由于长度是256,压缩之后必然有足够的空间可以写入一条消息 readBuff.compact(); } } } selectionKeys.clear(); } catch (Exception e) { e.printStackTrace(); } } } } }
socketChannel.register(selectors[id.getAndIncrement() % selectors.length], SelectionKey.OP_READ);
必须设置通道为 非阻塞,才能向 Selector 注册。
在发生错误的语句前添加:
socketChannel.configureBlocking(false);
注意参数值,false 为 非阻塞,true 为 阻塞。
点击量:0