Netty
是JBOSS提供的一款Java的开源工具,是基于 NIO
的客户端/服务端的编码框架,同时 Netty
也具有高性能,高扩展,异步事件驱动等特性受到各类应用的深切拥戴
基于 Netty
,可以快速开发网络服务器和客户端的应用程序
Netty
应用广泛,已经有成百上千的分布式中间件,各种开源项目以及各类商业项目的应用。如: Kafka
、 ElasticSearch
、 Dubbo
等都在使用 Netty
,这广泛的使用率对于它的巨大优点是密不可分的,大致总结如下:
这里的 异步事件驱动 其实可以分为:
表示为非阻塞,标准IO操作都是阻塞模式的
Future-Listener机制,方便主动获取获通过通知机制获得IO操作结果
学习 Netty
前,我们需要做一些准备工作,准备工作做得好,学习 Netty
不烦恼
在 JDK1.4之前
,Java的IO操作都是阻塞的,为了弥补不足,引入了新异步IO库: javaNewIO
,简称 NIO
Reactor是高性能网络编程在设计和架构层面的基础模式,了解了 Reactor
反应器模式,才能轻松的学习和掌握 Netty
,而且 Netty
的整体架构就是 Reactor
反应器模式
掌握以上两大知识点,我们再开始我们的 Netty
学习旅程,那么接下来我们先从 javaNIO
开始说起
javaNIO是一个基于缓冲区的,基于通道的I/O操作方法,不同于 标准I/O 操作方法, 标准I/O操作 的读写都是阻塞模式,而 NIO 的读写为非阻塞模式,且 javaNIO 的效率远高于 标准I/O
那么, NIO 是如何做到非阻塞的呢,我们先看下它的IO模型
在Java中,socket连接模式默认是阻塞模式,但是在Linux下,可以通过设置将socket变成非阻塞模式,使用非阻塞模式的IO读写,叫做 同步非阻塞IO ,出现以下两种情况:
这种情况下,如果为了读取到最终的数据,用户线程需要不断轮询,直到出现存在数据的情况,这种方式的缺点很明显:
所以为了避免同步非阻塞IO中轮询等待的问题,引出了 IO多路复用模型
在 IO多路复用模型 中,引入了一个新的系统调用: 查询IO的就绪状态 ,通过该系统调用,一个进程可以监视多个文件描述符,一旦某个文件描述符就绪,内核能够将就绪的状态返回给应用程序,随后应用程序通过就绪的状态,进行相应的IO系统操作
下面进入到真正的 NIO
的学习中,
NIO
是由以下三个核心组件组成
缓冲区,应用程序和 Channel
的主要交互操作区域
通道,类似于输入输出流的合体
选择器,负责IO事件的查询器,查询 Channel
的IO事件是否就绪, 和通道属于监控和被监控的关系
下来我们先来看了解 Buffer
缓冲区,本质是一块内存块,既可以写入数据,也可以从中读取数据
表示Buffer内部容量的大小,如果写入的数据量超过 capacity
,那么将不再写入并且会抛出异常: java.nio.BufferOverflowException
表示当前的位置, position
和缓冲区读写模式有关,
在写入模式下:
在读模式下:
关于读写模式如何切换,下面讲,这里涉及到position和limit的变化
表示读写的最大上限,在刚进入到写模式时,读写的最大上限=capacity容器大小
在进入读模式下, limit=写模式下的position
Buffer类是一个非线程安全类
Buffer类是一个抽象类,位于 java.nio
中,其子类对应Java中的主要数据类型,内部是由对应子类类型的数组构成,下面看验证过程:
DoubleBuffer buffer = DoubleBuffer.allocate(100); //建立一个内部容量大小为100的Buffer 复制代码
跟踪其源码:
public static DoubleBuffer allocate(intcapacity){ if(capacity<0) throw new IllegalArgumentException(); return new HeapDoubleBuffer(capacity,capacity); } //-------------HeapDoubleBuffer---------------- HeapDoubleBuffer(intcap,intlim){ super(-1,0,lim,cap,newdouble[cap],0); } //---------------DoubleBuffer--------------- DoubleBuffer(int mark,int pos,int lim,int cap, double[] hb,int offset) { super(mark,pos,lim,cap); this.hb=hb; this.offset=offset; } 复制代码
专门用来内存映射的类型
所有Buffer的创建过程都是一样的,不再一一举例,下面说几个重要概念
我猜猜,肯定有很多人会想到 StringBuffer
,O(∩_∩)O哈哈~,不一样的
创建Buffer对象,并分配内存空间,并且默认情况下,该Buffer处于 写模式 下,不信我们来看结果:
这里我就采用 ByteBuffer
ByteBuffer buffer=ByteBuffer.allocate(100); private static void show(ByteBuffer buffer){ System.out.print("position:"+buffer.position()); System.out.print("/t"); System.out.print("capacity:"+buffer.capacity()); System.out.print("/t"); System.out.println("limit:"+buffer.limit()); System.out.println("-----------------"); } //position:0 capacity:100 limit:100 复制代码
position为0,limit和初识容量大小相等,说明是写入模式
将数据写入到Buffer中
buffer.put("helloworld".getBytes()); //继续调用show方法,查看position的变化 //position:11 capacity:100 limit:100 复制代码
position变成11,其余不变
翻转,将写模式转变成读模式
buffer.flip(); //继续调用show方法,查看position的变化 //position:0 capacity:100 limit:11 复制代码
对于翻转前和翻转后,limit变成翻转前的position值,position重置为0,当position>=limit时,就没有数据可以读取
那么,如何再转为写模式呢?
这两个方法都可以将读模式转变成写模式,
clear:清空
compact:压缩
将模式转成读模式后,可以开始从缓冲区读取数据,每读一个数据,position+1
buffer.get(); 复制代码
如果需要读取到整个数组,调用
buffer.array() 复制代码
倒带,就是如果已经读完的数据,需要再读一次,就可以调用rewind()方法
allocate put flip get clear/compact
Buffer的重点操作在于对position和limit的变化,大家可以多看看对应方法的源码
上面说到, JavaNIO
是一个基于缓冲区的,基于通道的I/O操作方法,在 NIO
中,可以将连接想象成通道,一个连接就是一个通道
作为 NIO
的核心组件之一,根据不同的传输协议有不同类型的通道实现
本质上 NIO
的I/O操作方法就是在操作 Buffer
下面我们一个个来学习
文件通道,文件通道是一个专门用来操作文件的通道,既可以从文件读取数据,也可以将数据写入到文件中,
FileChannel
是一个阻塞类型的通道,不可以设置为非阻塞模式
//得到读取通道 FileChannel fisChannel=new FileInputStream("").getChannel(); //得到输出通道 FileChannel fosChannel=new FileOutputStream("").getChannel(); //通过文件随机访问类得到通道 FileChannel AccChannel=new RandowAccessFile("","rw").getChannel(); 复制代码
不同类型的流得到不同意义上的通道,
本质上 NIO
的I/O操作方法就是在操作 Buffer
,一定要注意这句话,意思是:读取数据,就是将数据写入到 Buffer
中,故而这里的 Buffer
模式是写模式
//创建一个ByteBuffer,容量大小为1024 ByteBuffer buffer=ByteBuffer.allocate(1024); //因为刚创建出来的Buffer的模式就是写模式,所以我们不需要进行转换 //调用读取通道的read()读取,返回读取到的数据量 intlen=fisChannel.read(buffer); 复制代码
读取到数据后,我们想将数据写入到指定的文件中,我们可以这样做:
//切换buffer的模式:读模式 buffer.flip(); //写入到指定的文件中,返回写入成功的 int len=fosChannel.write(buffer); //切换buffer的模式:写模式 buffer.clear(); 复制代码
这里为什么需要切换 Buffer
的模式?
输出通道如果想将数据写入到文件中的流程:
Buffer
所以就需要将模式切换,同理,调用 clear()
也是一样的道理。
在将缓冲区写入通道时,是由操作系统来完成的,处于性能问题,不可能每次都实时写入,所以为了保证数据最终都真正的写入磁盘,所以需要调用通道的强制刷新来完成
fosChannel.force(true); 复制代码
和使用流方式是一样的,通道也需要关闭
fisChannel.close(); fosChannel.close(); 复制代码
下面我们来使用 FileChannel
来做一个完整的案例:
public class CopyFileByFileChannel { static ByteBuffer buffer = ByteBuffer.allocate(1024); public static void main(String[] args) { copy_file(); } private static void copy_file() { FileChannel fisChannel = null; FileChannel fosChannel = null; FileInputStream fis = null; FileOutputStream fos = null; try { fis = new FileInputStream("D://work//web//study-netty//src//main//java//top//zopx//study//nio//CopyFileByFileChannel.java"); fos = new FileOutputStream("D://work//web//study-netty//src//main//java//top//zopx//study//nio//CopyFileByFileChannel.txt"); fisChannel = fis.getChannel(); fosChannel = fos.getChannel(); while (fisChannel.read(buffer) != -1) { buffer.flip(); // int outLen = 0; // while ((outLen = fosChannel.write(buffer)) != 0) { // System.out.println("outLen:"+ outLen); // } fosChannel.write(buffer); buffer.clear(); } fosChannel.force(true); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != fis) { fis.close(); } if (null != fos) { fos.close(); } if (null != fisChannel) { fisChannel.close(); } if (null != fosChannel) { fosChannel.close(); } } catch (IOException e) { e.printStackTrace(); } } } } 复制代码
事实上,针对文件复制的部分, NIO
也为我们提供了非常友好的一个方法,这里给出关键代码
long size = fisChannel.size(); long pos = 0; long count = 0; while (pos < size) { count = size - pos > 1024 ? 1024 : size - pos; pos += fosChannel.transferFrom(fisChannel, pos, count); } 复制代码
避免了我们在创建 Buffer
后的模式切换问题
套接字通道,基于TCP面向连接的,用于客户端网络的通道,负责网络连接数据传输, SocketChannel
分为阻塞和非阻塞模式,可以通过以下配置来设置
socketChannel.configureBlocking(false); 复制代码
阻塞模式下的执行方式和效率和标准IO下的 Socket
是一样的,所以不设置为阻塞模式
那么,接下来我们来看看如何得到 SocketChannel
的实例
socketChannel=SocketChannel.open(); //非阻塞模式 socketChannel.configureBlocking(false); 复制代码
public static final String HOST="127.0.0.1"; public static final int PORT=36589; socketChannel.connect(new InetSocketAddress(HOST,PORT)); while(!socketChannel.finishConnect()){} 复制代码
连接到服务端很简单,通过 connect()
方法就可以,但是在非阻塞模式下,客户端连接到服务端,会立即返回连接结果,不管连接是否成功,所以需要通过 自旋
的方式,判断 socketChannel
是否真正的连接到了服务端
连接到服务端后,就很简单了, 操作数据
的过程其实就是在操作 Buffer
的过程,这里就不再累述,随后通过完整的例子来操作
在关闭 SocketChannel
前,建议先给服务端发送一个结束标志,然后再关闭
socketChannel.shutdownOutput(); socketChannel.close(); 复制代码
服务端通道,面向连接,用于服务端网络的通道,负责连接监听,和 SocketChannel
一样,分为阻塞和非阻塞模式,配置方式都是一样的
server.configureBlocking(false); 复制代码
server=ServerSocketChannel.open(); server.configureBlocking(false); 复制代码
server.bind(new InetSocketAddress(36589)); 复制代码
server.close(); 复制代码
其他的操作数据等过程和 SocketChannel
是一样的,而且想真正的实现一个 ServerSocketChannel
的完整小demo,还需要和 Selector
配合使用
是基于 UDP 无连接的传输协议的数据报通道,分为阻塞和非阻塞模式,配置方式
open.configureBlocking(false); 复制代码
open=DatagramChannel.open(); open.configureBlocking(false); 复制代码
不多说了,标准写法
open.bind(new InetSocketAddress(52485)); 复制代码
这里的读取数据和之前不同,不再是通过 read()
方法来读取:
open.receive(buffer); 复制代码
发送数据也不再使用 write()
方式,而是:
open.send(buffer,newInetSocketAddress()) 复制代码
第二个参数:你想要发送给的客户端
以一个小例子让大家理解下 DatagramChannel
的使用方法
public class DatagramOpenChannel { public static void main(String[] args) { int port = getPort(); datagram_open_channel(port); } private static int getPort() { System.out.println("请输入你要绑定的端口号:"); Scanner scanner = new Scanner(System.in); return scanner.nextInt(); } private static void datagram_open_channel(int port) { DatagramChannel open = null; try { open = DatagramChannel.open(); open.configureBlocking(false); open.bind(new InetSocketAddress(port)); read(open); send(open); } catch (IOException e) { e.printStackTrace(); } finally { if (null != open) { try { open.close(); } catch (IOException e) { e.printStackTrace(); } } } } private static void send(DatagramChannel open) throws IOException { System.out.println("输入的内容格式:port@msg"); Scanner scanner = new Scanner(System.in); ByteBuffer buffer = ByteBuffer.allocate(1024); while (scanner.hasNext()) { String next = scanner.next(); if (next.contains("@")) { String[] split = next.split("@"); int port = Integer.parseInt(split[0]); String msg = split[1]; buffer.put(msg.getBytes()); buffer.flip(); open.send(buffer, new InetSocketAddress("127.0.0.1", port)); buffer.clear(); } } } private static void read(DatagramChannel open) throws IOException { new Thread(() -> { ByteBuffer buffer = ByteBuffer.allocate(1024); while (true) { try { SocketAddress receive = open.receive(buffer); if (null != receive) { buffer.flip(); System.out.println(new String(buffer.array(), 0, buffer.limit())); buffer.clear(); } } catch (IOException e) { e.printStackTrace(); } } }).start(); } } 复制代码
测试方式:同时开启两个客户端,按照指定格式发送数据
只是一个小案例,很多地方没有做判断,大家可以在此基础上完善
选择器:是 NIO
组件中的重要角色,那么什么是选择器?
前面我们说到, NIO
的模式是 IO多路复用模型
,而选择器(Selector)就是为了完成IO的多路复用,选择器在其中就是起到查询IO的就绪状态的作用,通过选择器可以同时监控多个通道的IO状态,
需要注意的是,选择器只适用于非阻塞通道的情况下,所以 FileChannel
是不适用的
通道的某个IO操作的一种就绪状态,表示通道具备完成某个IO操作的条件,也正符合了 IO多路复用模型 的条件
有数据可读的通道,处于 读就绪 状态
一个等待写入数据的通道,处于 写就绪 状态
某个通道,完成了和对端的握手连接,处于 连接就绪 状态
某个服务端通道,监听到一个新连接的到来,处于 接收就绪 状态
selector=Selector.open(); 复制代码
//注册选择器并绑定接收就绪状态 serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT); 复制代码
我们需要注意,
注册选择器的通道必须是非阻塞模式
不是所有的通道都支持四种IO事件,比如:
while (selector.select() > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { System.out.println("存在新链接进来"); } else if (key.isConnectable()) { System.out.println("连接就绪"); } else if (key.isReadable()) { System.out.println("可读"); } else if (key.isWritable()) { System.out.println("可写"); } } } 复制代码
重点:
我们改造下那个例子:
Selector selector=Selector.open(); //DatagramChannel是无连接的,所以我直接绑定的读就绪 open.register(selector,SelectionKey.OP_READ); 复制代码
read()
方法 private static void read(Selector selector) throws IOException { new Thread(() -> { ByteBuffer buffer = ByteBuffer.allocate(1024); try { while (selector.select() > 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isReadable()) { DatagramChannel open = (DatagramChannel) key.channel(); SocketAddress receive = open.receive(buffer); if (null != receive) { buffer.flip(); System.out.println(new String(buffer.array(), 0, buffer.limit())); buffer.clear(); } } } } } catch (Exception e) { e.printStackTrace(); } }).start(); } 复制代码
上面讲解 SocketChannel
和 ServerSocketChannel
时,没有小栗子展示,接下来我们重点来对这两者进行介绍
该栗子比较复杂,请好好消化
首先,我们先来讲解下需求:
客户端选择文件上传到服务端,保存到服务端指定的文件夹下
class NioSocket { static final String HOST = "127.0.0.1"; static final int PORT = 23356; static final int BUFFER_CAPACITY = 1024; static final Charset CHARSET = StandardCharsets.UTF_8; } // 为了简单 class ReceiverFile { public String fileName; public long length; public FileChannel outChannel; } 复制代码
class SocketDemo { private static String UPLOAD_FILE = ""; public static void main(String[] args) { send_file(); } private static void send_file() { changeUploadFile(); File file = new File(UPLOAD_FILE); if (!file.exists()) { System.out.println("文件不存在"); return; } try { SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress( NioSocket.HOST, NioSocket.PORT )); while (!socketChannel.finishConnect()) { // 异步模式, 自旋验证是否已经成功连接到服务器端 // 这里也可以做其他事情 } System.out.println("成功连接到服务器端"); ByteBuffer buffer = ByteBuffer.allocate(NioSocket.BUFFER_CAPACITY); ByteBuffer encode = NioSocket.CHARSET.encode(file.getName()); // 发送文件名称长度 // 这里如果直接使用 encode.capacity() 的话, 会多两个字节的长度 buffer.putInt(file.getName().trim().length()); // buffer.flip(); // socketChannel.write(buffer); // buffer.clear(); System.out.printf("文件名称长度发送:%s /n" , encode.capacity()); // 发送文件名称 buffer.put(encode); // socketChannel.write(encode); System.out.printf("文件名称发送:%s /n", file.getName()); // 发送文件大小 buffer.putLong(file.length()); // buffer.flip(); // socketChannel.write(buffer); // buffer.clear(); System.out.printf("发送文件长度:%s /n", file.length()); // 发送文件 int len = 0; long progess = 0; FileChannel fileChannel = new FileInputStream(file).getChannel(); while ((len = fileChannel.read(buffer)) > 0) { buffer.flip(); socketChannel.write(buffer); buffer.clear(); progess += len; System.out.println("上传文件进度:" + (progess / file.length() * 100) + "%"); } // 发送完成, 常规关闭操作 if (len == -1) { // 发送完成, 关闭操作 fileChannel.close(); socketChannel.shutdownOutput(); socketChannel.close(); } } catch (IOException e) { e.printStackTrace(); } } private static void changeUploadFile() { System.out.println("请输入想要上传文件的完整路径"); Scanner scanner = new Scanner(System.in); UPLOAD_FILE = scanner.next(); } } 复制代码
关于我注释掉的地方, 是我在测试过程中遇到的问题:
如果把信息分开发送的话, 那么服务端接收可能会出现如下问题
Exception in thread "main" java.nio.BufferUnderflowException 复制代码
class ServerSocketDemo { private static String UPLOAD_SAVE_PATH = "D://works//111"; private static final Map<SelectableChannel, ReceiverFile> MAP = new ConcurrentHashMap<>(); public static void main(String[] args) { receive_file(); } private static void receive_file() { getUploadSavePath(); // 服务器端编写 try { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); // 绑定端口 serverSocketChannel.bind( new InetSocketAddress( NioSocket.PORT ) ); // 绑定选择器 Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 轮训 while (selector.select() > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); // 判断事件 if (key.isAcceptable()) { accept(key, selector); } else if (key.isReadable()) { processData(key); } } } selector.close(); serverSocketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } private static void processData(SelectionKey key) throws IOException { ReceiverFile receiverFile = MAP.get(key.channel()); SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(NioSocket.BUFFER_CAPACITY); int len = 0; while ((len = socketChannel.read(buffer)) > 0) { buffer.flip(); if (receiverFile.fileName == null) { // 处理文件名称 if (buffer.capacity() < 4) { continue; } int fileNameLength = buffer.getInt(); byte[] fileNameArr = new byte[fileNameLength]; buffer.get(fileNameArr); String fileName = new String(fileNameArr, NioSocket.CHARSET); System.out.println("文件名称:" + fileName); receiverFile.fileName = fileName; // 处理存储文件 File dir = new File(UPLOAD_SAVE_PATH); if (!dir.exists()) { dir.mkdir(); } File file = new File((UPLOAD_SAVE_PATH + File.separator + fileName).trim()); if (!file.exists()) { file.createNewFile(); } receiverFile.outChannel = new FileOutputStream(file).getChannel(); // 长度 if (buffer.capacity() < 8) { continue; } long fileLength = buffer.getLong(); System.out.println("文件大小:" + fileLength); receiverFile.length = fileLength; // 文件内容 if (buffer.capacity() < 0) { continue; } receiverFile.outChannel.write(buffer); } else { // 文件内容 receiverFile.outChannel.write(buffer); } buffer.clear(); } if (len == -1) { receiverFile.outChannel.close(); } } private static void accept(SelectionKey key, Selector selector) throws IOException { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel accept = channel.accept(); accept.configureBlocking(false); accept.register(selector, SelectionKey.OP_READ); // 通道和File进行匹配 ReceiverFile receiverFile = new ReceiverFile(); MAP.put(accept, receiverFile); } private static void getUploadSavePath() { System.out.println("请输入想要保存文件的路劲:"); Scanner scanner = new Scanner(System.in); UPLOAD_SAVE_PATH = scanner.next(); } } 复制代码
好, 到此NIO的知识点就完结了, 可以看到知识点虽然很多, 但其实没有很复杂, 根据上面的知识点一点一点的学习下来, 很容易就能掌握