Reactor模型是基于事件驱动的模型,是高性能网络编程中非常重要概念,常用于解决多核服务器下的如何处理海量I/O问题。Java中大名鼎鼎的Netty网络编程框架的线程模型正是基于Reactor模型。
本文主要基于Doug Lea的文章 Scalable IO in Java 来介绍下Reactor模型。
本文所有内容均基于前人资料总结而成,如有侵权必删。
初学Java网络编程的时候,我们学过使用ServerSocket以及Socket来编码客户端与服务端的网络通讯程序。常见的服务端逻辑如下:
class Server implements Runnable { public void run() { try { ServerSocket ss = new ServerSocket(PORT); while (!Thread.interrupted()) new Thread(new Handler(ss.accept())).start(); // or, single-threaded, or a thread pool } catch (IOException ex) { /* ... */ } } static class Handler implements Runnable { final Socket socket; Handler(Socket s) { socket = s; } public void run() { try { byte[] input = new byte[MAX_INPUT]; socket.getInputStream().read(input); byte[] output = process(input); socket.getOutputStream().write(output); } catch (IOException ex) { /* ... */ } } private byte[] process(byte[] cmd) { /* ... */ } } }
上述样例代码是一个典型的TPC(Thread Per Connection)模式,这种模式有几个显而易见的问题:
因此,这种TPC的网络模式只适合并发请求量较小的业务情景。
为了解决以上的问题,于是有了Reactor模型。
先来看看最简单的单线程版本Reactor模型。如下图所示:
配图是Doug Lea在他的文章中的图。从图里可以看到:
值得注意的是,这张图比较容易混淆的地方是Acceptor并不是一个单独的线程,而是Reactor线程内部的一个实例;且以上所有步骤都是在同一个Reactor线程内完成的。
因为只有一个线程,所以这种单线程的Reactor模型,并没有充分利用多核服务器的CPU资源,性能上甚至不如上边提到的TPC。
Java一般基于NIO的来实现Reactor模型,样例如下:
class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocket; Reactor(int port) throws IOException { //Reactor初始化 selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); //非阻塞 serverSocket.configureBlocking(false); //分步处理,第一步,接收accept事件 SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); //附加一个Acceptor实例 sk.attach(new Acceptor()); } public void run() { try { while (!Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) { //Reactor负责dispatch收到的事件 dispatch((SelectionKey) (it.next())); } selected.clear(); } } catch (IOException ex) { /* ... */ } } void dispatch(SelectionKey k) { Runnable r = (Runnable) (k.attachment()); //调用之前注册的callback对象 if (r != null) { r.run(); } } // inner class class Acceptor implements Runnable { public void run() { try { SocketChannel channel = serverSocket.accept(); if (channel != null) new Handler(selector, channel); } catch (IOException ex) { /* ... */ } } } }
class Handler implements Runnable { final SocketChannel channel; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE); ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE); static final int READING = 0, SENDING = 1; int state = READING; Handler(Selector selector, SocketChannel c) throws IOException { channel = c; c.configureBlocking(false); // 一般来说先注册读时间 sk = channel.register(selector, 0); sk.attach(this); sk.interestOps(SelectionKey.OP_READ); selector.wakeup(); } boolean inputIsComplete() { /* ... */ return false; } boolean outputIsComplete() { /* ... */ return false; } void process() { /* ... */ return; } public void run() { try { if (state == READING) { read(); } else if (state == SENDING) { send(); } } catch (IOException ex) { /* ... */ } } void read() throws IOException { channel.read(input); if (inputIsComplete()) { // 进行其他业务处理 process(); state = SENDING; // 一般来说读完数据之后,重新注册写实践到Selector中 sk.interestOps(SelectionKey.OP_WRITE); } } void send() throws IOException { channel.write(output); //write事件结束后, 关闭select key if (outputIsComplete()) { sk.cancel(); } } }
如上实现,Acceptor只是Reactor线程内部实例,相当于一个特殊的Handler。
单线程版本的主要问题之一是:I/O处理逻辑与Reactor耦合共用了同一个线程。于是,将I/O处理逻辑剥离出来,交由单独的线程池来运行。于是有如下设计:
实现上只需要变更Handler的代码,由一个全局的线程池来执行任务即可:
class Handler implements Runnable { // 全局公用的线程池 static PooledExecutor pool = new PooledExecutor(...); static final int PROCESSING = 3; // ... synchronized void read() { // ... socket.read(input); if (inputIsComplete()) { state = PROCESSING; pool.execute(new Processer()); } } synchronized void processAndHandOff() { process(); state = SENDING; // or rebind attachment sk.interest(SelectionKey.OP_WRITE); } class Processer implements Runnable { public void run() { processAndHandOff(); } } }
该版本存在问题是:acceptor与Reactor共用一个线程,如果有海量的请求或者连接时候需要身份认证等耗时操作,会阻塞Reactor线程,影响了Reactor线程监控I/O事件而成为性能瓶颈。
为了解决以上问题,多Reactor多线程版本,引入多个Reactor,部分Reactor线程专门负责处理请求事件,以应对海量的请求或者耗时的连接操作;而其他的Reactor线程负责监听其他I/O事件。于是有如下设计:
这个版本有如下特点:
Netty的服务端在初始化ServerBootstrap的时候可以指定parentGroup和childGroup两个线程池。这两个线程池正好对应承载mainReactor和subReactor的两个线程池。
// io.netty.bootstrap.ServerBootstrap @Override public ServerBootstrap group(EventLoopGroup group) { return group(group, group); } /** * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and * {@link Channel}'s. */ public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; return this; }
因此通过控制parentGroup和childGroup的线程池大小,可以实现以上各个版本的reactor模型。
NioEventLoopGroup group = new NioEventLoopGroup(1); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group) .channel(NioServerSocketChannel.class) .channel(NioServerSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ServerHandlerInitializer());
NioEventLoopGroup eventGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(eventGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ServerHandlerInitializer());
NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ServerHandlerInitializer());