首先来看一些 Channel 的基础定义:
public interface Channel extends Closeable { public boolean isOpen(); public void close() throws IOException; }
与 Buffer 不同,Channel 的 API 主要是通过接口来定义,不同的操作系统上 Channel 的实现会有根本差别,所以 API 只会描述可以做什么。
InterruptibleChannel 是一个标记接口,表示该 Channel 是可以中断的。
从 Channel 引申出来的接口都是面向字节的子接口, WritableByteChannel 和 ReadableByteChannel。Channel 只能在 ByteBuffer 上操作。
打开 Channel
I/O 可以分为广义的两大类别:File I/O 和 Stream I/O。发现的 Channel 有如下:
- FileChannel
- SocketChannel,ServerSocketChannel,DatagramChannel
Socket 通道可以通过工厂来创建,但是 FileChannel 只能通过 RandomAccessFile、FileInputStream 或 FileOutputStream 对象上调用 getChannel( )方法来获取。
SocketChannel sc = SocketChannel.open( ); sc.connect (new InetSocketAddress ("somehost", someport)); ServerSocketChannel ssc = ServerSocketChannel.open( ); ssc.socket( ).bind (new InetSocketAddress (somelocalport)); DatagramChannel dc = DatagramChannel.open( ); RandomAccessFile raf = new RandomAccessFile ("somefile", "r"); FileChannel fc = raf.getChannel( );
使用 Channel
Channel 把数据传输给 ByteBuffer 对象或者从 ByteBuffer 对象当中获取数据。
public interface ReadableByteChannel extends Channel { public int read(ByteBuffer dst) throws IOException; } public interface WritableByteChannel extends Channel { public int write(ByteBuffer src) throws IOException; } public interface ByteChannel extends ReadableByteChannel, WritableByteChannel {}
通道可以是单向(unidirectional)或者双向的(bidirectional)。一个 channel 类可能实现定义 read( )方法的 ReadableByteChannel 接口,而另一个 channel 类也许实现 WritableByteChannel 接口以 供 write( )方法。实现这两种接口其中之一的类都是单向的,只能在一个方向上传输数据。如果 一个类同时实现这两个接口,那么它是双向的,可以双向传输数据。
SocketChannel 一直都是双向的,但是 FileChannel 可不全是,从 FileInputStream 获取的 FileChannel 是可读的,从 FileOutputStream 获取的 FileChannel 是可写的。
ByteChannel 的 read() 和 write() 方法使用 ByteBuffer 对象作为参数。两种方法均返回已传输的字节数,可能比缓冲区的字节数少甚至可能为零。缓冲区的位置也会发生与已传输字节相同数量的前移。如果只进行了部分传输,缓冲区可以被重新 交给通道并从上次中断的地方继续传输。该过程重复进行直到缓冲区的 hasRemaining() 方法返回 false 值。
如下是从一个 Channel 复制数据到另一个 Channel:
package nio.test; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; public class ChannelCopy { public static void main(String[] argv) throws IOException { ReadableByteChannel source = Channels.newChannel(System.in); WritableByteChannel dest = Channels.newChannel(System.out); channelCopy1(source, dest); // alternatively, call channelCopy2 (source, dest); source.close(); dest.close(); } private static void channelCopy1(ReadableByteChannel src, WritableByteChannel dest) throws IOException { ByteBuffer buffer = ByteBuffer.allocateDirect(8); while (src.read(buffer) != -1) { buffer.flip(); dest.write(buffer); buffer.compact();//如果没写完的话,下次继续写 System.out.println(); } buffer.flip(); while (buffer.hasRemaining()) { dest.write(buffer); } } private static void channelCopy2(ReadableByteChannel src, WritableByteChannel dest) throws IOException { ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024); while (src.read(buffer) != -1) { buffer.flip(); while (buffer.hasRemaining()) { dest.write(buffer); } buffer.clear(); } } }
与 Buffer 不同, Channel 不能被重复使用。一个打开的通道即代表与一个特定 I/O 服务的特定连接并封装该连接的状态。当通道关闭时,那个连接会丢失,然后通道将不再连接任何东西。
调用通道的 close()方法时,可能会导致在通道关闭底层 I/O 服务时线程暂时阻塞,哪怕该通道处于非阻塞模式。
如果一个 Channel 实现 InterruptibleChannel 接口,那么一个线程在一个 Channel 上被阻塞并且同时被中断时,这个 Channel 会被关闭,该线程也会产生一个 ClosedByInterruptException 异常。
Scatter/Gather
Scatter/Gather 的概念是指在多个缓冲区上实现一个简单的 I/O 操作。
对于一个 write 操作而言,数据是从几个缓冲区按顺序抽取(称为 gather)并沿着 Channel 发送的。
对于 read 操作而言,从通道读取的数据会按顺序被散布(称为 scatter)到多个缓冲区,将每个缓冲区填满直至 Channel 中的数据或者缓冲区的最大空间被消耗完。
public interface ScatteringByteChannel extends ReadableByteChannel { public long read(ByteBuffer[] dsts) throws IOException; public long read(ByteBuffer[] dsts, int offset, int length) throws IOException; } public interface GatheringByteChannel extends WritableByteChannel { public long write(ByteBuffer[] srcs) throws IOException; public long write(ByteBuffer[] srcs, int offset, int length) throws IOException; }
通过例子来理解概念:
channel 连接到一个有 48 字节数据等待读取的 socket 上:
ByteBuffer header = ByteBuffer.allocateDirect (10); ByteBuffer body = ByteBuffer.allocateDirect (80); ByteBuffer [] buffers = { header, body }; int bytesRead = channel.read (buffers);
一旦 read( )方法返回,bytesRead 就被赋予值 48,header 缓冲区将包含前 10 个从通道读取的字节而 body 缓冲区则包含接下来的 38 个字节。
用一个 gather 操作将多个缓冲区的数据组合并发送出去。
body.clear( ); body.put("FOO".getBytes()).flip( ); // "FOO" as bytes header.clear( ); header.putShort (TYPE_FILE).putLong(body.limit()).flip( ); long bytesWritten = channel.write (buffers);
带 offset 和 length 参数版本的 read( ) 和 write( )方法使得我们可以使用缓冲区 列的子集 缓冲区。
这里的 offset 值指哪个缓冲区将开始被使用,而不是指数据的 offset。
举个例子,假设我们有一个五元素的 fiveBuffers 列,它已经被 初始化并引用了五个缓冲区,下面的代码将会写第二个、第三个和第四个缓冲区的内容:
int bytesRead = channel.write (fiveBuffers, 1, 3);
【参考资料】
- Java Nio
—EOF—