Reactor模型是一种反应式编程方式,一般使用分发器根据所产生的事件不同选择不同的handler来处理,也就是所谓的事件驱动机制。本文会详细分析该模式的使用场景以及在Netty中的使用形式也就是Netty的线程模型,另外本文着重于原理,需要对Netty相关组件有一定了解。
Reactor使用一个多路分发器管理所有的事件,当事件被触发时,多路分发器会发出通知,Reactor线程则自动的执行对应的逻辑,这种模式就是Reactor模式。传统的BIO模式,一个连接请求对应一条线程处理,虽然使用池化技术复用线程,但是应对大量请求仍然需要大量的线程来做支持,在Reactor模式下大量的请求会被多路分发器所管理,当请求可以被处理时才会转到对应的线程去处理,对于大大提高了系统的负载能力。
Reactor模式有着众多不同的形式,理解不同的形式要理解后端是如何处理请求的:当后端接收到一个请求,简化下会做以下四个操作,所谓的不同形式是这些操作具体在哪个线程中处理,下面详细分析:
单线程Reactor模式意思是以上 1,2,3,4
操作都在一个Reactor线程中执行,虽然Reactor的读取写回不会造成阻塞,但是业务操作就很可能造成阻塞,并且单线程对CPU的利用顶多到100%,因此处理能力是有限的,对于小量连接情况下问题不大,对于大量链接情况下,单个NIO线程因处理能力优先会导致连接大量超时,由于超时重试机制,导致系统负载越来越重。
因此演变出多线程Reactor模式。
从图可以看出多线程Reactor模式在单线程模式下把 3
业务操作(decode,compute,encode)等放到单独的线程池中处理,保证Reactor线程不会阻塞,而Reactor线程仍然要处理 1,2,4
三个操作,因此处理能力受限于Reactor线程的处理,Reactor虽然是事件机制的异步处理,但是其承担连接负载量上来时仍然承受很大压力,因此主从Reactor模式被提了出来。
相比多线程Reactor模式,主从Reactor模式下把步骤 2 读取,4 写回
两个操作放在 从Reactor线程池 中执行,那么 主Reactor线程 ,也就是Acceptor只负责建立连接,建立之后将其注册到 从Reactor线程 中,因此大大提高了负载能力。这种模式的核心是 使用多个Reactor线程来分担连接 ,同时为了避免并发问题还要保证一个Reactor线程可以处理多个连接,一个连接只能注册一个Reactor线程上。在Netty中一般推荐使用这种模式,主Reactor线程为Netty中的Boss线程,从Reactor线程为Worker线程,另外使用业务线程池处理对应的业务逻辑,这三者构成了Netty的线程模型。
从上面对于Reactor模式的分析,近而理解Netty线程模型的设计,在Netty中为了实现Reactor模式抽象出几个角色:
java.nio.channels.Selector
Channel
的本质是对操作系统产生的Socket的映射,并且提供绑定到Selector多路分发器上的能力。
在这里可以顺便复习下基础知识,当一个请求传递过来时,到达的是数据链路层,然后IP层,再接着是传输层,在传输层这里被转换为对应的TCP/UDP协议,操作系统为了给应用提供统一的不同协议的封装操作,所以产生了Socket,换句话说Socket接口是TCP/IP网络的API接口函数。
Socket属于内核对象,因此应用层要使用时必须通过操作系统提供的接口访问,具体流程当建立socket连接时,操作系统会产生一个文件操作符给应用,应用拿到文件操作符之后相当于拿到了对应的socket,然后每次调用相应的函数,比如read,close等都需要把对应的文件描述符传递给操作系统,操作系统再定位到具体的socket完成调用。
而 Channel
实际上就是应用层对文件描述符的一个封装,然后利用其它机制做了更高一层的封装比如异步,多路复用等等。
EventLoop
字面意思是事件循环,对应图中的Reactor线程,以 NioEventLoop
为例,实现原理是 使用一个单线程,一个任务队列,一个多路复用器Selector,该线程不断地执行任务,并且监控多路复用器所产生的事件 ,由于 EventLoop
是单线程,因此在使用时往往要注意耗时操作,阻塞操作都不要放到 EventLoop
中,其适合处理耗时短并且简单的任务。
在Netty中推荐使用 主从Reactor模式
,因此最佳实践是Boss配置1个Reactor线程,因为服务端Socket监听端口只能被一个Socket所占有,并且连接的建立是十分迅速的,因此Boss这里不会有性能问题。Worker根据自己的业务配置多个Reactor线程,当Boss建立起连接之后,也就是得到了对应的 SocketChannel
,会把该Channel交给Worker线程处理,这里的原因是读取与写入往往是耗时操作,虽然Selector多路复用器是线程安全的设计,但是如果一个Selector管理大量的Channel,会造成自身任务繁重,一些处理不过来的Channel会超时断开或者重试,因此众多Worker的目的就是为了平衡Selector的压力。
由于 EventLoop
不能被阻塞,但是业务操作具有太多的不可控性,因此往往还会引入 ThreadPool
业务线程池来分担Worker的压力, ThreadPool
主要是处理用户的业务,此时Worker只负责处理读取,解码,编码,写回等简单高效的任务,进一步提高系统的负载能力,这也是主从Reactor模式的设计目的。
ServerBootstrap bootstrap = new ServerBootstrap(); // 初始化处理线程 EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup worker = new NioEventLoopGroup(16); // 设置服务 bootstrap.group(boss, worker); // ...
ChannelPipeline
是聚合了 ChannelHandler
的管道,本质上是一个职责链路,在Worker线程中会对每一个Channel执行其所对应的pipeline链,完成整个生命周期。
我画了一个不太好看的图描述整个结构,其中蓝色表示 ChannelInboundHandler
,红色表示 ChannelOutboundHandler
,绿色则承担两个角色,而每个Handler又会使用 ChannelHandlerContext
封装起来,在 ChannelPipeline
中组装成双向链表。
一个请求被Accept之后要从Head开始流转所有的 ChannelInboundHandler
,然后再从Tail开始流转所有的 ChannelOutboundHandler
。
上述Netty关于主从Reactor模式的实现并没有包含业务线程池,在开发中由于业务的复杂性,往往要针对项目的业务维护一套业务线程池,好处是做业务隔离,不会由于单条业务出错而阻塞对应的Reactor线程,那么这个业务线程池该怎么做?
业务处理一般使用的是 ChannelInboundHandler
封装的一个处理逻辑,也就是上述Pipeline链中的 BizHandler
,以Dubbo的做法为例,Dubbo使用了一个 com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler
,该类会把对应的Worker产生的事件放入到业务线程池中处理,这样就完成了 Reactor
线程到 ThreadPool
业务线程池的转换。
public void disconnected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t); } } public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { ..... } }
业务处理完成之后写回数据是在 com.alibaba.dubbo.remoting.transport.netty.NettyChannel
中进行,该写回实际上是委托Netty的 NioAcceptedSocketChannel
,然后封装一个任务放入到Worker中执行,此时就完成了从 ThreadPool
业务线程回到 Reactor
线程的切换。
public void send(Object message, boolean sent) throws RemotingException { super.send(message, sent); boolean success = true; int timeout = 0; try { // 这里委托给Netty的NioAcceptedSocketChannel执行写回 ChannelFuture future = channel.write(message); if (sent) { timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); success = future.await(timeout); } Throwable cause = future.getCause(); if (cause != null) { throw cause; } } catch (Throwable e) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } ..... }