对于 NioEventLoopGroup
这个对象,在我的理解里面它就和 ThreadGroup
类似, NioEventLoopGroup
中有一堆 NioEventLoop
小弟, ThreadGroup
中有一堆 Thread
小弟,真正意义上干活的都是 NioEventLoop
和 Thread
这两个小弟。下面的文章大家可以类比下进行阅读,应该会很容易弄懂的。(本文基于netty-4.1.32.Final)
这里咱们可以从 NioEventLoopGroup
最简单的无参构造函数开始。
1 public NioEventLoopGroup() { 2 this(0); 3 } 复制代码
一步步往下走,可以发现最终调用到构造函数:
1 public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, 2 final SelectStrategyFactory selectStrategyFactory) { 3 super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); 4 } 复制代码
参数说明:
io.netty.eventLoopThreads
系统环境变量,则优先考虑,否则设置成为 CPU核心数*2
。 null
。 SelectorProvider.provider()
。 SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory()
。 RejectedExecutionException
异常。 继续往下面走,调用父类 MultithreadEventLoopGroup
中的构造函数:
1 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { 2 super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); 3 } 复制代码
这里可以看到判断 nThreads == 0
后就会给其附上一个默认值。继续走,调用父类 MultithreadEventExecutorGroup
中的构造方法。
1 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { 2 this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); 3 } 复制代码
这里有个关注的点, DefaultEventExecutorChooserFactory
。这是一个chooserFactory,用来生产 EventExecutorChooser
选择器的。而 EventExecutorChooser
的功能是用来选择哪个 EventExecutor
去执行咱们的任务。咱们从下面的代码中可以观察到 DefaultEventExecutorChooserFactory
一共给咱们提供了两种策略。
1 public EventExecutorChooser newChooser(EventExecutor[] executors) { 2 if (isPowerOfTwo(executors.length)) { 3 return new PowerOfTwoEventExecutorChooser(executors); 4 } else { 5 return new GenericEventExecutorChooser(executors); 6 } 7 } 复制代码
这里的策略也很简单:
PowerOfTwoEventExecutorChooser
这个选择器,因为这样可以采用位运算去获取执行任务的 EventExecutor
。 1 public EventExecutor next() { 2 return executors[idx.getAndIncrement() & executors.length - 1]; 3 } 复制代码
GenericEventExecutorChooser
选择器,这里采用的是取模的方式去获取执行任务的 EventExecutor
。 1 public EventExecutor next() { 2 return executors[Math.abs(idx.getAndIncrement() % executors.length)]; 3 } 复制代码
相比而言, 位运算的效率要比取模的效率高 ,所以咱们在自定义线程数的时候,最好设置成为2^n个线程数。
到达最终调用的函数
1 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, 2 EventExecutorChooserFactory chooserFactory, Object... args) { 3 if (nThreads <= 0) { 4 throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); 5 } 6 7 if (executor == null) { 8 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); 9 } 10 11 children = new EventExecutor[nThreads]; 12 13 for (int i = 0; i < nThreads; i ++) { 14 boolean success = false; 15 try { 16 children[i] = newChild(executor, args); 17 success = true; 18 } catch (Exception e) { 19 // TODO: Think about if this is a good exception type 20 throw new IllegalStateException("failed to create a child event loop", e); 21 } finally { 22 if (!success) { 23 for (int j = 0; j < i; j ++) { 24 //创建NioEventLoop失败后进行资源的一些释放 25 children[j].shutdownGracefully(); 26 } 27 28 for (int j = 0; j < i; j ++) { 29 EventExecutor e = children[j]; 30 try { 31 while (!e.isTerminated()) { 32 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); 33 } 34 } catch (InterruptedException interrupted) { 35 // Let the caller handle the interruption. 36 Thread.currentThread().interrupt(); 37 break; 38 } 39 } 40 } 41 } 42 } 43 //这里可以去看下上面对于 DefaultEventExecutorChooserFactory的一些介绍 44 chooser = chooserFactory.newChooser(children); 45 46 final FutureListener<Object> terminationListener = new FutureListener<Object>() { 47 @Override 48 public void operationComplete(Future<Object> future) throws Exception { 49 if (terminatedChildren.incrementAndGet() == children.length) { 50 terminationFuture.setSuccess(null); 51 } 52 } 53 }; 54 55 for (EventExecutor e: children) { 56 // 给每一个成功创建的EventExecutor 绑定一个监听终止事件 57 e.terminationFuture().addListener(terminationListener); 58 } 59 60 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); 61 Collections.addAll(childrenSet, children); 62 // 弄一个只读的EventExecutor数组,方便后面快速迭代,不会抛出并发修改异常 63 readonlyChildren = Collections.unmodifiableSet(childrenSet); 64 } 复制代码
从上面的代码可以观察到,等了很久的executor 在这里终于给其赋值了,其值为 ThreadPerTaskExecutor
的一个实例对象,这一块的初始化赋值都是很简单的,干活调用的是如下方法:
1 public void execute(Runnable command) { 2 threadFactory.newThread(command).start(); 3 } 复制代码
对这一块不是很了解的可以去查阅下线程池有关的资料,咱们重点关注一下 newChild
这个方法,可以说是上面整个流程中的重点:
newChild
这个方法在 NioEventLoopGroup
中被重写了:
1 protected EventLoop newChild(Executor executor, Object... args) throws Exception { 2 return new NioEventLoop(this, executor, (SelectorProvider) args[0], 3 ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); 4 } 复制代码
细心的小伙伴可以观察到,这里有用到SelectorProvider,SelectStrategyFactory以及RejectedExecutionHandler这个三个参数,实际上就是本文最开始初始化的三个实例对象(可以翻阅到顶部查看一下)。
继续往下走流程:
1 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, 2 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { 3 super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); 4 if (selectorProvider == null) { 5 throw new NullPointerException("selectorProvider"); 6 } 7 if (strategy == null) { 8 throw new NullPointerException("selectStrategy"); 9 } 10 provider = selectorProvider; 11 final SelectorTuple selectorTuple = openSelector(); 12 selector = selectorTuple.selector; 13 unwrappedSelector = selectorTuple.unwrappedSelector; 14 selectStrategy = strategy; 15 } 复制代码
在上面的代码片段中除了调用父类的构造器之外就进行了参数的判空和简单的赋值。这里 openSelector
方法调用后返回 SelectorTuple
实例主要是为了能同时得到包装前后的 selector
与 unwrappedSelector
。
1 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, 2 boolean addTaskWakesUp, int maxPendingTasks, 3 RejectedExecutionHandler rejectedHandler) { 4 super(parent); 5 this.addTaskWakesUp = addTaskWakesUp; 6 this.maxPendingTasks = Math.max(16, maxPendingTasks); 7 this.executor = ObjectUtil.checkNotNull(executor, "executor"); 8 taskQueue = newTaskQueue(this.maxPendingTasks); 9 rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); 10 } 复制代码
这里会有一个 taskQueue
队列的初始化( Queue<Runnable> taskQueue
),看名字就知道,这个队列里面放着的是咱们要去执行的任务。这里的初始化方法 newTaskQueue
在 NioEventLoop
中重写了的。具体如下:
1 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { 2 // This event loop never calls takeTask() 3 return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue() 4 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks); 5 } 复制代码
这里生成的是一个 MPSC队列(Multi Producer Single Consumer) ,这是一个多生产者单消费的无锁队列,支持并发。从字面意思上就可以观察到这个队列效率应该是蛮高的。这里的 maxPendingTasks
值为 Integer.MAX_VALUE
。然后最终生成的是 MpscUnboundedArrayQueue
这样一个无边界的队列。
这样 newChild
这个方法到这里就走完了。
简单介绍下这个环节,在上面的创建 NioEventLoopGroup
有个环节是给每个 NioEventLoop
儿子绑定一个terminationListener监听事件
1 for (EventExecutor e: children) { 2 e.terminationFuture().addListener(terminationListener); 3 } 复制代码
这个事件的回调方法是:
1 @Override 2 public void operationComplete(Future<Object> future) throws Exception { 3 if (terminatedChildren.incrementAndGet() == children.length) { 4 terminationFuture.setSuccess(null); 5 } 6 } 复制代码
在每一个 NioEventLoop
关闭后,就会回调这个方法,然后给 NioEventLoopGroup
实例中的 terminatedChildren
字段自增1,并与初始化成功的 NioEventLoop
的总个数进行比较,如果
terminatedChildren
的值与 NioEventLoop
的总个数相等,则调用 bossGroup.terminationFuture().get()
方法就不会阻塞,并正常返回 null
。
同样, future.channel().closeFuture().sync()
这段代码也将不会阻塞住了,调用 sync.get()
也会返回 null
。
下面给一段测试代码,完整示例大家可以到我的 github 中去获取:
上面的代码只是一个简单的测试,后面还有别的发现的话会继续在 github 中与大家一起分享~