转载

Netty 服务端启动流程-I

Netty的主从Reactor多线程模型,通常都会创建两个EventLoopGroup,分别作为主从线程池:

bossGroup:主要处理accept事件,之后将建立的客户端连接注册到workerGroup

workerGroup:负责处理I/O事件

Netty服务端的启动流程:

  • 初始化EventLoopGroup
  • 创建服务端启动器-ServerBootStrap
  • 绑定监听端口,并等待绑定完成
  • 阻塞主线程,保持服务端的启动

I-EventLoopGrouop的初始化

图示:

Netty 服务端启动流程-I
// 如果 不指定线程数,将使用默认线程数:2倍的CPU核数
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
复制代码

主要类调用:

NioEventLoopGroup

-->MultithreadEventLoopGroup

-->MultithreadEventExecutorGroup

NioEventLoopGroup

其构造方法主要调用父类MultithreadEventLoopGroup的构造方法

// 默认的 构造方法,在这里 默认指定了 线程数0
public NioEventLoopGroup() {
    this(0);
}
// 最终调用的 构造方法
public NioEventLoopGroup(int nThreads, Executor executor,  
    final SelectorProvider selectorProvider,
    final SelectStrategyFactory selectStrategyFactory) {  
    
    // 调用父类构造方法 传递了各种参数                         
    super(nThreads, executor, selectorProvider, selectStrategyFactory,  
        RejectedExecutionHandlers.reject());
}

nThreads:线程数
executor: JDK的executor(线程池,用来创建线程),默认为null  
selectorProvider: SelectorProvider, 用来打开一个 Selector  
selectStrategyFactory: Select 策略工厂,默认为DefaultSelectStrategyFactory.INSTANCE  
rejectedExecutionHandlers:任务拒绝处理器
复制代码

MultithreadEventLoopGroup

其构造方法主要调用父类MultithreadEventLoopGroup的构造方法,在这个类中指定了默认线程数;如果传入的线程数为0,则使用默认线程数

// 构造方法,重新指定 线程数
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    // 直接调用 父类构造方法
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

// 默认线程数
private static final int DEFAULT_EVENT_LOOP_THREADS;
// 静态初始化
// 1.先获取配置io.netty.eventLoopThreads 指定的线程数
// 2.如果没有获取到,则返回传入的默认值 NettyRuntime.availableProcessors() * 2)
static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
        "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}
复制代码

MultithreadEventExecutorGroup

在这个类中,完成了EventLoopGroup以及EventLoop的初始化流程。但EventLoop还并为启动;

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
    Object... args) {
    // 调用 本类构造方法;传递了 事件执行器选择器工厂
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

复制代码

最终调用的构造方法,来开始初始化EventLoopGroup,主要流程:

  1. 校验executor,如果为null,初始化一个线程池Executor
  2. 新建EventExecutor数组,大小为 nThreads
  3. 初始化 每一个EventExecutor,这里用到了NioEventLoopGroup中的newChild方法
  4. 根据 EventExecutor数组的大小,使用事件执行器选择器工厂初始化事件执行器选择器
  5. 为每一个EventExecutor 添加终结监听器,只有全部的EventExecutor关闭了,才算关闭成功
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }
    // step 1
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    // step 2
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // step 3
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            // 如果有一个没有创建成功,则关闭之前所有成功创建的EventLoop
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }
                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                             e.awaitTermination(Integer.MAX_VALUE,TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
    // step 4    
    chooser = chooserFactory.newChooser(children);
    // 创建 监听器
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };
    // step 5
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
    // 设置 只读
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
复制代码

newChild方法

调用的 NioEventLoopGroup类中的newChild方法,初始化EventLoop,也就是NioEventLoop

原文  https://juejin.im/post/5eec606de51d4573ff27187e
正文到此结束
Loading...