转载

Netty编程实战之:Reactor反应器模式

前一节中我们提到, Netty 入门的两个准备工作, 一是 Java NIO ,二是 Reactor反应器模式 , 这节我们来了解 Reactor反应器模式

如果不知道 Java NIO 的话,那么推荐先去看一下 Java NIO

  • Netty编程实战之:掌控NIO

什么是Reactor反应器模式

Reactor反应器模式是高性能网络编程在设计和架构层面的基础模式。

Reactor的应用领域

很多著名的服务器软件或者中间件都是基于反应器模式实现的, 比如:

  • Nginx
  • Redis
  • Netty
  • ...

所以说如果我们想要懂得 Netty , 甚至说, 如果我们要完成和胜任高性能的服务器开发, 就必须懂得 Reactor反应器模式 , 这是基础知识。

Reactor反应器模式

Reactor反应器模式 是一种为处理服务请求并发提交到一个或者多个服务处理程序的事件设计模式,当客户端请求抵达后,服务处理程序使用多路分配策略,然后同步地派发这些请求至相关的请求处理程序

来源: 百度百科

组成结构

站在巨人的肩膀上了解 Reactor反应器模式 ,Java中 Concurrent并发包 的重要作者之一的 Doug Lea ,在文章《 Scalable IO in Java 》中对 反应器模式 的定义具体如下:

这里给出原文地址: Scalable IO in Java

反应器模式 有两大角色组成:

  • 反应器线程

  • Handlers处理器

其中, 反应器线程 的主要负责响应IO事件,并且分发到Handlers处理器;

当检测到IO事件,将其发送给相应的Handler处理器去处理

与IO事件绑定,负责IO事件的处理,完成真正的连接建立,通道的读取,处理业务逻辑,负责将结果写出到通道

Handlers处理器 主要负责非阻塞的执行业务处理逻辑

下面我们来看看 Reactor反应器模式 是如何实现的

实现

首先我们先来了解一个经典模式: Connection Per Thread

在最初的服务器程序中,我们判断是否存在连接是通过 while(true) 来监听的,如果存在,就调用一个处理函数来处理,下面我们来看一段伪代码

while(true) {
    socket = serverSocket.accept();
    handler(socket);
}
复制代码

这种方式最大的问题,就是如果前一个handler没有处理完,后一个socket就无法被接收, 这样就造成了服务器的吞吐量太低,于是出现下面的经典模式:

Connection Per Thread (一个线程处理一个连接)

看下面这段代码:

class ServerPer implements Runnable {
    public void run() {
        while(!Thread.interrupted()) {
            socket = serverSocket.accept();
            new Thread(new Handler(socket)).start();
        }
    }    
}

class Handler implements Runnable {
    private Socket socket;
    public Handler(Socket socket) {
        this.socket = socket;
    }
    public void run() {
        // 处理操作
    }
}
复制代码

这种方式的优点:解决了新连接被严重阻塞的问题, 在一定程度上, 极大提高了服务器的吞吐量,

但是这种方式的缺点也很明显: 一个线程处理一个连接,如果程序存在大量的连接,那么需要耗费大量的线程资源,在系统中,线程资源也是比较昂贵的资源;

而且频繁的创建,销毁,线程的切换也需要代价。因此这种模式也不适用于高并发的应用场景

那么下面我们来看看 Reactor反应器模式 是如何解决这种模式的缺陷

单线程Reactor

什么是单线程Reactor

简单来说, 就是 Reactor反应器Handlers处理器 处于同一个线程中执行, 这是最简单的反应器模式

基于NIO实现的Reactor

基于 NIO 实现我们需要用到两个方法:

是属于 SelectionKey 选择键中的重要方法

attach(Object o)

将任意对象作为附件添加到 SelectionKey 实例

attachment()

取出之前通过 attach 添加到 SelectionKey 实例中的附件

这两个方法是配套使用的

实现代码

我们把上一节的服务端代码改造下:

  • 创建处理连接的Handler处理器
class AcceptHandler implements Runnable {

    private ServerSocketDemo serverSocketDemo;
    public AcceptHandler(ServerSocketDemo serverSocketDemo) {
        this.serverSocketDemo = serverSocketDemo;
    }

    @Override
    public void run() {
        try {
            SocketChannel accept = serverSocketDemo.serverSocketChannel.accept();
            if (null != accept) {
                ReceiverFile receiverFile = new ReceiverFile();
                
                // 将上传文件保存路径记录下来
                receiverFile.uploadSavePath = serverSocketDemo.UPLOAD_SAVE_PATH;
                NioSocket.MAP.put(accept, receiverFile);
                
                new ReadHandler(serverSocketDemo.selector, accept);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
复制代码
  • 在服务端注册完IO事件后需要通过 attach 添加进来
// 绑定选择器
selector = Selector.open();
SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

// 将处理连接的Handler处理器添加进来
register.attach(new AcceptHandler(this));
复制代码
  • 创建读取的Handler处理器
class ReadHandler implements Runnable {

    private SocketChannel socketChannel;

    public ReadHandler(Selector selector, SocketChannel accept) throws IOException {
        this.socketChannel = accept;

        socketChannel.configureBlocking(false);
        SelectionKey selectionKey = socketChannel.register(selector, 0);
        
        selectionKey.attach(this);
        selectionKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            processData();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void processData() throws IOException {
        ReceiverFile receiverFile = NioSocket.MAP.get(socketChannel);

        ByteBuffer buffer = ByteBuffer.allocate(NioSocket.BUFFER_CAPACITY);

        int len = 0;

        while ((len = socketChannel.read(buffer)) > 0) {

            buffer.flip();

            if (receiverFile.fileName == null) {

                // 处理文件名称
                if (buffer.capacity() < 4) {
                    continue;
                }

                int fileNameLength = buffer.getInt();

                byte[] fileNameArr = new byte[fileNameLength];
                buffer.get(fileNameArr);

                String fileName = new String(fileNameArr, NioSocket.CHARSET);
                System.out.println("文件名称:" + fileName);
                receiverFile.fileName = fileName;

                // 处理存储文件
                File dir = new File(receiverFile.uploadSavePath);
                if (!dir.exists()) {
                    dir.mkdir();
                }

                File file = new File((receiverFile.uploadSavePath + File.separator + fileName).trim());
                if (!file.exists()) {
                    file.createNewFile();
                }

                receiverFile.outChannel = new FileOutputStream(file).getChannel();

                // 长度
                if (buffer.capacity() < 8) {
                    continue;
                }

                long fileLength = buffer.getLong();
                System.out.println("文件大小:" + fileLength);
                receiverFile.length = fileLength;

                // 文件内容
                if (buffer.capacity() < 0) {
                    continue;
                }

                receiverFile.outChannel.write(buffer);
            } else {
                // 文件内容
                receiverFile.outChannel.write(buffer);
            }

            buffer.clear();
        }

        if (len == -1) {
            receiverFile.outChannel.close();
        }
    }
}
复制代码

主体代码不变, 重点是在构造方法中

  • 启动代码
class ServerSocketDemo implements Runnable{

    String UPLOAD_SAVE_PATH = "D://works//111";
    ServerSocketChannel serverSocketChannel;
    Selector selector;

    public static void main(String[] args) {
        new Thread(new ServerSocketDemo()).start();
    }

    public ServerSocketDemo() {
        getUploadSavePath();

        // 服务器端编写
        try {
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);

            // 绑定端口
            serverSocketChannel.bind(
                    new InetSocketAddress(
                            NioSocket.PORT
                    )
            );

            // 绑定选择器
            selector = Selector.open();
            SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            register.attach(new AcceptHandler(this));

        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();

                for (SelectionKey key : selectionKeys) {
                    // 监控IO事件,然后分发
                    dispatch(key);
                }
                selectionKeys.clear();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void dispatch(SelectionKey key) {
        Runnable attachment = (Runnable) key.attachment();
        if (null != attachment) {
            attachment.run();
        }
    }
}
复制代码

到这里就将上一节的代码改造成 Reactor 版的, 对比之前的代码, 我们可以看到,

  • 我们定义了 AcceptHandlerReadHandler , 分别负责处理新连接和读取文件
  • 反应器类 ServerSocketDemo 只需要负责分发事件处理

客户端代码不变, 经测试, 代码没有问题,大家可以试试

多线程Reactor

单线程的Reactor相对于经典模式 Connection Per Thread , 不再需要启动成千上万的线程, 效率自然是提升了很多。

但是在单线程中, Reactor反应器Handlers处理器 的执行都在同一个线程上, 这样就会出现一个问题:

  • 如果其中的某一个Handler阻塞, 会导致其他所有的Handler都得不到执行, 甚至包括接收新连接处理器
  • 目前服务器都是多核的, 单线程模式不能充分利用服务器资源

这些都是非常严重的问题, 所以引出 多线程的Reactor反应器模式模型

优化过程

  • 引入多个选择器
  • 设计子反应器,一个反应器负责查询一个选择器, 专门负责Handler事件分发
  • 在Handler处理器中开启线程池, 通过多线程执行业务操作

下面我们来看看实现过程

引入多个选择器

private static final int THREAD_COUNT = 2;

Selector[] selectors = new Selector[THREAD_COUNT];
ServerSocketChannel serverSocketChannel;

NioReactorMultiThreadServer() {
    try {
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);

        // 绑定端口
        serverSocketChannel.bind(
            new InetSocketAddress(
                NioSocket.PORT
            )
        );

        // 绑定选择器
        for (int i = 0; i < THREAD_COUNT; i++) {
            selectors[i] = Selector.open();
        }

        // 第一个选择器 监听连接
        SelectionKey register = serverSocketChannel.register(selectors[0], SelectionKey.OP_ACCEPT);

        // 附加到选择键上
        register.attach(new AcceptHandler(this));

    } catch (IOException e) {
        e.printStackTrace();
    }
}
复制代码

设计子反应器

定义子反应器数组, 并且实现 Runnable 接口, 每条线程负责一个选择器

class SubReactor implements Runnable{
    private Selector selector;
    SubReactor(Selector selector) {
        this.selector = selector;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();

                for (SelectionKey key : selectionKeys) {
                    dispatch(key);
                }
                selectionKeys.clear();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
复制代码

attach() 方法后面初始化子反应器

SubReactor[] subReactors = null;

subReactors = new SubReactor[THREAD_COUNT];
for (int i = 0; i < THREAD_COUNT; i++) {
    subReactors[i] = new SubReactor(selectors[i]);
}
复制代码

优化处理器

  • 连接处理器
class AcceptHandler implements Runnable{
    private NioReactorMultiThreadServer nioReactorMultiThreadServer;
    AcceptHandler(NioReactorMultiThreadServer nioReactorMultiThreadServer) {
        this.nioReactorMultiThreadServer = nioReactorMultiThreadServer;
    }

    @Override
    public void run() {
        try {
            SocketChannel accept = nioReactorMultiThreadServer.serverSocketChannel.accept();
            if (null != accept) {
                // 采用轮询的方式获取到选择器
                new ReadHandler(nioReactorMultiThreadServer.selectors[nioReactorMultiThreadServer.atom.get()], accept);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (nioReactorMultiThreadServer.atom.incrementAndGet() == nioReactorMultiThreadServer.selectors.length) {
                nioReactorMultiThreadServer.atom.set(0);
            }
        }
    }
}
复制代码
  • 读取处理器,引入线程池
ExecutorService executorService = Executors.newFixedThreadPool(4);

@Override
public void run() {
    // 线程池执行
    executorService.execute(() -> {
        try {
            // synchronized
            processData();
        } catch (IOException e) {
            e.printStackTrace();
        }
    });
}
复制代码

这样就完成了 多线程版的Reactor反应器模式模型 的代码,代码测试无误

下面是完整版代码

class NioSocket {
    static final String HOST = "127.0.0.1";
    static final int PORT = 23356;
    static final int BUFFER_CAPACITY = 1024;
    static final Charset CHARSET = StandardCharsets.UTF_8;
    static final Map<SocketChannel, ReceiverFile> MAP = new ConcurrentHashMap<>(16);
}

// 为了简单
class ReceiverFile {
    public String fileName;
    public long length;
    public FileChannel outChannel;
    public String uploadSavePath;
}

public class NioReactorMultiThreadServer {

    private static final int THREAD_COUNT = 2;

    Selector[] selectors = new Selector[THREAD_COUNT];
    ServerSocketChannel serverSocketChannel;

    private SubReactor[] subReactors = null;

    String UPLOAD_SAVE_PATH = "";
    AtomicInteger atom = new AtomicInteger(0);

    public static void main(String[] args) {
        new NioReactorMultiThreadServer().startServer();
    }

    private void startServer() {
        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(subReactors[i]).start();
        }
    }

    private void getUploadSavePath() {
        System.out.println("请输入想要保存文件的路劲:");
        Scanner scanner = new Scanner(System.in);
        UPLOAD_SAVE_PATH = scanner.next();
    }

    private NioReactorMultiThreadServer() {
        getUploadSavePath();

        try {
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);

            // 绑定端口
            serverSocketChannel.bind(
                    new InetSocketAddress(
                            NioSocket.PORT
                    )
            );

            // 绑定选择器
            for (int i = 0; i < THREAD_COUNT; i++) {
                selectors[i] = Selector.open();
            }

            // 第一个选择器 监听连接
            SelectionKey register = serverSocketChannel.register(selectors[0], SelectionKey.OP_ACCEPT);

            // 附加到选择键上
            register.attach(new AcceptHandler(this));

            subReactors = new SubReactor[THREAD_COUNT];
            for (int i = 0; i < THREAD_COUNT; i++) {
                subReactors[i] = new SubReactor(selectors[i]);
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    class SubReactor implements Runnable {
        private Selector selector;

        SubReactor(Selector selector) {
            this.selector = selector;
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();

                    for (SelectionKey key : selectionKeys) {
                        dispatch(key);
                    }
                    selectionKeys.clear();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private void dispatch(SelectionKey key) {
            Runnable attachment = (Runnable) key.attachment();
            if (null != attachment) {
                attachment.run();
            }
        }
    }
}

class AcceptHandler implements Runnable {
    private NioReactorMultiThreadServer nioReactorMultiThreadServer;

    AcceptHandler(NioReactorMultiThreadServer nioReactorMultiThreadServer) {
        this.nioReactorMultiThreadServer = nioReactorMultiThreadServer;
    }

    @Override
    public void run() {
        try {
            SocketChannel accept = nioReactorMultiThreadServer.serverSocketChannel.accept();
            if (null != accept) {
                ReceiverFile receiverFile = new ReceiverFile();

                // 将上传文件保存路径记录下来
                receiverFile.uploadSavePath = nioReactorMultiThreadServer.UPLOAD_SAVE_PATH;
                NioSocket.MAP.put(accept, receiverFile);

                // 采用轮询的方式获取到选择器
                new ReadHandler(nioReactorMultiThreadServer.selectors[nioReactorMultiThreadServer.atom.get()], accept);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (nioReactorMultiThreadServer.atom.incrementAndGet() == nioReactorMultiThreadServer.selectors.length) {
                nioReactorMultiThreadServer.atom.set(0);
            }
        }
    }
}

class ReadHandler implements Runnable {

    private SocketChannel socketChannel;
    private ExecutorService executorService = Executors.newFixedThreadPool(4);

    ReadHandler(Selector selector, SocketChannel accept) throws IOException {
        this.socketChannel = accept;

        socketChannel.configureBlocking(false);
        SelectionKey selectionKey = socketChannel.register(selector, 0);

        selectionKey.attach(this);
        selectionKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    @Override
    public void run() {
        executorService.execute(() -> {
            try {
                processData();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }

    private synchronized void processData() throws IOException {
        ReceiverFile receiverFile = NioSocket.MAP.get(socketChannel);

        ByteBuffer buffer = ByteBuffer.allocate(NioSocket.BUFFER_CAPACITY);

        int len = 0;

        while ((len = socketChannel.read(buffer)) > 0) {

            buffer.flip();

            if (receiverFile.fileName == null) {

                // 处理文件名称
                if (buffer.capacity() < 4) {
                    continue;
                }

                int fileNameLength = buffer.getInt();

                byte[] fileNameArr = new byte[fileNameLength];
                buffer.get(fileNameArr);

                String fileName = new String(fileNameArr, NioSocket.CHARSET);
                System.out.println("文件名称:" + fileName);
                receiverFile.fileName = fileName;

                // 处理存储文件
                File dir = new File(receiverFile.uploadSavePath);
                if (!dir.exists()) {
                    dir.mkdir();
                }

                File file = new File((receiverFile.uploadSavePath + File.separator + fileName).trim());
                if (!file.exists()) {
                    file.createNewFile();
                }

                receiverFile.outChannel = new FileOutputStream(file).getChannel();

                // 长度
                if (buffer.capacity() < 8) {
                    continue;
                }

                long fileLength = buffer.getLong();
                System.out.println("文件大小:" + fileLength);
                receiverFile.length = fileLength;

                // 文件内容
                if (buffer.capacity() < 0) {
                    continue;
                }

                receiverFile.outChannel.write(buffer);
            } else {
                // 文件内容
                receiverFile.outChannel.write(buffer);
            }

            buffer.clear();
        }

        if (len == -1) {
            receiverFile.outChannel.close();
        }
    }
}
复制代码

客户端代码不变

原文  https://juejin.im/post/5f0a70dde51d45348424f1d6
正文到此结束
Loading...