Java NIO全称为Non-blocking IO或者New IO,从名字我们知道NIO是非阻塞的IO,而Java IO则是阻塞的IO。在一般的情况下阻塞是低效率的,特别是在高并发的场景下面,因此Java引入了NIO。NIO相比IO来说主要有以下几个区别:
NIO中主要有以下三个概念:通道、缓冲区和Selectors。
Java NIO Channel通道和流非常相似,主要有以下几点区别:
Java的NIO读写都是在通道中进行的,通道涵盖了网络UDP,TCP网络IO和文件IO:
DatagramChannel用于处理UDP连接。
DatagramChannel channel = DatagramChannel.open(); channel.socket().bind(new InetSocketAddress(8888)); 复制代码
ByteBuffer buf = ByteBuffer.allocate(1024); buf.clear(); channel.receive(buf); 复制代码
String msg = "Current time is: " + System.currentTimeMillis(); ByteBuffer buf = ByteBuffer.allocate(1024); buf.clear(); buf.put(msg.getBytes()); buf.flip(); int bytesSent = channel.send(buf, new InetSocketAddress("host", port)); 复制代码
SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("host", 80)); 复制代码
ByteBuffer buf = ByteBuffer.allocate(48); int bytesRead = socketChannel.read(buf); 复制代码
如果 read()返回 -1, 表明连接已经中断。
String msg = "Current Time is: " + System.currentTimeMillis(); ByteBuffer buf = ByteBuffer.allocate(48); buf.clear(); buf.put(msg.getBytes()); buf.flip(); while(buf.hasRemaining()) { channel.write(buf); } 复制代码
socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress("host", 80)); while (!socketChannel.finishConnect()) { } 复制代码
我们可以设置 SocketChannel 为异步模式, 这样 connect, read, write 都是异步的了。在异步模式中, 或许连接还没有建立, connect 方法就返回了, 因此我们需要检查当前是否是连接到了主机,因此通过一个 while 循环来判断。
RandomAccessFile aFile = new RandomAccessFile("test.txt", "rw"); FileChannel inChannel = aFile.getChannel(); 复制代码
ByteBuffer buf = ByteBuffer.allocate(48); int bytesRead = inChannel.read(buf); 复制代码
String newData = "Current time is: " + System.currentTimeMillis(); ByteBuffer buf = ByteBuffer.allocate(1024); buf.clear(); buf.put(newData.getBytes()); buf.flip(); while (buf.hasRemaining()) { channel.write(buf); } 复制代码
channel.close(); 复制代码
long pos = channel.position(); channel.position(pos + 123); 复制代码
我们可以通过 channel.size()获取关联到这个 Channel 中的文件的大小。注意, 这里返回的是文件的大小, 而不是 Channel 中剩余的元素个数。
channel.truncate(1024); 复制代码
将文件的大小截断为1024字节。
channel.force(true); 复制代码
强制将缓存中的数据写入文件中:
ServerSocketChannel顾名思义,它是用来监听server端的socket连接。 打开和关闭
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.close(); 复制代码
使用ServerSocketChannel的accept()方法来监听客户端的TCP连接请求,accept()方法是阻塞的,直到有连接进来。
while(true){ SocketChannel socketChannel = serverSocketChannel.accept(); //do something with socketChannel... } 复制代码
如果设定ServerSocketChannel是非阻塞的,则accept()方法不会阻塞。如果返回的是null证明没有新的连接,如果不是null,则有新的连接请求。
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(9999)); serverSocketChannel.configureBlocking(false); while (true) { SocketChannel socketChannel = serverSocketChannel.accept(); if(socketChannel != null) { // do something with socketChannel... } } 复制代码
Java NIO Buffers用于和NIO Channel交互。正如你已经知道的,我们从channel中读取数据到buffers里,从buffer把数据写入到channels。
buffer本质上就是一块内存区,可以用来写入数据,并在稍后读取出来。这块内存被NIO Buffer包裹起来,对外提供一系列的读写方便开发的接口。
利用Buffer读写数据,通常遵循四个步骤:
当写入数据到buffer中时,buffer会记录已经写入的数据大小。当需要读数据时,通过flip()方法把buffer从写模式调整为读模式;在读模式下,可以读取所有已经写入的数据。
当读取完数据后,需要清空buffer,以满足后续写入操作。清空buffer有两种方式:调用clear()或compact()方法。clear会清空整个buffer,compact则只清空已读取的数据,未被读取的数据会被移动到buffer的开始位置,写入位置则近跟着未读数据之后。
这里有一个简单的buffer案例,包括了write,flip和clear操作:
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw"); FileChannel inChannel = aFile.getChannel(); // create buffer with capacity of 48 bytes ByteBuffer buf = ByteBuffer.allocate(48); int bytesRead = inChannel.read(buf); //read into buffer. while (bytesRead != -1) { buf.flip(); // make buffer ready for read while(buf.hasRemaining()){ System.out.print((char) buf.get()); // read 1 byte at a time } buf.clear(); //make buffer ready for writing bytesRead = inChannel.read(buf); } aFile.close(); 复制代码
Buffer的容量,位置,上限(Buffer Capacity, Position and Limit) buffer缓冲区实质上就是一块内存,用于写入数据,也供后续再次读取数据。这块内存被NIO Buffer管理,并提供一系列的方法用于更简单的操作这块内存。
一个Buffer有三个属性是必须掌握的,分别是:
position和limit的具体含义取决于当前buffer的模式。capacity在两种模式下都表示容量。
下面有张示例图,描诉了不同模式下position和limit的含义:
作为一块内存,buffer有一个固定的大小,叫做capacity容量。也就是最多只能写入容量值得字节,整形等数据。一旦buffer写满了就需要清空已读数据以便下次继续写入新的数据。
当写入数据到Buffer的时候需要中一个确定的位置开始,默认初始化时这个位置position为0,一旦写入了数据比如一个字节,整形数据,那么position的值就会指向数据之后的一个单元,position最大可以到capacity - 1。
当从Buffer读取数据时,也需要从一个确定的位置开始。buffer从写入模式变为读取模式时,position会归零,每次读取后,position向后移动。
在写模式,limit的含义是我们所能写入的最大数据量。它等同于buffer的容量。
一旦切换到读模式,limit则代表我们所能读取的最大数据量,他的值等同于写模式下position的位置。
数据读取的上限时buffer中已有的数据,也就是limit的位置(原position所指的位置)。
Java NIO有如下具体的Buffer类型:
ByteBuffer MappedByteBuffer CharBuffer DoubleBuffer FloatBuffer IntBuffer LongBuffer ShortBuffer 正如你看到的,Buffer的类型代表了不同数据类型,换句话说,Buffer中的数据可以是上述的基本类型;
为了获取一个Buffer对象,你必须先分配。每个Buffer实现类都有一个allocate()方法用于分配内存。下面看一个实例,开辟一个48字节大小的buffer:
ByteBuffer buf = ByteBuffer.allocate(48);
开辟一个1024个字符的CharBuffer:
CharBuffer buf = CharBuffer.allocate(1024);
写数据到Buffer有两种方法:
int bytesRead = inChannel.read(buf); // read into buffer
buf.put(127);
put方法有很多不同版本,对应不同的写数据方法。例如把数据写到特定的位置,或者把一个字节数据写入buffer。看考JavaDoc文档可以查阅的更多数据。
flip()方法可以吧Buffer从写模式切换到读模式。调用flip方法会把position归零,并设置limit为之前的position的值。也就是说,现在position代表的是读取位置,limit表示的是已写入的数据位置。
从Buffer读取数据(Reading Data from a Buffer) 冲Buffer读数据也有两种方式。
// read from buffer into channel. int bytesWritten = inChannel.write(buf); 复制代码
调用get读取数据的例子:
byte aByte = buf.get();
get也有诸多版本,对应了不同的读取方式。
Buffer.rewind()方法将position置为0,这样我们可以重复读取buffer中的数据。limit保持不变。
一旦我们从buffer中读取完数据,需要复用buffer为下次写数据做准备。只需要调用clear或compact方法。
clear方法会重置position为0,limit为capacity,也就是整个Buffer清空。实际上Buffer中数据并没有清空,我们只是把标记为修改了。
如果Buffer还有一些数据没有读取完,调用clear就会导致这部分数据被“遗忘”,因为我们没有标记这部分数据未读。
针对这种情况,如果需要保留未读数据,那么可以使用compact。 因此compact和clear的区别就在于对未读数据的处理,是保留这部分数据还是一起清空。
通过mark方法可以标记当前的position,通过reset来恢复mark的位置,这个非常像canvas的save和restore:
buffer.mark(); // call buffer.get() a couple of times, e.g. during parsing. buffer.reset(); // set position back to mark. 复制代码
Selector是Java NIO中的一个组件,用于检查一个或多个NIO Channel的状态是否处于可读、可写。如此可以实现单线程管理多个channels,也就是可以管理多个网络链接。
用单线程处理多个channels的好处是我需要更少的线程来处理channel。实际上,你甚至可以用一个线程来处理所有的channels。从操作系统的角度来看,切换线程开销是比较昂贵的,并且每个线程都需要占用系统资源,因此暂用线程越少越好。
需要留意的是,现代操作系统和CPU在多任务处理上已经变得越来越好,所以多线程带来的影响也越来越小。如果一个CPU是多核的,如果不执行多任务反而是浪费了机器的性能。不过这些设计讨论是另外的话题了。简而言之,通过Selector我们可以实现单线程操作多个channel。
这有一幅示意图,描述了单线程处理三个channel的情况:
创建一个Selector可以通过Selector.open()方法: Selector selector = Selector.open();
为了同Selector挂了Channel,我们必须先把Channel注册到Selector上,这个操作使用SelectableChannel.register():
channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ); 复制代码
Channel必须是非阻塞的。所以FileChannel不适用Selector,因为FileChannel不能切换为非阻塞模式。Socket channel可以正常使用。
注意register的第二个参数,这个参数是一个“关注集合”,代表我们关注的channel状态,有四种基础类型可供监听:
一个channel触发了一个事件也可视作该事件处于就绪状态。因此当channel与server连接成功后,那么就是“连接就绪”状态。server channel接收请求连接时处于“可连接就绪”状态。channel有数据可读时处于“读就绪”状态。channel可以进行数据写入时处于“写就绪”状态。
上述的四种就绪状态用SelectionKey中的常量表示如下:
如果对多个事件感兴趣可利用位的或运算结合多个常量,比如:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
在上一小节中,我们利用register方法把Channel注册到了Selectors上,这个方法的返回值是SelectionKeys,这个返回的对象包含了一些比较有价值的属性:
这个“关注集合”实际上就是我们希望处理的事件的集合,它的值就是注册时传入的参数,我们可以用按为与运算把每个事件取出来:
int interestSet = selectionKey.interestOps(); boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT; boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE; 复制代码
"就绪集合"中的值是当前channel处于就绪的值,一般来说在调用了select方法后都会需要用到就绪状态,select的介绍在胡须文章中继续展开。
int readySet = selectionKey.readyOps();
从“就绪集合”中取值的操作类似于“关注集合”的操作,当然还有更简单的方法,SelectionKey提供了一系列返回值为boolean的的方法:
selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable(); 复制代码
从SelectionKey操作Channel和Selector非常简单:
Channel channel = selectionKey.channel(); Selector selector = selectionKey.selector(); 复制代码
我们可以给一个SelectionKey附加一个Object,这样做一方面可以方便我们识别某个特定的channel,同时也增加了channel相关的附加信息。例如,可以把用于channel的buffer附加到SelectionKey上:
selectionKey.attach(theObject); Object attachedObj = selectionKey.attachment(); 复制代码
附加对象的操作也可以在register的时候就执行:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
一旦我们向Selector注册了一个或多个channel后,就可以调用select来获取channel。select方法会返回所有处于就绪状态的channel。 select方法具体如下:
selectNow()不会阻塞,根据当前状态立刻返回合适的channel。
select()方法的返回值是一个int整形,代表有多少channel处于就绪了。也就是自上一次select后有多少channel进入就绪。举例来说,假设第一次调用select时正好有一个channel就绪,那么返回值是1,并且对这个channel做任何处理,接着再次调用select,此时恰好又有一个新的channel就绪,那么返回值还是1,现在我们一共有两个channel处于就绪,但是在每次调用select时只有一个channel是就绪的。
在调用select并返回了有channel就绪之后,可以通过选中的key集合来获取channel,这个操作通过调用selectedKeys()方法:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
还记得在register时的操作吧,我们register后的返回值就是SelectionKey实例,也就是我们现在通过selectedKeys()方法所返回的SelectionKey。
遍历这些SelectionKey可以通过如下方法:
Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); } 复制代码
上述循环会迭代key集合,针对每个key我们单独判断他是处于何种就绪状态。
注意keyIterator.remove()方法的调用,Selector本身并不会移除SelectionKey对象,这个操作需要我们收到执行。当下次channel处于就绪是,Selector任然会吧这些key再次加入进来。
SelectionKey.channel返回的channel实例需要强转为我们实际使用的具体的channel类型,例如ServerSocketChannel或SocketChannel.
由于调用select而被阻塞的线程,可以通过调用Selector.wakeup()来唤醒即便此时已然没有channel处于就绪状态。具体操作是,在另外一个线程调用wakeup,被阻塞与select方法的线程就会立刻返回。
当操作Selector完毕后,需要调用close方法。close的调用会关闭Selector并使相关的SelectionKey都无效。channel本身不管被关闭。
这有一个完整的案例,首先打开一个Selector,然后注册channel,最后检测Selector的状态:
Selector selector = Selector.open(); channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ); while (true) { int readyChannels = selector.select(); if (readyChannels == 0) continue; Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if (key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); } } 复制代码