在servlet3.1规范当中,已经支持在servlet中通过调用request.startAsync来开启1个异步调用,然后在相应的业务线程里面进行一些业务操作,再通过asyncContext.complete即完成业务的整个操作。一个参考的demo如下所示:
val context = req.startAsync(); //重新设置业务超时时间 context.setTimeout(40_000); Runnable runnable = () -> { try{ //执行你的业务操作 //输出数据 context.getResponse().getWriter().println(totalMoney); //完成业务 context.complete(); } catch(Exception e) { e.printStackTrace(); } }; new Thread(runnable).start();
在上面的参考中,原来的servlet在调用完 thread.start之后,相应的逻辑即完成。相应的容器线程则已经还给线程池,此线程即可以接收其它客户端的请求并处理了. 异步servlet的目的即在于将接收请求的io线程与实际的业务执行相分开,避免过慢的业务阻塞了整个容器,而不能再接收更多的请求了.甚至可以在后端的业务池中定义一个队列,将要进行执行的业务逻辑放入队列里面慢慢执行。
由于异步Servlet的目的在于将web容器的io线程与业务线程分离,那么关键的部分即在于当servlet方法执行完之后,当context.complete时,如何重新触发相应io流的操作。本文尝试以jetty为参考,从源码角度查看其实现的原理和机制。
本文参考的jetty版本为 9.4.11.v20180605
当业务在调用request.startAsync时,将会直接创建一个 AsyncContext 对象。按照标准API的定义,相应的context会将相应的request和response封闭在内部,以供业务端使用。在jetty中,整个模型结构参考如下所示。
在上面的对象当中,主要关注的对象即为HttpChannelState,其内部关注的状态对象为 State 和 Async,两者均是内部类,其相应的枚举对象参考如下:
/** * The state of the HttpChannel,used to control the overall lifecycle. */ public enum State { IDLE, // Idle request DISPATCHED, // Request dispatched to filter/servlet THROWN, // Exception thrown while DISPATCHED ASYNC_WAIT, // Suspended and waiting ASYNC_WOKEN, // Dispatch to handle from ASYNC_WAIT ASYNC_IO, // Dispatched for async IO ASYNC_ERROR, // Async error from ASYNC_WAIT COMPLETING, // Response is completable COMPLETED, // Response is completed UPGRADED // Request upgraded the connection } /** * The state of the servlet async API. */ private enum Async { NOT_ASYNC, STARTED, // AsyncContext.startAsync() has been called DISPATCH, // AsyncContext.dispatch() has been called COMPLETE, // AsyncContext.complete() has been called EXPIRING, // AsyncContext timeout just happened EXPIRED, // AsyncContext timeout has been processed ERRORING, // An error just happened ERRORED // The error has been processed }
State用于描述在标准的request/response响应过程中,当前的执行的步骤值和到了哪个步骤。比如已经分发到业务流程中,开启异步等,即此对象更关注于request对象类的步骤.
Async类则用于描述在异步处理过程中,特定的异步执行点。比如启动异步处理,完成异步等,此对象关注于asyncContext对象的处理过程.
整个处理流程由HttpChannel来完成,整个类的处理逻辑参考方法 handle(),其会根据State以及Async的不同状态执行不同的操作.
2.1 处理servlet service方法
当处理业务时,Sate的初始状态为Idle,因为其所对应的下一步操作Action即为 DISPATCH,此Action的对应操作即为执行整个业务处理逻辑,代码参考如下:
case DISPATCH: { if (!_request.hasMetaData()) throw new IllegalStateException("state=" + _state); _request.setHandled(false); _response.getHttpOutput().reopen(); try { _request.setDispatcherType(DispatcherType.REQUEST); if (!_request.isHandled()) getServer().handle(this); }
此段逻辑中最后一段 getServer().handle(this) 即为执行整个service方法,其会按照标准的 Filter->Servlet的执行流程处理业务逻辑,即其会最终处理到serlvet的doGet/Post方法.
2.2 业务下一步判定和处理
在业务处理完成之后,此循环会通过判断 channelState对象 的状态决定如何处理下一步操作,相应的方法为 HttpChannelState#unhandle.
这里重新回到request.startAsync的处理代码,在startAsync中,会创建一个事件对象 AsyncContextEvent,并通过 HttpChannelState来启动异步处理。在此逻辑中,相应的异步状态值Async会修改为 STARTED,相应代码如下参考所示:
public void startAsync(AsyncContextEvent event) { try(Locker.Lock lock= _locker.lock()) { _async=Async.STARTED; _event=event; lastAsyncListeners=_asyncListeners; _asyncListeners=null; }
此逻辑,即在启用异步时即会触发,那么当 Servlet#service方法执行完之后,执行 HttpChannelState#unhandle 方法时,相应的处理处理逻辑即会判断async的值,并进入不同的Action逻辑,如下参考所示:
protected Action unhandle() { boolean read_interested = false; try(Locker.Lock lock= _locker.lock()) { ...... _initial=false; switch(_async) { ... case STARTED: if (_asyncWritePossible) { _state=State.ASYNC_IO; _asyncWritePossible=false; return Action.WRITE_CALLBACK; } else { _state=State.ASYNC_WAIT; Scheduler scheduler=_channel.getScheduler(); if (scheduler!=null && _timeoutMs>0 && !_event.hasTimeoutTask()) _event.setTimeoutTask(scheduler.schedule(_event,_timeoutMs,TimeUnit.MILLISECONDS)); return Action.WAIT; }
如上所示,当判定 async为STARTED之后,并且当前并没有回写响应数据,那么相应的state即转换为 ASYNC_WAIT,并且下一步的Action为 WAIT
2.3 HttpChannel针对 WAIT Action的处理
其针对WAIT的处理很简单,就是直接退出循环,即表示此针对此请求的整个业务处理已经结束,在容器处理的层面已经完结。可以理解为该request/response的整个生命周期已经结束。
不过由于异步的业务处理,相应的io信息仍然是存在了,并且在后续的业务处理中仍然是可用的,相应的channel,socket信息并没有被关闭.
在上一步提到在servlet层面,相应的处理已经结束。当调用 AsyncContext#completed 时,其实际上会重新启动相应的channel通道处理,以完结相应的业务处理,即可以理解为其会重新启动相应的HttpChannel#handle 来执行相应的处理,只不过因为状态不一样,而是执行其它的Action.
源码层面,相应的AsyncContext#completed 会委托给 HttpChannelState#complete 来处理,在处理逻辑中,相应的 async会更换为 COMPLETE, 同时,再次执行 HttpChannel#handle方法,如下参考所示:
public void complete() { boolean handle=false; AsyncContextEvent event; try(Locker.Lock lock= _locker.lock()) { _async=Async.COMPLETE; _state=State.ASYNC_WOKEN; } runInContext(event,_channel); }
HttpChannel实现了runnable,因此重新执行handle方法,在此方法中将重新根据状态值判定Action, 由 HttpChannelState#handling 的判定推断出相应的 Action为Completed. 参考如下所示:
protected Action handling() { try(Locker.Lock lock= _locker.lock()) { switch(_state) { case ASYNC_WOKEN: switch(_async) { case COMPLETE: _state=State.COMPLETING; return Action.COMPLETE; ....
Action.COMPLETE 即可以理解为整个servlet处理周期的扫尾阶段,这里在设置相应的响应头,关闭输出,设置标记位等,即完成最终的请求处理.
在调用完Request#startAsync 之后,其实容器是并不能完全保证业务代码一定会调用 AsyncContext#completed 这个方法,为保证channel的socket能够被正常处理,避免无限度持有连接信息,因此应该有一个保底的处理。即当业务线程并没有在指定的时间处理完毕时,容器应该主动发起连接的中断处理,以及时断开连接,保证容器的稳定性。
在API定义上,AsyncContext可以通过调用 setTimeout(long) 来设置业务流程的超时时间,其默认值为30_000。即意味着,容器会在完成servlet#service方法之后的 timeout时间后触发相应的超时处理。
在源码上,此逻辑为当 HttpChannel#handle 执行完servlet#service方法之后的判定逻辑中处理,即 参考 2#1 部分所示,当 HttpChannelState判定当前 async为 ASYNC_WAIT,下一步操作为 Action#WAIT时,即会开启一个异步任务,执行到期超时处理。如下参考所示:
_state=State.ASYNC_WAIT; Scheduler scheduler=_channel.getScheduler(); if (scheduler!=null && _timeoutMs>0 && !_event.hasTimeoutTask()) _event.setTimeoutTask(scheduler.schedule(_event,_timeoutMs,TimeUnit.MILLISECONDS)); return Action.WAIT;
这里的timeout即为 AsyncContextState#setTimeout 设置的值,其最终委托给 HttpChannelState来完成。如果此值为业务代码设置,即最终以业务设置为准。此任务对象即 AsyncContextEvent 对象,其最终的超时处理会委托给 HttpChannelState#onTimeout 来处理,其最终逻辑参考如下:
protected void onTimeout() { final List<AsyncListener> listeners; AsyncContextEvent event; try(Locker.Lock lock= _locker.lock()) { if (_async!=Async.STARTED) return; _async=Async.EXPIRING; event=_event; listeners=_asyncListeners; } Throwable th=error.get(); if (th!=null) { if (LOG.isDebugEnabled()) LOG.debug("Error after async timeout {}",this,th); onError(th); } }
而最终的逻辑即往客户端输出相应的内部服务错误信息.
当然,如果业务正常的完成时,此timeoutTask也会正常的取消,其相应的逻辑散落在相应的 HttpChannelState 各个处理方法中,即会通过调用 cancelTimeout() 来取消相应的任务.
在jetty的整个处理逻辑中,即是围绕着 channel通道对象, 状态对象,以及相应的上下文对象,来完成相应的状态和逻辑的转换操作,最终完成业务的处理。只不过在中间处理过程中,由于异步的概念会影响到流程的处理,需要将流程挂起;在完成之后再重新启用。整个逻辑还是很清晰的,对于了解之前不处理异步servlet体系的容器到支持async概念的容器的实现和设计思路,还是很有参考价值的。同时,在考虑一些极端问题时,也可以做到面面俱到,避免其它问题的产生.