原文: wangwei.one/posts/netty…
前面 ,我们分析了Netty中的Channel组件,本篇我们来介绍一下与Channel关联的另一个核心的组件 —— EventLoop 。
Netty版本:4.1.30
EventLoop定义了Netty的核心抽象,用于处理网络连接生命周期中所有发生的事件。
我们先来从一个比较高的视角来了解一下Channels、Thread、EventLoops、EventLoopGroups之间的关系。
上图是表示了拥有4个EventLoop的EventLoopGroup处理IO的流程图。它们之间的关系如下:
下图是Netty EventLoop相关类的UML图。从中我们可以看到EventLoop相关的类都是实现了 java.util.concurrent
包中的 ExecutorService 接口。我们可以直接将任务(Runable 或 Callable) 提交给EventLoop去立即执行或定时执行。
例如,使用EventLoop去执行定时任务,样例代码:
public static void scheduleViaEventLoop() { Channel ch = new NioSocketChannel(); ScheduledFuture<?> future = ch.eventLoop().schedule( () -> System.out.println("60 seconds later"), 60, TimeUnit.SECONDS); } 复制代码
Netty线程模型的高性能主要取决于当前所执行线程的身份的确定。一个线程提交到EventLoop执行的流程如下:
其中,Netty中的每一个EventLoop都有它自己的任务队列,并且和其他的EventLoop的任务队列独立开来。
服务于Channel的I/O和事件的EventLoop包含在EventLoopGroup中。根据不同的传输实现,EventLoop的创建和分配方式也不同。
在NIO传输方式中,使用尽可能少的EventLoop就可以服务多个Channel。如图所示,EventLoopGroup采用顺序循环的方式负责为每一个新创建的Channel分配EventLoop,每一个EventLoop会被分配给多个Channels。
一旦一个Channel被分配给了一个EventLoop,则这个Channel的生命周期内,只会绑定这个EventLoop。这就让我们在ChannelHandler的实现省去了对线程安全和同步问题的担心。
与NIO方式的不同在于,一个EventLoop只会服务于一个Channel。
初步了解了 EventLoop 以及 EventLoopGroup 的工作机制,接下来我们以 NioEventLoopGroup 为例,来深入分析 NioEventLoopGroup 是如何创建的,又是如何启动的,它的内部执行逻辑又是怎样的等等问题。
我们从 NioEventLoopGroup 的构造函数开始分析:
EventLoopGroup acceptorEventLoopGroup = new NioEventLoopGroup(1); 复制代码
NioEventLoopGroup构造函数会调用到父类 MultithreadEventLoopGroup 的构造函数,默认情况下,EventLoop的数量 = 处理器数量 x 2:
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup { private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class); private static final int DEFAULT_EVENT_LOOP_THREADS; // 默认情况下,EventLoop的数量 = 处理器数量 x 2 static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } } protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } ... } 复制代码
继续调用父类,会调用到 MultithreadEventExecutorGroup 的构造器,主要做三件事情:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } // 创建任务执行器 ThreadPerTaskExecutor if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } // 创建 EventExecutor 数组 children = new EventExecutor[nThreads]; // 通过for循环创建数量为 nThreads 个的 EventLoop for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 调用 newChild 接口 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // 创建选择器 chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); } 复制代码
if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } 复制代码
线程任务执行器 ThreadPerTaskExecutor 源码如下,具体的任务都由 ThreadFactory 去执行:
public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; } // 使用 threadFactory 执行任务 @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); } } 复制代码
来看看 newDefaultThreadFactory 方法:
protected ThreadFactory newDefaultThreadFactory() { return new DefaultThreadFactory(getClass()); } 复制代码
接下来看看 DefaultThreadFactory 这个类,实现了 ThreadFactory 接口,我们可以了解到:
public class DefaultThreadFactory implements ThreadFactory { // 线程池ID编号自增器 private static final AtomicInteger poolId = new AtomicInteger(); // 线程ID自增器 private final AtomicInteger nextId = new AtomicInteger(); // 线程名称前缀 private final String prefix; // 是否为守护进程 private final boolean daemon; // 线程优先级 private final int priority; // 线程组 protected final ThreadGroup threadGroup; public DefaultThreadFactory(Class<?> poolType) { this(poolType, false, Thread.NORM_PRIORITY); } ... // 获取线程名,返回结果:nioEventLoopGroup public static String toPoolName(Class<?> poolType) { if (poolType == null) { throw new NullPointerException("poolType"); } String poolName = StringUtil.simpleClassName(poolType); switch (poolName.length()) { case 0: return "unknown"; case 1: return poolName.toLowerCase(Locale.US); default: if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) { return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1); } else { return poolName; } } } public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) { if (poolName == null) { throw new NullPointerException("poolName"); } if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { throw new IllegalArgumentException( "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)"); } // nioEventLoopGroup-2- prefix = poolName + '-' + poolId.incrementAndGet() + '-'; this.daemon = daemon; this.priority = priority; this.threadGroup = threadGroup; } public DefaultThreadFactory(String poolName, boolean daemon, int priority) { this(poolName, daemon, priority, System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup()); } @Override public Thread newThread(Runnable r) { // 创建新线程 nioEventLoopGroup-2-1 Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet()); try { if (t.isDaemon() != daemon) { t.setDaemon(daemon); } if (t.getPriority() != priority) { t.setPriority(priority); } } catch (Exception ignored) { // Doesn't matter even if failed to set. } return t; } // 创建新线程 FastThreadLocalThread protected Thread newThread(Runnable r, String name) { return new FastThreadLocalThread(threadGroup, r, name); } } 复制代码
继续从 MultithreadEventExecutorGroup 构造器开始,创建完任务执行器 ThreadPerTaskExecutor 之后,进入for循环,开始创建 NioEventLoop:
for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 创建 nioEventLoop children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } ... } 复制代码
NioEventLoopGroup类中的 newChild()
方法:
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); } 复制代码
NioEventLoop 构造器:
public final class NioEventLoop extends SingleThreadEventLoop{ ... NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { // 调用父类 SingleThreadEventLoop 构造器 super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } // 设置 selectorProvider provider = selectorProvider; // 获取 SelectorTuple 对象,里面封装了原生的selector和优化过的selector final SelectorTuple selectorTuple = openSelector(); // 设置优化过的selector selector = selectorTuple.selector; // 设置原生的selector unwrappedSelector = selectorTuple.unwrappedSelector; // 设置select策略 selectStrategy = strategy; } ... } 复制代码
接下来我们看看 获取多路复用选择器 方法—— openSelector() ,
// selectKey 优化选项flag private static final boolean DISABLE_KEYSET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false); private SelectorTuple openSelector() { // JDK原生的selector final Selector unwrappedSelector; try { // 通过 SelectorProvider 创建获得selector unwrappedSelector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } // 如果不优化,则直接返回 if (DISABLE_KEYSET_OPTIMIZATION) { return new SelectorTuple(unwrappedSelector); } // 通过反射创建 sun.nio.ch.SelectorImpl 对象 Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { return Class.forName( "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (Throwable cause) { return cause; } } }); // 如果 maybeSelectorImplClass 不是 selector 的一个实现,则直接返回原生的Selector if (!(maybeSelectorImplClass instanceof Class) || // ensure the current selector implementation is what we can instrument. // 确保当前的选择器实现是我们可以检测的 !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass instanceof Throwable) { Throwable t = (Throwable) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t); } return new SelectorTuple(unwrappedSelector); } // maybeSelectorImplClass 是selector的实现,则转化为 selector 实现类 final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; // 创建新的 SelectionKey 集合 SelectedSelectionKeySet,内部采用的是 SelectionKey 数组的形 // 式,而非 set 集合 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { // 通过反射的方式获取 sun.nio.ch.SelectorImpl 的成员变量 selectedKeys Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); // 通过反射的方式获取 sun.nio.ch.SelectorImpl 的成员变量 publicSelectedKeys Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet. // This allows us to also do this in Java9+ without any extra flags. long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField); if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) { PlatformDependent.putObject( unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); return null; } // We could not retrieve the offset, lets try reflection as last-resort. } // 设置字段 selectedKeys Accessible 为true Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); if (cause != null) { return cause; } // 设置字段 publicSelectedKeys Accessible 为true cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); if (cause != null) { return cause; } selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null; } catch (NoSuchFieldException e) { return e; } catch (IllegalAccessException e) { return e; } } }); if (maybeException instanceof Exception) { selectedKeys = null; Exception e = (Exception) maybeException; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e); return new SelectorTuple(unwrappedSelector); } // 设置 SelectedSelectionKeySet selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector); // 返回包含了原生selector和优化过的selector的SelectorTuple return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); } 复制代码
优化后的 SelectedSelectionKeySet 对象,内部采用 SelectionKey 数组的形式:
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> { SelectionKey[] keys; int size; SelectedSelectionKeySet() { keys = new SelectionKey[1024]; } // 使用数组,来替代HashSet,可以降低时间复杂度为O(1) @Override public boolean add(SelectionKey o) { if (o == null) { return false; } keys[size++] = o; if (size == keys.length) { increaseCapacity(); } return true; } @Override public boolean remove(Object o) { return false; } @Override public boolean contains(Object o) { return false; } @Override public int size() { return size; } @Override public Iterator<SelectionKey> iterator() { return new Iterator<SelectionKey>() { private int idx; @Override public boolean hasNext() { return idx < size; } @Override public SelectionKey next() { if (!hasNext()) { throw new NoSuchElementException(); } return keys[idx++]; } @Override public void remove() { throw new UnsupportedOperationException(); } }; } void reset() { reset(0); } void reset(int start) { Arrays.fill(keys, start, size, null); size = 0; } // 扩容 private void increaseCapacity() { SelectionKey[] newKeys = new SelectionKey[keys.length << 1]; System.arraycopy(keys, 0, newKeys, 0, size); keys = newKeys; } } 复制代码
SingleThreadEventLoop 构造器
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { ... protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) { // 调用 SingleThreadEventExecutor 构造器 super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); tailTasks = newTaskQueue(maxPendingTasks); } ... } 复制代码
SingleThreadEventExecutor 构造器,主要做两件事情:
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { ... protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); // 设置线程任务执行器 this.executor = ObjectUtil.checkNotNull(executor, "executor"); // 设置任务队列 taskQueue = newTaskQueue(this.maxPendingTasks); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); } ... } 复制代码
NioEventLoop 中对 newTaskQueue 接口的实现,返回的是 JCTools 工具包 Mpsc 队列。后面我们写文章单独介绍 JCTools 中的相关队列。
Mpsc:Multi Producer Single Consumer (Lock less, bounded and unbounded)
多个生产者对单个消费者(无锁、有界和无界都有实现)
public final class NioEventLoop extends SingleThreadEventLoop { ... @Override protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { // This event loop never calls takeTask() return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue() : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks); } ... } 复制代码
接下来,我们看看 MultithreadEventExecutorGroup 构造器的最后一个部分内容,创建线程执行选择器chooser,它的主要作用就是 EventLoopGroup 用于从 EventLoop 数组中选择一个 EventLoop 去执行任务。
// 创建选择器 chooser = chooserFactory.newChooser(children); 复制代码
EventLoopGroup 中定义的 next()
接口:
public interface EventLoopGroup extends EventExecutorGroup { ... // 选择下一个 EventLoop 用于执行任务 @Override EventLoop next(); ... } 复制代码
MultithreadEventExecutorGroup 中对 next() 的实现:
@Override public EventExecutor next() { // 调用 DefaultEventExecutorChooserFactory 中的next() return chooser.next(); } 复制代码
DefaultEventExecutorChooserFactory 对于如何从数组中选择任务执行器,也做了巧妙的优化。
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory { public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory(); private DefaultEventExecutorChooserFactory() { } @SuppressWarnings("unchecked") @Override public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } } // 判断线程任务执行的个数是否为 2 的幂次方。e.g: 2、4、8、16 private static boolean isPowerOfTwo(int val) { return (val & -val) == val; } // 幂次方选择器 private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { // 通过二级制进行 & 运算,效率更高 return executors[idx.getAndIncrement() & executors.length - 1]; } } // 普通选择器 private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { // 按照最普通的取模的方式从index=0开始向后开始选择 return executors[Math.abs(idx.getAndIncrement() % executors.length)]; } } } 复制代码
通过本节内容,我们了解到了EventLoop与EventLoopGroup的基本原理,EventLoopGroup与EventLoop的创建过程: