转载

Netty系列文章之服务端启动分析

本文主要分析 Netty服务端的启动,以便对Netty框架有一个基本的认识,我用的Netty版本是 netty-4.1.29 ,之前的文章 Netty 系列文章之基本组件概览 对Netty的基本组件做了一个简单的介绍,算是对本文分析Netty服务端的启动做一个基础铺垫

服务端代码

该源码出自 netty官方提供的 服务端demo,详细地址: github.com/netty/netty…

我做了一点小改动,代码如下:

public final class EchoServer {

    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
复制代码

服务端启动流程概览:

ServerBootStrap
NioEventLoopGroup
Channel
ChannelHandler

在之前的文章我就提到过, ServerBootstrap 是Netty服务端的启动辅助类,帮助Netty服务端完成初始化,下面我将深入代码,仔细分析Netty服务端启动过程中各组件的创建与初始化

Channel的创建和初始化过程

Channel是Netty的网络操作抽象类,对应于JDK底层的 Socket,Netty服务端的Channel类型是 NioServerSocketChannel 。下面来分析 NioServerSocketChannel 的创建和初始化

NioServerSocketChannel的创建

NioServerSocketChannel 的创建实际上是从 ServerBootStrapbind() 方法开始的,进入 bind() 源码分析( AbstractBootstrap 的bind()方法):

......
public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}
......
/**
 * Create a new {@link Channel} and bind it.
 */
public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    }
    ......
}
复制代码

在源码里注意到一个 initAndRegister() 方法,这个方法就负责 NioServerSocketChannel 的初始化和注册操作,走进 initAndRegister() 方法,如下图所示:

Netty系列文章之服务端启动分析

从上图可以看出,源码里是调用 channelFactory.newChannel() 来创建 channel , 走进 ChannelFactory 发现该接口被 @Deprecated 注解标注了,说明是一个过时的接口:

@Deprecated
public interface ChannelFactory<T extends Channel> {
    /**
     * Creates a new channel.
     */
    T newChannel();
}
复制代码

我用的Netty版本是 Netty-4.1.29 ,其Netty API 文档 中介绍 io.netty.bootstrap.ChannelFactory 提到用 io.netty.channel.ChannelFactory 代替。

这里 ChannelFactory 只是一个工厂接口,真正创建 Channel 的是 ReflectiveChannelFactory 类,它是 ChannelFactory 的一个重要实现类,该类通过反射方式创建 Channel ,源代码如下:

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
    }
}
复制代码

其中 newChannel() 方法通过 clazz.getConstructor().newInstance() 来创建 Channel,即通过反射方式来创建 Channel,而这个 clazz 就是 通过 ServerBootStrapchannel 方法传入的,最开始的服务端代码传入的 NioServerSocketChannel ,所以对应通过反射创建了 NioServerSocketChannel ,并且 ChannelFactory 的初始化也是在该方法中进行的,代码如下:

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
复制代码

到此, NioServerSocketChannel 的创建过程大体结束,再次总结一下:

  • ServerBootstrap 中的 ChannelFactory 的实现是 ReflectiveChannelFactory
  • 生成的 Channel的具体类型是 NioServerSocketChannel
  • Channel 的实例化过程其实就是调用 ChannelFactory.newChannel 方法,实际上是通过反射方式进行创建的

NioServerSocketChanel的实例化过程

在前面的分析中,NioServerSocketChannel是通过反射创建的,它的构造方法如下:

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
         *
         *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
         */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}
/**
 * Create a new instance
 */
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
复制代码

方法newSocket利用 provider.openServerSocketChannel() 生成Nio中的 ServerSocketChannel 对象,然后调用重载的构造器:

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
复制代码

该构造器中,调用了父类的构造器,传入的参数是 SelectionKey.OP_ACCEPT ,这个参数对于有Java NIO编程经验的人来说应该非常熟悉,在Java NIO中服务端要监听客户端的连接请求,就向多路复用器 Selector 注册 SelectionKey.OP_ACCEPT 客户端连接事件,而Netty又是基于 Java NIO开发的,这里可见一斑。接着进入父类构造器:

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}
复制代码

然后:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
复制代码

设置当前 ServerSocketChannel为非阻塞通道,然后再次进入父类构造器 AbstractChannel(Channel parent) :

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
复制代码
  • parent属性设置为null
  • 初始化unsafe,用来负责底层的connect,register,read和write操作
  • 初始化pipeline,在实例化一个Channel的同时,当有事件发生的时候,pipeline负责调用相应的Handler进行处理

关于 unsafe

Netty中的 unsafe 不是JDK中的 sun.misc.Unsafe ,该 unsafe 实际是封装了 Java 底层 Socket的操作,因此是沟通 Netty上层和Java 底层重要的桥梁。

ChannelPipeline的初始化

每个Channel都有对应的 ChannelPipeline ,当一个Channel被创建时,对应的ChannelPipeline也会自动创建,在上面分析 NioServerSocketChannel 实例化过程就看到,在其父类构造器中,有初始化一个 pipeline ,对应源代码如下:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    .....
    private final Channel parent;
    private final ChannelId id;
    private final Unsafe unsafe;
    private final DefaultChannelPipeline pipeline;
    ......
    /**
     * Creates a new instance.
     *
     * @param parent
     *        the parent of this channel. {@code null} if there's no parent.
     */
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    ......
        /**
     * Returns a new {@link DefaultChannelPipeline} instance.
     */
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
复制代码

从上面代码看到,pipeline最终被初始化为一个 DefaultChannelPipelineDefaultChannelPipelineChannelPipeline 的实现类,进入它的构造方法,如下:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
复制代码

该构造方法有一个参数 channel ,这个 channel 就是我们之前传入的 NioServerSocketChannel 。关于该构造器其他方法以及 ChannelPipeline 更详细的介绍将在后续文章分析。

NioEventLoopGroup

在我们最开始的Netty服务端代码中初始化了两个 NioEventLoopGroup ,即一个处理客户端连接请求的线程池—— bossGroup ,一个处理客户端读写操作的线程池—— workerGroup

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
复制代码

NioEventLoopGroup 的类继承结构图如下所示:

Netty系列文章之服务端启动分析

从图中可以看到, NioEventLoopGroup 实现了 Executor 接口, Executor 框架可以用来创建线程池的,也是一个线程执行器。关于 Executor 框架更加详细的介绍请参阅《Java并发编程的艺术》

NioEventLoopGroup

看了 NioEventLoopGroup 的类继承结构,下面来分析一下它的初始化过程,构造器源代码如下:

......
public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}
......
public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}
......
public NioEventLoopGroup(
        int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
复制代码

上面几个重载构造器其实没做啥,最终调用父类 MultithreadEventLoopGroup 的构造器,

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
复制代码

这里需要注意的是如果我们传入的线程数 nThreads 是 0 的话,那么Netty将会为我们设置默认的线程数 DEFAULT_EVENT_LOOP_THREADS ,这个默认值是 处理器核心数 * 2 ,如下:

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}
复制代码

bossGroup 这个线程池我们传入的 nThread 是1,实际上在 bossGroup 中只会有一个线程用于处理客户端连接请求,所以这里设置为1,而不使用默认的线程数,至于为什么只用一个线程处理连接请求还需用线程池,在Stack Overflow有相关问题的讨论。

然后回来再次进入父类 MultithreadEventExecutorGroup 的构造器,

......
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
......
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
复制代码

MultithreadEventExecutorGroup 管理着 eventLoop 的生命周期,它有几个变量:

  • children : EventExecutor数组,保存eventloop
  • chooser : 线程选择器,从children中选取一个 eventloop的策略

MultithreadEventExecutorGroup 的构造器主要分为以下几个步骤:

  • 创建线程执行器—— ThreadPerTaskExecutor
  • 调用 newChild 方法初始化 children 数组
  • 创建线程选择器—— chooser

创建ThreadPerTaskExecutor

我们一开始初始化 NioEventLoopGroup ,并没有传入 Executor 参数:

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}
复制代码

所以到父类 MultithreadEventExecutorGroup 构造器时,executor 为null, 然后执行:

if (executor == null) {
    executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
复制代码

ThreadPerTaskExecutor是一个线程执行器,它实现了 Executor 接口,

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
复制代码

ThreadPerTaskExecutor 实现了 execute 方法,每次通过调用 execute 方法执行线程任务

调用 newChild 方法初始化 children 数组

children[i] = newChild(executor, args);
复制代码

在一个for循环里,nThread线程数是总的循环次数,通过 newChild方法初始化 EventExecutor数组的每个元素,而 newChild 方法如下:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
复制代码

每次循环通过newChild实例化一个 NioEventLoop 对象。

创建线程选择器——chooser

public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}
......
private static boolean isPowerOfTwo(int val) {
    return (val & -val) == val;
}
复制代码

根据 EventExecutor[] 数组的大小,采用不同策略初始化一个 Chooser,如果大小为 2的幂次方则采用 PowerOfTwoEventExecutorChooser ,否则使用 GenericEventExecutorChooser 。 无论使用哪个 chooser,它们的功能都是一样的,即从 EventExecutor[] 数组中,这里也就是 NioEventLoop 数组中,选择一个合适的 NioEventLoop

NioEventLoop的初始化

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
复制代码

从前面的分析就可以看到,通过 newChild方法初始化 NioEventLoopGroup 中的 NioEventLoop ,下面来看下 NioEventLoop 的构造方法是怎样的:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}
复制代码

之前在 NioEventLoopGroup 的构造器中通过 SelectorProvider.provider() 创建了一个 SelectorProvider ,这里传递给了NioEventLoop中的provider,而 NioEventLoop 又通过 openSelector() 方法获取一个 selector对象,实际上是通过 provideropenSelector 方法。这不就是 对应Java NIO中的创建多路复用器 selector。(这里只是简单阐述NioEventLoop的构造方法,后续文章会对NioEventLoop做更加详细的分析)

Channel的注册过程

前面已经介绍了 Channel的创建和初始化过程,是在 initAndRegister 方法中进行的,这个方法里还会将初始化好的 channel注册到 EventLoop 线程中去

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
       .....
    }

    ChannelFuture regFuture = config().group().register(channel);
    ......
}
复制代码

调用 config().group().register 方法将 channel注册到 EventLoopGroup 中去,其目的就是 为了实现NIO中把ServerSocketChannel注册到 Selector中去,这样就是可以实现client请求的监听 ,代码如下:

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
......
public EventLoop next() {
    return (EventLoop) super.next();
}
复制代码

父类MultithreadEventExecutorGroup的next()方法,next方法使用 chooser策略从 EventExecutor[] 数组中选择一个 SingleThreadEventLoop

public EventExecutor next() {
    return chooser.next();
}
.....
public EventExecutor next() {
    return executors[idx.getAndIncrement() & executors.length - 1];
}
复制代码

然后再执行 SingleThreadEventLoopregister() 注册方法:

public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}
...
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}
复制代码

上面代码调用了 unsafe的register方法,具体是 AbstractUnsafe.register ,而unsafe主要用于实现底层的 rergister,read,write等操作。该 register 方法是:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        ......
        AbstractChannel.this.eventLoop = eventLoop;

        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            ......
        }
    }
复制代码

将 eventLoop 赋值给 Channel 的 eventLoop 属性,然后又调用了 register0()方法:

private void register0(ChannelPromise promise) {
        try {
            ......
            boolean firstRegistration = neverRegistered;
            doRegister();
            neverRegistered = false;
            registered = true;
            pipeline.invokeHandlerAddedIfNeeded();
            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
        } catch (Throwable t) {
            ......
        }
    }

复制代码

上面有个关键方法就是 doRegister() , doRegister 才是最终Nio的注册方法:

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}
复制代码

通过 javaChannel().register(eventLoop().unwrappedSelector(), 0, this) 将 Channel对应的Java NIO ServerSocketChannel注册到 EventLoop 中的Selector上,最终完成了channel向eventLoop的注册过程。

这里总结下 Channel注册过程中函数调用链: AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register -> AbstractUnsafe.register0 -> AbstractNioChannel.doRegister()

添加 ChannelHandler

在之前的 initAndRegister() 方法里,里面有个 init() 方法,如下:

void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

复制代码

init() 方法中设置服务端自定义的 ChannelOptions,ChannelAttrs 属性和为服务端Channel创建出来的新连接的Channel设置的自定义属性 ChildOptions,ChildAttrs ,这里就不多叙述设置参数的问题了,重点关注 pipelineaddLast 方法,该方法就是添加用于处理出站和入站数据流的 ChannelHandler,而 pipeline 是从 channel 中获取的,之前分析过当创建 channel 时会自动创建一个对应的 channelPipeline

至于 ChannelInitializer 又是什么,来看下它的类继承结构图就知道了:

Netty系列文章之服务端启动分析

ChannelInitializer 是一个抽象类,实现了 ChannelHandler 接口,它有一个抽象方法 initChannel ,上面代码实现了该方法并且添加了 bootstrap 的handler,逻辑如下:

.....
//1
ChannelHandler handler = config.handler();
if (handler != null) {
    pipeline.addLast(handler);
}
......
//2
public final ChannelHandler handler() {
    return bootstrap.handler();
}
......
//3
final ChannelHandler handler() {
    return handler;
}

复制代码

initChannel 添加的 Handler 就是我们服务端代码中 serverbootstrap 设置的 handler ,如下:

b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 100)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) {
                 ChannelPipeline p = ch.pipeline();
                 //p.addLast(new LoggingHandler(LogLevel.INFO));
                 p.addLast(serverHandler);
             }
         });
复制代码

示例代码设置的handler为 LoggingHandler ,用于处理日志,这里不细说。上面的 initChannel 方法可以添加 Handler ,这里的 serverbootstrap 启动类还增加了 childHandler 方法,也是用来添加 handler,只不过是向已经连接的 channel客户端的 channnelpipeline 添加 handler

serverbootstrap.handler() 设置的 handler 在初始化就会执行,而 serverbootstrap.childHandler() 设置的 childHandler 在客户端连接成功才会执行

小结

由于自身知识与经验有限,对Netty的服务端启动源码分析得不是很全面,在此过程中也参考了一些大佬的Netty源码分析文章,本文如有错误之处,欢迎指出。

参考资料 & 鸣谢

  • Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (客户端)
  • Netty源码分析之服务启动
  • Netty In Action
原文  https://juejin.im/post/5ba7477ae51d450e763309a5
正文到此结束
Loading...