<p align="right">——日拱一卒,不期而至!</p>
你好,我是彤哥,本篇是netty系列的第七篇。
上一章我们一起学习了Java NIO的核心组件Buffer,它通常跟Channel一起使用,但是它们在网络IO中又该如何使用呢,今天我们将一起学习另一个NIO核心组件—— Selector ,没有它可以说就干不起来网络IO。
我们先来看两段Selector的注释,见类 java.nio.channels.Selector
。
A multiplexor of {@link SelectableChannel} objects.
它是 SelectableChannel
对象的多路复用器 ,从这里我们也可以知道Java NIO实际上是多路复用IO。
SelectableChannel
有几个子类,你会非常熟悉:
我们有必要复习一下 多路复用IO的流程 :
第一阶段通过select去轮询检查有没有连接准备好数据,第二阶段把数据从内核空间拷贝到用户空间。
在Java中,就是通过 Selector
这个多路复用器来实现第一阶段的。
A selector may be created by invoking the {@link #open open} method of this class, which will use the system's default {@link java.nio.channels.spi.SelectorProvider selector provider} to create a new selector. A selector may also be created by invoking the {@link java.nio.channels.spi.SelectorProvider#openSelector openSelector} method of a custom selector provider. A selector remains open until it is closed via its {@link #close close} method.
Selector
可以通过它自己的 open()
方法创建,它将通过默认的 java.nio.channels.spi.SelectorProvider
类创建一个新的Selector。也可以通过实现 java.nio.channels.spi.SelectorProvider
类的抽象方法 openSelector()
来自定义实现一个Selector。Selector一旦创建将会一直处于open状态直到调用了 close()
方法为止。
通过跟踪源码:
> java.nio.channels.Selector#open() 1> java.nio.channels.spi.SelectorProvider#provider() 1.1> sun.nio.ch.DefaultSelectorProvider#create() // 返回WindowsSelectorProvider 2> sun.nio.ch.WindowsSelectorProvider#openSelector() // 返回WindowsSelectorImpl
可以看到,在Windows平台下,默认实现的Provider是 WindowsSelectorProvider
,它的 openSelector()
方法返回的是 WindowsSelectorImpl
,它就是Windows平台默认的Selector实现。
是滴,因为网络IO是跟操作系统息息相关的,不同的操作系统的实现可能都不一样,Linux下面JDK的实现完全不一样,那么我们为什么没有感知到呢?我的代码在Windows下面写的,拿到Linux下面不是一样运行?那是Java虚拟机(或者说Java运行时环境)帮我们把这个事干了,它屏蔽了跟操作系统相关的细节,这也是Java代码可以“Write Once, Run Anywhere”的精髓所在。
上面我们说了selector是多路复用器,它是在网络IO的第一阶段用来轮询检查有没有连接准备好数据的,那么它和Channel是什么关系呢?
Selector通过不断轮询的方式同时监听多个Channel的事件,注意,这里是 同时监听
,一旦有Channel准备好了,它就会返回这些准备好了的Channel,交给处理线程去处理。
所以,在NIO编程中,通过Selector我们就实现了一个线程同时处理多个连接请求的目标,也可以一定程序降低服务器资源的消耗。
通过调用 Selector.open()
方法是我们常用的方式:
Selector selector = Selector.open();
当然,也可以通过实现 java.nio.channels.spi.SelectorProvider.openSelector()
抽象方法自定义一个Selector。
为了将Channel跟Selector绑定在一起,需要将Channel注册到Selector上,调用Channel的 register()
方法即可:
channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
Channel必须是非阻塞模式才能注册到Selector上,所以,无法将一个FileChannel注册到Selector,因为FileChannel没有所谓的阻塞还是非阻塞模式,本文来源于工从号彤哥读源码。
注册的时候第二个参数传入的是监听的事件,一共有四种事件:
当Channel触发了某个事件,通常也叫作那个事件就绪了。比如,数据准备好可以读取了就叫作读就绪了,同样地,还有写就绪、连接就绪、接受就绪,当然后面两个不常听到。
在Java中,这四种监听事件是定义在 SelectionKey
中的:
所以,也可以通过 位或
命令监听多个感兴趣的事件:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
正如上面所看到的,Channel注册到Selector后返回的是一个 SelectionKey
,所以 SelectionKey
又可以看作是Channel和Selector之间的一座桥梁,把两者绑定在了一起。
SelectionKey
具有以下几个重要属性:
里面保存了注册Channel到Selector时传入的第二个参数,即感兴趣的事件集。
int interestSet = selectionKey.interestOps(); boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT; boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
可以通过 位与
运算查看是否注册了相应的事件。
里面保存了就绪了的事件集。
int readySet = selectionKey.readyOps(); selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();
可以通过 readyOps()
方法获取所有就绪了的事件,也可以通过 isXxxable()
方法检查某个事件是否就绪。
Channel channel = selectionKey.channel(); Selector selector = selectionKey.selector();
通过 channel()
和 selector()
方法可以获取绑定的Channel和Selector。
可以调用 attach(obj)
方法绑定一个对象到 SelectionKey
上,并在后面需要用到的时候通过 attachment()
方法取出绑定的对象,也可以翻译为 附件
,它可以看作是数据传递的一种媒介,跟ThreadLocal有点类似,在前面绑定数据,在后面使用。
selectionKey.attach(theObject); Object attachedObj = selectionKey.attachment();
当然,也可以在注册Channel到Selector的时候就绑定附件:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
一旦将一个或多个Channel注册到Selector上了,我们就可以调用它的 select()
方法了,它会返回注册时感兴趣的事件中就绪的事件,本文来源于工从号彤哥读源码。
select()方法有三种变体:
select()的返回值为int类型,表示两次select()之间就绪的Channel,即使上一次调用select()时返回的就绪Channel没有被处理,下一次调用select()也不会再返回上一次就绪的Channel。比如,第一次调用select()返回了一个就绪的Channel,但是没有处理它,第二次调用select()时又有一个Channel就绪了,那也只会返回1,而不是2。
一旦调用select()方法返回了有就绪的Channel,我们就可以使用 selectedKeys()
方法来获取就绪的Channel了。
Set<SelectionKey> selectedKeys = selector.selectedKeys();
然后,就可以遍历这些SelectionKey来查看感兴趣的事件是否就绪了:
Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); }
最后,一定要记得调用 keyIterator.remove();
移除已经处理的SelectionKey。
前面我们说了调用select()方法时,调用者线程会进入阻塞状态,直到有就绪的Channel才会返回。其实也不一定,wakeup()就是用来破坏规则的,可以在另外一个线程调用wakeup()方法强行唤醒这个阻塞的线程,这样select()方法也会立即返回。
如果调用wakeup()时并没有线程阻塞在select()上,那么,下一次调用select()将立即返回,不会进入阻塞状态。这跟LockSupport.unpark()方法是比较类似的。
调用close()方法将会关闭Selector,同时也会将关联的SelectionKey失效,但不会关闭Channel。
public class EchoServer { public static void main(String[] args) throws IOException { // 创建一个Selector Selector selector = Selector.open(); // 创建ServerSocketChannel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 绑定8080端口 serverSocketChannel.bind(new InetSocketAddress(8080)); // 设置为非阻塞模式,本文来源于工从号彤哥读源码 serverSocketChannel.configureBlocking(false); // 将Channel注册到selector上,并注册Accept事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 阻塞在select上 selector.select(); // 如果使用的是select(timeout)或selectNow()需要判断返回值是否大于0 // 有就绪的Channel Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 遍历selectKeys Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); // 如果是accept事件 if (selectionKey.isAcceptable()) { // 强制转换为ServerSocketChannel ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel(); SocketChannel socketChannel = ssc.accept(); System.out.println("accept new conn: " + socketChannel.getRemoteAddress()); socketChannel.configureBlocking(false); // 将SocketChannel注册到Selector上,并注册读事件 socketChannel.register(selector, SelectionKey.OP_READ); } else if (selectionKey.isReadable()) { // 如果是读取事件 // 强制转换为SocketChannel SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); // 创建Buffer用于读取数据 ByteBuffer buffer = ByteBuffer.allocate(1024); // 将数据读入到buffer中 int length = socketChannel.read(buffer); if (length > 0) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; // 将数据读入到byte数组中 buffer.get(bytes); // 换行符会跟着消息一起传过来 String content = new String(bytes, "UTF-8").replace("/r/n", ""); if (content.equalsIgnoreCase("quit")) { selectionKey.cancel(); socketChannel.close(); } else { System.out.println("receive msg: " + content); } } } iterator.remove(); } } } }
今天我们学习了Java NIO核心组件Selector,到这里,NIO的三个最重要的核心组件我们就学习完毕了,说实话,NIO这块最重要的还是思维的问题,时刻记着在NIO中一个线程是可以处理多个连接的。
看着Java原生NIO实现网络编程似乎也没什么困难的吗?那么为什么还要有Netty呢?下一章我们将正式进入Netty的学习之中,我们将在其中寻找答案。
最后,也欢迎来我的工从号 彤哥读源码 系统地学习 源码&架构 的知识。