前一节中我们提到, Netty 入门的两个准备工作, 一是 Java NIO ,二是 Reactor反应器模式 , 这节我们来了解 Reactor反应器模式
如果不知道 Java NIO 的话,那么推荐先去看一下 Java NIO :
Reactor反应器模式是高性能网络编程在设计和架构层面的基础模式。
很多著名的服务器软件或者中间件都是基于反应器模式实现的, 比如:
所以说如果我们想要懂得 Netty
, 甚至说, 如果我们要完成和胜任高性能的服务器开发, 就必须懂得 Reactor反应器模式
, 这是基础知识。
Reactor反应器模式 是一种为处理服务请求并发提交到一个或者多个服务处理程序的事件设计模式,当客户端请求抵达后,服务处理程序使用多路分配策略,然后同步地派发这些请求至相关的请求处理程序
来源: 百度百科
站在巨人的肩膀上了解 Reactor反应器模式
,Java中 Concurrent并发包
的重要作者之一的 Doug Lea
,在文章《 Scalable IO in Java
》中对 反应器模式
的定义具体如下:
这里给出原文地址: Scalable IO in Java
反应器模式 有两大角色组成:
反应器线程
Handlers处理器
其中, 反应器线程 的主要负责响应IO事件,并且分发到Handlers处理器;
当检测到IO事件,将其发送给相应的Handler处理器去处理
与IO事件绑定,负责IO事件的处理,完成真正的连接建立,通道的读取,处理业务逻辑,负责将结果写出到通道
而 Handlers处理器 主要负责非阻塞的执行业务处理逻辑
下面我们来看看 Reactor反应器模式 是如何实现的
首先我们先来了解一个经典模式: Connection Per Thread
在最初的服务器程序中,我们判断是否存在连接是通过 while(true)
来监听的,如果存在,就调用一个处理函数来处理,下面我们来看一段伪代码
while(true) { socket = serverSocket.accept(); handler(socket); } 复制代码
这种方式最大的问题,就是如果前一个handler没有处理完,后一个socket就无法被接收, 这样就造成了服务器的吞吐量太低,于是出现下面的经典模式:
看下面这段代码:
class ServerPer implements Runnable { public void run() { while(!Thread.interrupted()) { socket = serverSocket.accept(); new Thread(new Handler(socket)).start(); } } } class Handler implements Runnable { private Socket socket; public Handler(Socket socket) { this.socket = socket; } public void run() { // 处理操作 } } 复制代码
这种方式的优点:解决了新连接被严重阻塞的问题, 在一定程度上, 极大提高了服务器的吞吐量,
但是这种方式的缺点也很明显: 一个线程处理一个连接,如果程序存在大量的连接,那么需要耗费大量的线程资源,在系统中,线程资源也是比较昂贵的资源;
而且频繁的创建,销毁,线程的切换也需要代价。因此这种模式也不适用于高并发的应用场景
那么下面我们来看看 Reactor反应器模式 是如何解决这种模式的缺陷
简单来说, 就是 Reactor反应器 和 Handlers处理器 处于同一个线程中执行, 这是最简单的反应器模式
基于 NIO 实现我们需要用到两个方法:
是属于 SelectionKey
选择键中的重要方法
将任意对象作为附件添加到 SelectionKey
实例
取出之前通过 attach
添加到 SelectionKey
实例中的附件
这两个方法是配套使用的
我们把上一节的服务端代码改造下:
class AcceptHandler implements Runnable { private ServerSocketDemo serverSocketDemo; public AcceptHandler(ServerSocketDemo serverSocketDemo) { this.serverSocketDemo = serverSocketDemo; } @Override public void run() { try { SocketChannel accept = serverSocketDemo.serverSocketChannel.accept(); if (null != accept) { ReceiverFile receiverFile = new ReceiverFile(); // 将上传文件保存路径记录下来 receiverFile.uploadSavePath = serverSocketDemo.UPLOAD_SAVE_PATH; NioSocket.MAP.put(accept, receiverFile); new ReadHandler(serverSocketDemo.selector, accept); } } catch (IOException e) { e.printStackTrace(); } } } 复制代码
attach
添加进来 // 绑定选择器 selector = Selector.open(); SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 将处理连接的Handler处理器添加进来 register.attach(new AcceptHandler(this)); 复制代码
class ReadHandler implements Runnable { private SocketChannel socketChannel; public ReadHandler(Selector selector, SocketChannel accept) throws IOException { this.socketChannel = accept; socketChannel.configureBlocking(false); SelectionKey selectionKey = socketChannel.register(selector, 0); selectionKey.attach(this); selectionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); } @Override public void run() { try { processData(); } catch (IOException e) { e.printStackTrace(); } } private void processData() throws IOException { ReceiverFile receiverFile = NioSocket.MAP.get(socketChannel); ByteBuffer buffer = ByteBuffer.allocate(NioSocket.BUFFER_CAPACITY); int len = 0; while ((len = socketChannel.read(buffer)) > 0) { buffer.flip(); if (receiverFile.fileName == null) { // 处理文件名称 if (buffer.capacity() < 4) { continue; } int fileNameLength = buffer.getInt(); byte[] fileNameArr = new byte[fileNameLength]; buffer.get(fileNameArr); String fileName = new String(fileNameArr, NioSocket.CHARSET); System.out.println("文件名称:" + fileName); receiverFile.fileName = fileName; // 处理存储文件 File dir = new File(receiverFile.uploadSavePath); if (!dir.exists()) { dir.mkdir(); } File file = new File((receiverFile.uploadSavePath + File.separator + fileName).trim()); if (!file.exists()) { file.createNewFile(); } receiverFile.outChannel = new FileOutputStream(file).getChannel(); // 长度 if (buffer.capacity() < 8) { continue; } long fileLength = buffer.getLong(); System.out.println("文件大小:" + fileLength); receiverFile.length = fileLength; // 文件内容 if (buffer.capacity() < 0) { continue; } receiverFile.outChannel.write(buffer); } else { // 文件内容 receiverFile.outChannel.write(buffer); } buffer.clear(); } if (len == -1) { receiverFile.outChannel.close(); } } } 复制代码
主体代码不变, 重点是在构造方法中
class ServerSocketDemo implements Runnable{ String UPLOAD_SAVE_PATH = "D://works//111"; ServerSocketChannel serverSocketChannel; Selector selector; public static void main(String[] args) { new Thread(new ServerSocketDemo()).start(); } public ServerSocketDemo() { getUploadSavePath(); // 服务器端编写 try { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); // 绑定端口 serverSocketChannel.bind( new InetSocketAddress( NioSocket.PORT ) ); // 绑定选择器 selector = Selector.open(); SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); register.attach(new AcceptHandler(this)); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { try { while (!Thread.interrupted()) { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey key : selectionKeys) { // 监控IO事件,然后分发 dispatch(key); } selectionKeys.clear(); } } catch (Exception e) { e.printStackTrace(); } } private void dispatch(SelectionKey key) { Runnable attachment = (Runnable) key.attachment(); if (null != attachment) { attachment.run(); } } } 复制代码
到这里就将上一节的代码改造成 Reactor
版的, 对比之前的代码, 我们可以看到,
AcceptHandler
和 ReadHandler
, 分别负责处理新连接和读取文件 ServerSocketDemo
只需要负责分发事件处理 客户端代码不变, 经测试, 代码没有问题,大家可以试试
单线程的Reactor相对于经典模式 Connection Per Thread , 不再需要启动成千上万的线程, 效率自然是提升了很多。
但是在单线程中, Reactor反应器 和 Handlers处理器 的执行都在同一个线程上, 这样就会出现一个问题:
这些都是非常严重的问题, 所以引出 多线程的Reactor反应器模式模型
下面我们来看看实现过程
private static final int THREAD_COUNT = 2; Selector[] selectors = new Selector[THREAD_COUNT]; ServerSocketChannel serverSocketChannel; NioReactorMultiThreadServer() { try { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); // 绑定端口 serverSocketChannel.bind( new InetSocketAddress( NioSocket.PORT ) ); // 绑定选择器 for (int i = 0; i < THREAD_COUNT; i++) { selectors[i] = Selector.open(); } // 第一个选择器 监听连接 SelectionKey register = serverSocketChannel.register(selectors[0], SelectionKey.OP_ACCEPT); // 附加到选择键上 register.attach(new AcceptHandler(this)); } catch (IOException e) { e.printStackTrace(); } } 复制代码
定义子反应器数组, 并且实现 Runnable
接口, 每条线程负责一个选择器
class SubReactor implements Runnable{ private Selector selector; SubReactor(Selector selector) { this.selector = selector; } @Override public void run() { try { while (!Thread.interrupted()) { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey key : selectionKeys) { dispatch(key); } selectionKeys.clear(); } } catch (Exception e) { e.printStackTrace(); } } } 复制代码
在 attach()
方法后面初始化子反应器
SubReactor[] subReactors = null; subReactors = new SubReactor[THREAD_COUNT]; for (int i = 0; i < THREAD_COUNT; i++) { subReactors[i] = new SubReactor(selectors[i]); } 复制代码
class AcceptHandler implements Runnable{ private NioReactorMultiThreadServer nioReactorMultiThreadServer; AcceptHandler(NioReactorMultiThreadServer nioReactorMultiThreadServer) { this.nioReactorMultiThreadServer = nioReactorMultiThreadServer; } @Override public void run() { try { SocketChannel accept = nioReactorMultiThreadServer.serverSocketChannel.accept(); if (null != accept) { // 采用轮询的方式获取到选择器 new ReadHandler(nioReactorMultiThreadServer.selectors[nioReactorMultiThreadServer.atom.get()], accept); } } catch (IOException e) { e.printStackTrace(); } finally { if (nioReactorMultiThreadServer.atom.incrementAndGet() == nioReactorMultiThreadServer.selectors.length) { nioReactorMultiThreadServer.atom.set(0); } } } } 复制代码
ExecutorService executorService = Executors.newFixedThreadPool(4); @Override public void run() { // 线程池执行 executorService.execute(() -> { try { // synchronized processData(); } catch (IOException e) { e.printStackTrace(); } }); } 复制代码
这样就完成了 多线程版的Reactor反应器模式模型 的代码,代码测试无误
下面是完整版代码
class NioSocket { static final String HOST = "127.0.0.1"; static final int PORT = 23356; static final int BUFFER_CAPACITY = 1024; static final Charset CHARSET = StandardCharsets.UTF_8; static final Map<SocketChannel, ReceiverFile> MAP = new ConcurrentHashMap<>(16); } // 为了简单 class ReceiverFile { public String fileName; public long length; public FileChannel outChannel; public String uploadSavePath; } public class NioReactorMultiThreadServer { private static final int THREAD_COUNT = 2; Selector[] selectors = new Selector[THREAD_COUNT]; ServerSocketChannel serverSocketChannel; private SubReactor[] subReactors = null; String UPLOAD_SAVE_PATH = ""; AtomicInteger atom = new AtomicInteger(0); public static void main(String[] args) { new NioReactorMultiThreadServer().startServer(); } private void startServer() { for (int i = 0; i < THREAD_COUNT; i++) { new Thread(subReactors[i]).start(); } } private void getUploadSavePath() { System.out.println("请输入想要保存文件的路劲:"); Scanner scanner = new Scanner(System.in); UPLOAD_SAVE_PATH = scanner.next(); } private NioReactorMultiThreadServer() { getUploadSavePath(); try { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); // 绑定端口 serverSocketChannel.bind( new InetSocketAddress( NioSocket.PORT ) ); // 绑定选择器 for (int i = 0; i < THREAD_COUNT; i++) { selectors[i] = Selector.open(); } // 第一个选择器 监听连接 SelectionKey register = serverSocketChannel.register(selectors[0], SelectionKey.OP_ACCEPT); // 附加到选择键上 register.attach(new AcceptHandler(this)); subReactors = new SubReactor[THREAD_COUNT]; for (int i = 0; i < THREAD_COUNT; i++) { subReactors[i] = new SubReactor(selectors[i]); } } catch (IOException e) { e.printStackTrace(); } } class SubReactor implements Runnable { private Selector selector; SubReactor(Selector selector) { this.selector = selector; } @Override public void run() { try { while (!Thread.interrupted()) { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey key : selectionKeys) { dispatch(key); } selectionKeys.clear(); } } catch (Exception e) { e.printStackTrace(); } } private void dispatch(SelectionKey key) { Runnable attachment = (Runnable) key.attachment(); if (null != attachment) { attachment.run(); } } } } class AcceptHandler implements Runnable { private NioReactorMultiThreadServer nioReactorMultiThreadServer; AcceptHandler(NioReactorMultiThreadServer nioReactorMultiThreadServer) { this.nioReactorMultiThreadServer = nioReactorMultiThreadServer; } @Override public void run() { try { SocketChannel accept = nioReactorMultiThreadServer.serverSocketChannel.accept(); if (null != accept) { ReceiverFile receiverFile = new ReceiverFile(); // 将上传文件保存路径记录下来 receiverFile.uploadSavePath = nioReactorMultiThreadServer.UPLOAD_SAVE_PATH; NioSocket.MAP.put(accept, receiverFile); // 采用轮询的方式获取到选择器 new ReadHandler(nioReactorMultiThreadServer.selectors[nioReactorMultiThreadServer.atom.get()], accept); } } catch (IOException e) { e.printStackTrace(); } finally { if (nioReactorMultiThreadServer.atom.incrementAndGet() == nioReactorMultiThreadServer.selectors.length) { nioReactorMultiThreadServer.atom.set(0); } } } } class ReadHandler implements Runnable { private SocketChannel socketChannel; private ExecutorService executorService = Executors.newFixedThreadPool(4); ReadHandler(Selector selector, SocketChannel accept) throws IOException { this.socketChannel = accept; socketChannel.configureBlocking(false); SelectionKey selectionKey = socketChannel.register(selector, 0); selectionKey.attach(this); selectionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); } @Override public void run() { executorService.execute(() -> { try { processData(); } catch (IOException e) { e.printStackTrace(); } }); } private synchronized void processData() throws IOException { ReceiverFile receiverFile = NioSocket.MAP.get(socketChannel); ByteBuffer buffer = ByteBuffer.allocate(NioSocket.BUFFER_CAPACITY); int len = 0; while ((len = socketChannel.read(buffer)) > 0) { buffer.flip(); if (receiverFile.fileName == null) { // 处理文件名称 if (buffer.capacity() < 4) { continue; } int fileNameLength = buffer.getInt(); byte[] fileNameArr = new byte[fileNameLength]; buffer.get(fileNameArr); String fileName = new String(fileNameArr, NioSocket.CHARSET); System.out.println("文件名称:" + fileName); receiverFile.fileName = fileName; // 处理存储文件 File dir = new File(receiverFile.uploadSavePath); if (!dir.exists()) { dir.mkdir(); } File file = new File((receiverFile.uploadSavePath + File.separator + fileName).trim()); if (!file.exists()) { file.createNewFile(); } receiverFile.outChannel = new FileOutputStream(file).getChannel(); // 长度 if (buffer.capacity() < 8) { continue; } long fileLength = buffer.getLong(); System.out.println("文件大小:" + fileLength); receiverFile.length = fileLength; // 文件内容 if (buffer.capacity() < 0) { continue; } receiverFile.outChannel.write(buffer); } else { // 文件内容 receiverFile.outChannel.write(buffer); } buffer.clear(); } if (len == -1) { receiverFile.outChannel.close(); } } } 复制代码
客户端代码不变