作为纵横情场多年的老手,宪程在把到妹子后通常有以下策略 (假设宪程是 影流之主
的第1024代传人并且只剩下了分身的能力)
将妹子存到队列中,不时发微信去撩一下,如果有意向的话宪程会使用分身能力再创建一个 宪程
去把妹
宪程自己执行把妹的操作,如果期间又有新的妹子看上他咋办呢,那就将该妹子交给自己的分身 宪程
去轮询处理,并且宪程在把完妹子之后会尝试去把分身 宪程
的轮询任务给接过来,毕竟本体总是要掌握主动权的,如果没有接过来咋办?只能选择成为分身了,毕竟此时分身 宪程
已经接过了本体的工作,某种意义上他已经成为了 本体
。
建议在阅读之前先了解以下Tomcat的NIO模型,没有对比就没有伤害,你会发现Jetty NIO模型的有趣之处
如果时间充足的话,我建议你直接阅读附录,了解如何Debug Jetty NIO功能
既然要了解Jetty的NIO模型,从线程的角度来说可以分为以下几类
空闲线程
此角色会根据提交到线程池中的任务,将自己转变为I/O线程或者轮询线程
Acceptor线程
该角色主要负责接收来自客户端的连接并对其进行封装之后,选择一个Selector来提交此任务
轮询线程
此角色主要负责轮询事件,并处理其他角色提交给此角色的任务,另外此角色可以根据所设定的策略将轮询任务交给其他线程,在执行完I/O任务之后归还到线程池中成为 空闲线程
主要参与的类有
Connector
该角色主要负责JettyNIO模型中各个组件的启动和,协调工作
SelectorManager
此角色主要对 ManagedSelector
进行管理,想要和Selector进行交互可以使用此类
ManagedSelector
封装了JDK原生的 selector
, 并对外提供对 selector
执行操作的内部类、接口以及方法
重点所有线程共用一个线程池
关键类 org.eclipse.jetty.server.ServerConnector
Connector即连接器,是Jetty对于网络I/O模型的一个抽象,主要负责组装,启动Jetty NIO模型中所需要用到的组件。因此,我们主要注意力集中到其实现上也就是 ServerConnector
上。
初始化Connector连接器,我们需要向其提供以下关键参数(隐去了和本文无关的参数,有兴趣的可自行了解)
回收
以及 提供
ByteBuffer给I/O线程使用 accept
操作线程的数量 selector
线程数量 但是,大部分的初始化工作并不是在 ServerConnector
中执行的,而是在其父类中执行的操作,因此我们将目光转移到 org.eclipse.jetty.server.AbstractConnector
该类的初始化代码如下,其主要做了以下工作
max(1,min(4,CPU核心数÷8)
想象一下,如果ServerSocketChannel被设置为阻塞状态以便多个线程同时执行accept操作,那么 大多数情况下多数线程将会陷入阻塞状态 ,并且线程从阻塞态恢复是有线程上下文切换的成本的因此Acceptor线程并不是越多越好
public AbstractConnector( Server server, Executor executor, Scheduler scheduler, ByteBufferPool pool, int acceptors, ConnectionFactory... factories) { _server = server; //检查是否设置线程池,如果没有则使用Server的 _executor = executor != null ? executor : _server.getThreadPool(); if (scheduler == null) scheduler = _server.getBean(Scheduler.class); _scheduler = scheduler != null ? scheduler : new ScheduledExecutorScheduler(String.format("Connector-Scheduler-%x", hashCode()), false); // 检查是否指定ByteBufferPool,如果没有则自己创建一个 if (pool == null) pool = _server.getBean(ByteBufferPool.class); _byteBufferPool = pool != null ? pool : new ArrayByteBufferPool(); // 将这些对象交给Jetty统一管理(不在本文讨论范围内,不展开) addBean(_server, false); addBean(_executor); if (executor == null) unmanage(_executor); // inherited from server addBean(_scheduler); addBean(_byteBufferPool); // ConnectionFactory主要使用来处理对应的HTTP协议 for (ConnectionFactory factory : factories) { addConnectionFactory(factory); } // 如果未指定Acceptor的数量则根据CPU核数执行计算 int cores = ProcessorUtils.availableProcessors(); if (acceptors < 0) //根据此式可以推出Acceptor数量最大是4最小是1 acceptors = Math.max(1, Math.min(4, cores / 8)); // Acceptor数量大于CPU核心数 // 将会引起大量的线程陷入阻塞状态 // 没有东西可以accept不就阻塞了吗 // 而要激活阻塞的线程则需要切换线程上下文会引起性能的浪费 if (acceptors > cores) LOG.warn("Acceptors should be <= availableProcessors: " + this); _acceptors = new Thread[acceptors]; } 复制代码
如下图所示我的电脑为 4核心
的i5CPU,那么默认的Acceptor线程应该只有一个
在启动你的Jetty之后我们可以用JConsole来验证一下
正如你所看到的,以qtp开头的线程用于NIO的线程池,其中一个Acceptor线程阻塞在accept()方法上
Acceptor是一个定义在 AbstractConnector
中的内部类, 其主要工作不断调用在子类中实现accept方法,也就是接收连接的实现延迟到了子类中。
其代如下,可以学到不少小技巧, 如果你不想看代码,其总结如下
public void run() { // 给线程起给名字 final Thread thread = Thread.currentThread(); String name = thread.getName(); _name = String.format("%s-acceptor-%d@%x-%s", name, _id, hashCode(), AbstractConnector.this.toString()); thread.setName(_name); // 设置优先级 int priority = thread.getPriority(); if (_acceptorPriorityDelta != 0) thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, priority + _acceptorPriorityDelta))); // 保存对此线程的引用 _acceptors[_id] = thread; try { while (isRunning()) { // 加锁,等待来自其他线程的信号说可以开始干活了 try (Locker.Lock lock = _locker.lock()) { if (!_accepting && isRunning()) { _setAccepting.await(); continue; } } catch (InterruptedException e) { continue; } try { //调用子类的accept方法 accept(_id); } catch (Throwable x) { if (!handleAcceptFailure(x)) break; } } } finally { // 发生异常了,则调回原理的名称以及优先级 thread.setName(name); if (_acceptorPriorityDelta != 0) thread.setPriority(priority); //释放引用 synchronized (AbstractConnector.this) { _acceptors[_id] = null; } CountDownLatch stopping = _stopping; if (stopping != null) stopping.countDown(); } } 复制代码
在子类 ServerConnector
中, accept
主要执行以下操作
阻塞
的形式接收来自客户端的连接 SocketChannel
为 非阻塞模式
,并 禁用nagle算法
SocketChannel
封装成一个 Accept
事件,交给 轮询线程
处理 ServerConnector
中的代码 @Override public void accept(int acceptorID) throws IOException { ServerSocketChannel serverChannel = _acceptChannel; if (serverChannel != null && serverChannel.isOpen()) { SocketChannel channel = serverChannel.accept(); accepted(channel); } } private void accepted(SocketChannel channel) throws IOException { channel.configureBlocking(false); Socket socket = channel.socket(); configure(socket); // socket.setTcpNoDelay(true); _manager.accept(channel); } 复制代码
SelectorManager中最终被调用的代码
public void accept(SelectableChannel channel, Object attachment) { final ManagedSelector selector = chooseSelector(); selector.submit(selector.new Accept(channel, attachment)); } 复制代码
轮询线程
主要负责轮询I/O事件以及处理其他线程提交到本线程任务。并且我们可以为 轮询线程
指定执行策略, 在后面我们可以看到执行策略将如何影响 轮询线程
行为。
首先,我们需要先明确哪些类会参与到轮询线程的工作中,也就是说我们要先理清楚轮询线程的调用链。
如上图堆栈跟踪图红框所标注的部分所示,参与到轮询线程主要堆栈结构如下图所示。
ManagedSelector
此类主要封装了JDK的 selector
类,并对外暴露操作此Selector的方法和类 EatWhatYouKill
此类即轮询线程执行策略,该类会不断调用SelectorProducer.produce 方法产生封装好的I/O任务,并根据其策略来决定执行这个I/O任务的方式 SelectorProducer
此类为 ManagedSelector
的内部类,实现线程执行策略里面的 ExecutionStrategy.Producer
接口,该类专门用于生成供轮询线程处理的I/O任务 Jetty将JDK原生的 Selector
类封装成为 ManagedSelector
,该类主要功能是对外暴露对其封装的 selector
执行操作的接口和内部类. 其关键方法和内部类如下
SelectorUpdate接口如果要对 ManagedSelector
所管理的 selector
进行更新(如执行注册感兴趣的I/O事件)可以实现此接口,该接口定义如下
public interface SelectorUpdate { void update(Selector selector); } 复制代码
submit方法该方法主要用于外界将 SelectorUpdate
提交到轮询线程中以便执行对 Selector
的更新操作,简单来说此方法会执行以下操作
public void submit(SelectorUpdate update) { if (LOG.isDebugEnabled()) LOG.debug("Queued change {} on {}", update, this); Selector selector = null; synchronized (ManagedSelector.this) { //加事件加入处理队列 _updates.offer(update); //检查是否正在轮询,如果正在轮询,则会执行唤醒操作 //因此在此处需要将selecting置为false if (_selecting) { selector = _selector; // To avoid the extra select wakeup. _selecting = false; } } if (selector != null) { //执行唤醒操作,以便对selector执行更新操作 if (LOG.isDebugEnabled()) LOG.debug("Wakeup on submit {}", this); selector.wakeup(); } } 复制代码
SelectorProducer
是 ManagedSelector
的内部类,该类实现了轮询线程执行策略的 ExecutionStrategy.Producer
接口
interface Producer { // 返回一个Runnable任务供轮询线程执行 Runnable produce(); } 复制代码
因此 SelectorProducer
需要不断调用 selector
去轮询看有无新的I/O事件以供处理,除此之外它还需要处理外部类向 ManagedSelector
通过调用 submit
方法提交的 SelectorUpdate
任务
其向线程执行策略类所提供 produce
方法代如下所示,总的来说主要完成以下几项工作
processUpdates
@Override public Runnable produce() { while (true) { //处理之前查询到事件 Runnable task = processSelected(); if (task != null) return task; //处理外部类所提交的update任务 //该方法最终会导致提交的SelectorUpdate.update被调用 processUpdates(); //此方法的调用可能会 //导致客户端SocketChannel感兴趣的事件发生变更 updateKeys(); //执行select操作,并将查询到事件保存起来 if (!select()) return null; } } 复制代码
processUpdates此方法主要是处理外部类提交的 SelectorUpdate
任务,通过复制引用非常巧妙的避免了并发问题
private void processUpdates() { synchronized (ManagedSelector.this) { //倒腾数据,将要处理队列的引用保存 //到另一个变量上,原有的引用可以继续对外提供服务 //整个数据倒腾过程非常短,性能影响较小 Deque<SelectorUpdate> updates = _updates; _updates = _updateable; _updateable = updates; } if (LOG.isDebugEnabled()) LOG.debug("updateable {}", _updateable.size()); //遍历事件队列,处理update方法 for (SelectorUpdate update : _updateable) { if (_selector == null) break; try { if (LOG.isDebugEnabled()) LOG.debug("update {}", update); //调用事件的update方法,并传入selector update.update(_selector); } catch (Throwable ex) { LOG.warn(ex); } } _updateable.clear(); Selector selector; int updates; //再次检查是否有新的事件被提交,如果有则执行唤醒操作 synchronized (ManagedSelector.this) { //外部类提交的任务会保存到updates中 updates = _updates.size(); _selecting = updates == 0; selector = _selecting ? null : _selector; } if (LOG.isDebugEnabled()) LOG.debug("updates {}", updates); if (selector != null) { if (LOG.isDebugEnabled()) LOG.debug("wakeup on updates {}", this); selector.wakeup(); } } 复制代码
select()该方法主要执行轮询操作,并将轮询到事件保存起来以供下一次循环的时候返回,在这个方法中展现jetty如何处理 空轮询
事件( 空轮询
是指selector在执行select操作时,没有查询到任何事件却返回了,这个BUG通常会造成CPU 100%
的使用率,从而使系统崩溃)
private boolean select() { try { Selector selector = _selector; if (selector != null && selector.isOpen()) { if (LOG.isDebugEnabled()) LOG.debug("Selector {} waiting with {} keys", selector, selector.keys().size()); int selected = selector.select(); //没查询到事件, 空轮询事件处理 if (selected == 0) { if (LOG.isDebugEnabled()) LOG.debug("Selector {} woken with none selected", selector); //如果线程被中断,并且标志位被设置了不在运行则执行推出逻辑 if (Thread.interrupted() && !isRunning()) throw new ClosedSelectorException(); //开启了此参数则立即执行一次select操作 if (FORCE_SELECT_NOW) selected = selector.selectNow(); } if (LOG.isDebugEnabled()) LOG.debug("Selector {} woken up from select, {}/{}/{} selected", selector, selected, selector.selectedKeys().size(), selector.keys().size()); int updates; synchronized (ManagedSelector.this) { // 完成了select操作则设置标志位 _selecting = false; updates = _updates.size(); } _keys = selector.selectedKeys(); _cursor = _keys.isEmpty() ? Collections.emptyIterator() : _keys.iterator(); if (LOG.isDebugEnabled()) LOG.debug("Selector {} processing {} keys, {} updates", selector, _keys.size(), updates); return true; } } catch (Throwable x) { _selector = null; if (isRunning()) LOG.warn(x); else { LOG.warn(x.toString()); LOG.debug(x); } closeNoExceptions(_selector); } return false; } 复制代码
与Netty的空轮询处理策略不同,Jetty的处理策略是再select一次并立即返回,但这样似乎并不能解决空轮询的BUG 问题详情
EatWhatYouKill
是线程执行策略的一种,也是Jetty默认的指策略,其思想来源于 如果猎人杀死一只猎物,那么猎人就应该吃掉它
(如果你吃过新鲜的虾你就会对这种 哲学
深有体会),换种说法就是 轮询线程如果查询到一次I/O事件就应该直接处理它
(想起引子了吗)
P.S. 关键代码 org.eclipse.jetty.util.thread.strategy.EatWhatYouKill
之所以这样做的原因是因为切换线程是一件比较费时操作(相对来说),因此在这种策略下轮询线程A如果获取到一个事件会有以下策略
标志为非阻塞任务
,那么线程A会 立即执行
此任务 如果任务 阻塞类型未知 或者被 标记为阻塞状态
如果线程池中的线程都处于 繁忙
状态,则将其提交到线程池种等待执行
如果线程池种有空闲线程B,则尝试将线程A负责 轮询功
能交给线程B,如果 立即获取到线程B
成功,则线程A会直接执行获取到的任务, 任务执行完成后,线程A会尝试 夺回
交给线程B的轮询任务,如果夺回失败则变为空闲线程等待分配任务。(想起引子了吗?)
除此之外,线程A还会尝试直接执行任务并且不会交出轮询工作 (代码太长,只摘出关键代码)
case BLOCKING: synchronized (this) { if (_pending) { //轮询工作陷入了停滞,因此是IDLE状态 _state = State.IDLE; mode = Mode.EXECUTE_PRODUCE_CONSUME; } //tryExecute 如果立即分配到了线程则返回true //this的run方法也就是实现轮询线程核心的方法 //因此此行代码相当于将轮询的工作转移给了其他线程 else if (_tryExecutor.tryExecute(this)) { _pending = true; //由于轮询工作的转移 //因此当前轮询工作相当于陷入空闲状态 //所以需要将此对象的状态至为IDLE //(轮询线程和当前线程使用同一个对象) _state = State.IDLE; mode = Mode.EXECUTE_PRODUCE_CONSUME; }else { //前两者均不满足则将任务提交到线程池 mode = Mode.PRODUCE_EXECUTE_CONSUME; } } break; 复制代码
case EXECUTE_PRODUCE_CONSUME: _epcMode.increment(); //直接在当前线程调用 runTask(task); // 尝试夺回轮询任务 synchronized (this) { // 如果State还处于空闲状态 // 说明 // 线程B还未开始执行轮询任务,可以直接夺回 // 如果线程B已经开始轮询 // 则选择离开 if (_state == State.IDLE) { // 返回true则继续轮询 return true; } } //返回false则结束轮询任务,变为空闲线程 return false; 复制代码
相较于循规蹈矩的Tomcat,Jetty的设计更为激进,更富有冒险主义者的精神,从个人角度来说更喜欢Jetty的设计,但从业务的角度来说还是选择Tomcat较为稳妥毕竟稳定是业务的基本需求,并且Tomcat的性能也不会太差。
以线程的类别来进行划分的话, Jetty的NIO模型如下图所示
Acceptor
线程负责接收来自客户端的新连接,并将其封装成一个事件提交给轮询线程处理 轮询线程
轮询线程处理负责轮询I/O事件之外,还需要处理外部线程所提交的 selector
更新任务,并且根据设定的执行策略,轮询线程可能会在本线程直接执行I/O任务,并将轮询任务移交给其他空闲的线程,或者选择一个空闲的线程来执行I/O操作 I/O线程
主要负责处理I/O操作 从线程类别的角度来看Jetty的NIO模型相对简单,但其引入的轮询线程执行策略使得线程之间身份可以发生转变, 得益于此Jetty可以直接轮询线程直接执行I/O任务减少了线程上下文切换所带来的性能消耗,提升了性能。
切换线程是有成本的Jetty通过直接在轮询线程执行I/O任务来提升性能,来减少线程上下文的切换,除此之外,我们还可以实现协程的机制来减少线程上下文切换所带来的成本(参考Go语言)
Acceptor线程应适量如果将ServerSocket设置为阻塞模式,那么accept操作将导致线程陷入阻塞,从accept方法返回时将引起线程上下的切换,因此并不是越多越好
我们使用SpringBoot来Debug Jetty,因此我们需要在 pom.xml
中引入Jetty,由于SpringBoot默认使用Tomcat因此我们需要将其替换掉,依赖如下所示.
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jetty</artifactId> </dependency> 复制代码
使用的SpringBoot版本是2.2.0其所依赖的Jetty版本号是9.4.20
如果你要了解Connector是如何工作的请关注以下类 org.eclipse.jetty.server.ServerConnector
如果你想要了解Jetty NIO 如何轮询以及处理事件,那么请关注以下类 org.eclipse.jetty.io.ManagedSelector
并在其内部类 SelectorProducer
的 produce
方法打上断点,如下图所示,你将了解到整个轮询过程中都发生了什么
右键小红点,选择Thread,以避免进入不了断点的情况,毕竟我们调试的是多线程程序
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill