Reactor pattern(反应器模式)是用于处理通过一个或多个输入同时传递给服务器的服务请求的事件处理模式。服务处理程序复用传入的请求,并将它们同步分派给关联的handler。 关键几点:
负责响应事件,将事件分发绑定了该事件的Handler处理。对应netty 的NioEventLoop.run(),processSelectedKeys()。
事件处理器,绑定了某类事件,负责执行对应事件的任务对事件进行处理。对应netty的IdleStateHandler等。
Acceptor属于handler中的一种,因为更加特殊,独立出来讲,是reactor的事件接收类,负责初始化selector和接收缓冲队列。对应netty的ServerBootstrapAcceptor。
Reactor线程池中的每一Reactor线程都会有自己的Selector、线程和分发的事件循环逻辑。 mainReactor可以只有一个,但subReactor一般会有多个。mainReacto线程主要负责接收客户端的连接请求,然后将接收到的SocketChannel传递给subReactor,由subReactor来完成和客户端的通信。 源码解析
bossGroup = new NioEventLoopGroup(); workGroup = new NioEventLoopGroup(4); 复制代码
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { children = new SingleThreadEventExecutor[nThreads]; ... for (int i = 0; i < nThreads; i ++) { ... children[i] = newChild(threadFactory, args); ... } } 复制代码
@Override protected EventExecutor newChild( ThreadFactory threadFactory, Object... args) throws Exception { return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]); } 复制代码
在这里创建了mainReactor和subReactor线程池,并且创建了eventLoop线程
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { super(parent, threadFactory, false); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } provider = selectorProvider; selector = openSelector(); } 复制代码
每个eventLoop线程都会有自身的selector,这里的eventLoop线程还未启动,后面启动之后,会执行run()里面的selector.select。
ChannelFuture regFuture = group().register(channel); 这里的group()是bossGroup
@Override public ChannelFuture register(Channel channel) { return next().register(channel); } 复制代码
next()执行
@Override public EventLoop next() { return (EventLoop) super.next(); } 复制代码
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return children[childIndex.getAndIncrement() & children.length - 1]; } } 复制代码
从线程池中取出第一个eventLoop
@Override public ChannelFuture register(final Channel channel, final ChannelPromise promise) { ... channel.unsafe().register(this, promise); return promise; } 复制代码
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { } 复制代码
这里将mainReactor的eventLoop和服务器的NioServerSocketChannel绑定。 因为刚开始启动是main线程,执行eventLoop.execute,在这里mainReactor只启动了一个线程。
@Override public void execute(Runnable task) { boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); ... } ... } 复制代码
在execute中执行startThread(),正式启动mainReactor线程循环,并且将register0(promise)这个Task加入taskQueue中,让mainReactor循环执行。
private void register0(ChannelPromise promise) { doRegister(); neverRegistered = false; registered = true; safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } 复制代码
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { ... selectionKey = javaChannel().register(eventLoop().selector, 0, this); ... } } 复制代码
这里将mainReactor 中eventLoop的selector注册一个为0的操作监听位,并将服务器的NioServerSocketChannel绑定到mainSubReactor线程上 在doBind()-->doBind0()-->channel.bind()-->…-->next.invokeBind()-->HeadContext. Bind()-->unsafe.bind()-->pipeline.fireChannelActive()-->channel.read()-->…-->doBeginRead()中修改为OP_ACCEPT(16)操作监听位。
@Override protected void doBeginRead() throws Exception { … final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } } 复制代码
自此,mainReactor的eventLoop从run开始循环执行selector.select。 注:readInterestOp的值来自于创建NioServerSocketChannel的构造函数 复制代码
public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } 复制代码
在收到客户端连接后,将会在ServerBootstrapAcceptor中把客户端的Channel注册在subReactor线程上,并将这个channel绑定到subReactor线程的selector上,监听客户端channel的OP_READ事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } 复制代码
当监听到客户端连接,执行服务器AbstractNioUnsafe的read();
@Override public void read() { ... int localRead = doReadMessages(readBuf); ... for (int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); } ... pipeline.fireChannelReadComplete(); ... } 复制代码
@Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); ... buf.add(new NioSocketChannel(this, ch)); ... } 复制代码
public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket()); } 复制代码
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { super(parent, ch, SelectionKey.OP_READ); } 复制代码
设置客户端channel监听位的值为OP_READ(1)
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); for (Entry<ChannelOption<?>, Object> e: childOptions) { try { if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } } for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } } } 复制代码
ServerBootstrapAcceptor不仅将subReactor绑定客户端channel,还为客户端channel进行一些参数的初始化
@Override public ChannelFuture register(Channel channel) { return next().register(channel); } 复制代码
和上面的register一样,只是将mainReactor线程池改为了subReactor线程池。 在这里将从subReactor线程池中取一个线程的selector与客户端channel绑定,并监听该客户端0事件。
@Override public ChannelPipeline fireChannelReadComplete() { head.fireChannelReadComplete(); if (channel.config().isAutoRead()) { read(); } return this; } 复制代码
read()-->tail.read()-->next.invokeRead()-->HeadContext. read()-->…--> doBeginRead()
@Override protected void doBeginRead() throws Exception { ... final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } } 复制代码
在这里将监听位改为OP_READ(1)
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); ... } 复制代码
进到NioByteUnsafe的read()方法
@Override public final void read() { ... final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); ... byteBuf = allocHandle.allocate(allocator); ... pipeline.fireChannelRead(byteBuf); ... } 复制代码
@Override public ChannelPipeline fireChannelRead(Object msg) { head.fireChannelRead(msg); return this; } 复制代码
private void invokeChannelRead(Object msg) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } 复制代码
public class InBoundHandlerB extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InBoundHandlerB: " + msg); super.channelRead(ctx, msg); } } 复制代码
在这里将接收到客户端消息处理
Reactor线程池有多少个,就会创建多少个selector, mainReactor的eventLoop会与服务器的channel绑定,并只关注服务器channel的ACCEPT事件,subReactor的eventLoop会与客户端的channel绑定,并只关注客户端channel的READ事件。
mainReactor和subReactor循环各自的selector,mainReactor会循环ACCEPT事件的selector,subReactor会循环READ事件的selector,mainReactor接受到客户端连接后,会执行ServerBootstrapAcceptor的channelRead方法,将客户端连接与subReactor绑定。