异步IO在IO操作的两个阶段,都不会使线程阻塞。 Java 的 I/O 依赖于操作系统的实现。
Java NIO 的通道类似流,但又有些不同:
关键的Buffer实现 ByteBuffer,CharBuffer,DoubleBuffer,FloatBuffer,IntBuffer,LongBuffer,ShortBuffer
Buffer两种模式、三个属性:
capacity
作为一个内存块,Buffer有一个固定的大小值,也叫“capacity”.你只能往里写capacity个byte、long,char等类型。一旦Buffer满了,需要将其清空(通过读数据或者清除数据)才能继续写数据往里写数据。
position
当你写数据到Buffer中时,position表示当前的位置。初始的position值为0.当一个byte、long等数据写到Buffer后, position会向前移动到下一个可插入数据的Buffer单元。position最大可为capacity – 1. 当读取数据时,也是从某个特定位置读。当将Buffer从写模式切换到读模式,position会被重置为0. 当从Buffer的position处读取数据时,position向前移动到下一个可读的位置。
limit
在写模式下,Buffer的limit表示你最多能往Buffer里写多少数据。 写模式下,limit等于Buffer的capacity。 当切换Buffer到读模式时, limit表示你最多能读到多少数据。因此,当切换Buffer到读模式时,limit会被设置成写模式下的position值。换句话说,你能读到之前写入的所有数据(limit被设置成已写数据的数量,这个值在写模式下就是position)
参考链接:Buffer原理 www.cnblogs.com/chenpi/p/64…
Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。
select()方法
select()阻塞到至少有一个通道在你注册的事件上就绪了。 select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。
selectedKeys()方法
调用selector的selectedKeys()方法,访问“已选择键集(selected key set)”中的就绪通道。
参考链接:操作系统层面分析Selector原理 zhhphappy.iteye.com/blog/203289…
public class NIOServerSocket { //存储SelectionKey的队列 private static List<SelectionKey> writeQueue = new ArrayList<SelectionKey>(); private static Selector selector = null; //添加SelectionKey到队列 public static void addWriteQueue(SelectionKey key){ synchronized (writeQueue) { writeQueue.add(key); //唤醒主线程 selector.wakeup(); } } public static void main(String[] args) throws IOException { // 1.创建ServerSocketChannel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 2.绑定端口 serverSocketChannel.bind(new InetSocketAddress(60000)); // 3.设置为非阻塞 serverSocketChannel.configureBlocking(false); // 4.创建通道选择器 selector = Selector.open(); /* * 5.注册事件类型 * * sel:通道选择器 * ops:事件类型 ==>SelectionKey:包装类,包含事件类型和通道本身。四个常量类型表示四种事件类型 * SelectionKey.OP_ACCEPT 获取报文 SelectionKey.OP_CONNECT 连接 * SelectionKey.OP_READ 读 SelectionKey.OP_WRITE 写 */ serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { System.out.println("服务器端:正在监听60000端口"); // 6.获取可用I/O通道,获得有多少可用的通道 int num = selector.select(); if (num > 0) { // 判断是否存在可用的通道 // 获得所有的keys Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 使用iterator遍历所有的keys Iterator<SelectionKey> iterator = selectedKeys.iterator(); // 迭代遍历当前I/O通道 while (iterator.hasNext()) { // 获得当前key SelectionKey key = iterator.next(); // 调用iterator的remove()方法,并不是移除当前I/O通道,标识当前I/O通道已经处理。 iterator.remove(); // 判断事件类型,做对应的处理 if (key.isAcceptable()) { ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = ssChannel.accept(); System.out.println("处理请求:"+ socketChannel.getRemoteAddress()); // 获取客户端的数据 // 设置非阻塞状态 socketChannel.configureBlocking(false); // 注册到selector(通道选择器) socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { System.out.println("读事件"); //取消读事件的监控 key.cancel(); //调用读操作工具类 NIOHandler.read(key); } else if (key.isWritable()) { System.out.println("写事件"); //取消读事件的监控 key.cancel(); //调用写操作工具类 NIOHandler.write(key); } } }else{ synchronized (writeQueue) { while(writeQueue.size() > 0){ SelectionKey key = writeQueue.remove(0); //注册写事件 SocketChannel channel = (SocketChannel) key.channel(); Object attachment = key.attachment(); channel.register(selector, SelectionKey.OP_WRITE,attachment); } } } } } } 复制代码
public class NIOHandler { //构造线程池 private static ExecutorService executorService = Executors.newFixedThreadPool(10); public static void read(final SelectionKey key){ //获得线程并执行 executorService.submit(new Runnable() { @Override public void run() { try { SocketChannel readChannel = (SocketChannel) key.channel(); // I/O读数据操作 ByteBuffer buffer = ByteBuffer.allocate(1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(); int len = 0; while (true) { buffer.clear(); len = readChannel.read(buffer); if (len == -1) break; buffer.flip(); while (buffer.hasRemaining()) { baos.write(buffer.get()); } } System.out.println("服务器端接收到的数据:"+ new String(baos.toByteArray())); //将数据添加到key中 key.attach(baos); //将注册写操作添加到队列中 NIOServerSocket.addWriteQueue(key); } catch (IOException e) { e.printStackTrace(); } } }); } public static void write(final SelectionKey key) { //拿到线程并执行 executorService.submit(new Runnable() { @Override public void run() { try { // 写操作 SocketChannel writeChannel = (SocketChannel) key.channel(); //拿到客户端传递的数据 ByteArrayOutputStream attachment = (ByteArrayOutputStream)key.attachment(); System.out.println("客户端发送来的数据:"+new String(attachment.toByteArray())); ByteBuffer buffer = ByteBuffer.allocate(1024); String message = "你好,我是服务器!!"; buffer.put(message.getBytes()); buffer.flip(); writeChannel.write(buffer); writeChannel.close(); } catch (IOException e) { e.printStackTrace(); } } }); } } 复制代码
public class NIOClientSocket { public static void main(String[] args) throws IOException { //使用线程模拟用户 并发访问 for (int i = 0; i < 1; i++) { new Thread(){ public void run() { try { //1.创建SocketChannel SocketChannel socketChannel=SocketChannel.open(); //2.连接服务器 socketChannel.connect(new InetSocketAddress("localhost",60000)); //写数据 String msg="我是客户端"+Thread.currentThread().getId(); ByteBuffer buffer=ByteBuffer.allocate(1024); buffer.put(msg.getBytes()); buffer.flip(); socketChannel.write(buffer); socketChannel.shutdownOutput(); //读数据 ByteArrayOutputStream bos = new ByteArrayOutputStream(); int len = 0; while (true) { buffer.clear(); len = socketChannel.read(buffer); if (len == -1) break; buffer.flip(); while (buffer.hasRemaining()) { bos.write(buffer.get()); } } System.out.println("客户端收到:"+new String(bos.toByteArray())); socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } }; }.start(); } } } 复制代码
参考链接:SocketChannel.read blog.csdn.net/cao47820824…
参考链接:NIO坑 www.jianshu.com/p/1af407c04…