上文说到NioEventLoop的run方法可以分为3个步骤:
其中步骤1已在上一节讲述,这里接着讲述下面2个步骤
首先看一下在步骤2和步骤3的主干代码
final int ioRatio = this.ioRatio; // 将所有任务执行完 if (ioRatio == 100) { try { processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } } else { // 记录IO事件消耗的时间,然后按比例处理分配时间处理非IO任务 final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; // ioRatio默认50,(100-ioRatio)/ioRatio刚好等于1,做到平均分配 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } }
ioRadio是NioEventLoop的一个成员变量,用来控制分配花费在IO事件与非IO任务时间的比例。默认情况下,ioRadio是50,表示IO事件与非IO任务
将分配相同时间。而当ioRatio为100时,该值失效,不再平衡两种动作的时间分配比值。
了解了这一点,上述两种分支代码就不难理解了,我们直接进入processSelectedKeys,看看netty如何执行IO事件
先进入processSelectedKeys方法内部。
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
可以看到这里又根据selectedKeys是否为空这个条件来确定是处理优化过的keys还是普通keys。关于selectedKeys,在NioEventLoop介绍这一节中,
我们介绍了NioEventLoop的创建,在创建过程中,默认会将SelectedKeys由Hashset替换为数组实现,此处的selectedKeys正是替换过后的实现。
我们继续跟进到processSelectedKeysOptimized方法
private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; selectedKeys.keys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { selectedKeys.reset(i + 1); selectAgain(); i = -1; } } }
方法内部用一个for循环处理selectedKeys。key的attchment默认是在注册时附加上去的NioServerSocketChannel和NioSocketChannel。
继续跟进processSelectedKey(k, (AbstractNioChannel) a)方法。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop = ch.eventLoop(); if (eventLoop != this || eventLoop == null) { return; } unsafe.close(unsafe.voidPromise()); return; } int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } }
netty首先对selectionKey的有效性做了一个判断。当key无效时,关闭key所在的channel。当key有效时,委托NioUnsafe对象对key进行IO操作。
注意这里先进行OP_CONNECT,再执行OP_WRITE,最后执行OP_READ和OP_ACCEPT。关于Unsafe的这些IO操作留待以后分析。
processSelectedKeysPlain方法流程类似,略过
由于IoRatio默认为50,我们先进入runAllTasks(ioTime * (100 - ioRatio) / ioRatio)方法。
protected boolean runAllTasks(long timeoutNanos) { // 步骤1 fetchFromScheduledTaskQueue(); // 步骤2 Runnable task = pollTask(); if (task == null) { afterRunningAllTasks(); return false; } // 步骤3 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { // 步骤4 safeExecute(task); runTasks ++; // 步骤5 if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } // 步骤6 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
非IO任务的执行可以分为6个步骤
我们一个一个步骤讲解
首先看一下整体流程
private boolean fetchFromScheduledTaskQueue() { if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) { return true; } long nanoTime = AbstractScheduledEventExecutor.nanoTime(); for (;;) { Runnable scheduledTask = pollScheduledTask(nanoTime); if (scheduledTask == null) { return true; } if (!taskQueue.offer(scheduledTask)) { scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask); return false; } } }
首先先判断定时任务队列是否有任务,然后调用了一个AbstractScheduledEventExecutor.nanoTime(),该方法返回ScheduledFutureTask类从初始化
到当前时刻的差值。也即将ScheduledFutureTask初始化的时刻当成零时刻。
获取到零时刻到当前时刻的差值后,用一个for循环不断去定时任务队列里获取终止时刻在当前时刻之后的任务(scheduledTask.deadlineNanos() - nanoTime<=0)
当获取到定时任务后,将它添加到普通任务队列taskQueue里。同时添加失败后,还会再重新添加回定时任务队列,防止任务直接丢失。
说到定时任务队列,也少不了一探其实现。scheduledTaskQueue初始化代码如下:
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() { if (scheduledTaskQueue == null) { scheduledTaskQueue = new DefaultPriorityQueue<>( SCHEDULED_FUTURE_TASK_COMPARATOR, 11); } return scheduledTaskQueue; }
采用的是一个懒加载的方式,在调用scheduledTaskQueue()创建定时任务时才进行初始化。从名字可以看出,它是一个优先级队列,初始化容量为11,
采用的Comparator是调用2个ScheduledFutureTask的compareTo方法,首先比较任务的终止时间,然后比较两个任务的id。代码较简单,就不列了。
然后我们看下调度方法schedule
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { scheduledTaskQueue().add(task.setId(nextTaskId++)); } else { executeScheduledRunnable(new Runnable() { @Override public void run() { scheduledTaskQueue().add(task.setId(nextTaskId++)); } }, true, task.deadlineNanos()); } return task; }
可以发现,netty将"添加定时任务"也当做一个任务,放入任务队列里。
// NioEventLoop中定义的pollTask方法 protected Runnable pollTask() { Runnable task = super.pollTask(); if (needsToSelectAgain) { selectAgain(); } return task; } // super.pollTask调用了此方法,定义在SingleThreadEventExecutor中 protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) { for (;;) { Runnable task = taskQueue.poll(); if (task != WAKEUP_TASK) { return task; } } }
这里依然是通过轮询从任务队列里取出任务,并且忽略WAKEUP_TASK这个标记性任务。
在当前时间上,加上IO事件执行的时间,作为非IO任务执行的超时时间
protected static void safeExecute(Runnable task) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception. Task: {}", task, t); } }
捕获所有异常,使得定时任务报错时不退出
由于nanoTime()是一个相对耗时的操作,netty默认执行了64次非IO任务后,才计算是否超时。若执行了超过64个任务没或者任务队列已经没有任务,
就打断循环,并将当前时间更新为lastExecutionTime。
到了这里,我们已经介绍完了大部分NioEventLoop的内容,限于笔者水平和文章篇幅,nioEventLoop所使用的任务队列MpscQueue和ScheduleFutureTask
内部执行原理不再进一步深究。但这也已经足够对NioEventLoop塑造一个比较整体性的认识了。