转载

Servlet3 异步原理与实践

*

hello 各位小伙伴,不知道大家端午节在忙啥呢?有没有出行计划呢?松哥最近在研究 Spring Security 源码,不由得感叹 Spring Security 真的上手容易,要理解真的不易。松哥后面专门写篇文章和大家聊这个问题。这两天在看 Spring Security 源码的时候,涉及到一点异步 Servlet 的问题,本来想着自己写篇文章和大家扫盲,后来在网上看到有一篇文章写的还不错,于是今天就来和大家分享一下,也算是给后面 Spring Security 的介绍做一个铺垫(其实也是我端午想偷个懒(手动狗头),原文作者王新栋,文章地址:jianshu.com/p/c23ca9d26f64。

对了,如果大家假期没有出行计划,也不妨看看这个: SpringBoot+SpringSecurity+Vue+微人事视频教程

一、什么是 Servlet

servlet 是基于 Java 的 Web 组件, 由容器进行管理, 来生成动态内容。像其他基于 Java 的组件技术一样, servlet 也是基于平台无关的 Java 类格式, 被编译为平台无关的字节码, 可以被基于 Java 技术的 Web 服务器动态加载并运行。容器(Container), 有时候也叫做 servlet 引擎, 是 Web 服务器为支持 servlet 功能扩展的部分。客户端通过 servlet 容器实现的 request/response paradigm(请求/应答模式) 与 Servlet 进行交互。

二、什么是Servlet规范

每当一个 Servlet 版本发布都会对应一个 Servlet 版本的规范, 比如 Servlet2.5、Servlet3.0、Servlet3.1.

规范中描述了Java Servlet API 的标准, 定义了 Java Servlet API 中类、接口、方法签名的完整规范且附带的 Javadoc 文档供开发人员查阅, 目的主要是为 Java Servlet 给出一个完整和清晰的解释。从下图可以看出 Servlet 规范版本和 tomcat 支持的版本的对应关系。比如 Servlet3 是从 tomcat7 以后开始支持的。

Servlet3 异步原理与实践

三、 同步, 异步, 阻塞,非阻塞

同步异步是数据通信的方式, 阻塞和非阻塞是一种状态。比如同步这种数据通讯方式里面可以有阻塞状态也可以有非阻塞状态。从另外一个角度理解同步和异步, 就是如果一个线程干完的事情都是同步, 有线程切换才能干完的事情就是异步。

四、Servlet3 的异步位置

这里说的位置是指, 从 tomcat 处理整个 request 请求流程中, 异步处于哪一步。我们先梳理出在 NIO 模式下(是否使用 NIO 跟异步没有直接关系, 这里是拿 NIO 模式下的 tomcat 流程做说明), 下面这个图是 tomcat 的总体结构, 里面用箭头标明了请求线路。

Servlet3 异步原理与实践

我们知道在 tomcat 的组件中 Connector 和 Engine 是最核心的两个组件, Servlet3 的异步处理就是发生在 Connector 中。Tomcat 的组件之间的协作关系, 后续会单独写一篇文章介绍。这里先有一个直观的认识。便与后续对异步理解。

五、Servlet3 的异步流程

Servlet3 异步原理与实践

接收到 request 请求之后, 由 tomcat 工作线程从 HttpServletRequest 中获得一个异步上下文 AsyncContext 对象, 然后由 tomcat 工作线程把 AsyncContext 对象传递给业务处理线程, 同时 tomcat 工作线程归还到工作线程池, 这一步就是异步开始。在业务处理线程中完成业务逻辑的处理, 生成 response 返回给客户端。在 Servlet3.0 中虽然处理请求可以实现异步, 但是 InputStream 和 OutputStream 的 IO 操作还是阻塞的, 当数据量大的 request body 或者  response body 的时候, 就会导致不必要的等待。从 Servlet3.1 以后增加了非阻塞 IO, 需要 tomcat8.x 支持。

六、Servlet3 的异步使用步骤

我们使用的大致步骤如下:

  1. 声明 Servlet, 增加 asyncSupported 属性, 开启异步支持。@WebServlet(urlPatterns = "/AsyncLongRunningServlet", asyncSupported = true)

  2. 通过 request 获取异步上下文 AsyncContext。AsyncContext asyncCtx = request.startAsync();

  3. 开启业务逻辑处理线程, 并将 AsyncContext 传递给业务线程。executor.execute(new AsyncRequestProcessor(asyncCtx, secs));

  4. 在异步业务逻辑处理线程中, 通过 asyncContext 获取 request 和 response, 处理对应的业务。

  5. 业务逻辑处理线程处理完成逻辑之后, 调用 AsyncContext 的 complete 方法。asyncContext.complete();从而结束该次异步线程处理。

七、Servlet3 的异步使用示例

7.1、AsyncLongRunningServlet.java 处理 Servlet 请求,并开启异步

package com.test.servlet3;

import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Created by wangxindong on 2017/10/19.
 */
@WebServlet(urlPatterns = "/AsyncLongRunningServlet", asyncSupported = true)
public class AsyncLongRunningServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    protected void doGet(HttpServletRequest request,
                         HttpServletResponse response) throws ServletException, IOException {
        long startTime = System.currentTimeMillis();
        System.out.println("AsyncLongRunningServlet Start::Name="
                + Thread.currentThread().getName() + "::ID="
                + Thread.currentThread().getId());

        request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);

        String time = request.getParameter("time");
        int secs = Integer.valueOf(time);
        // max 10 seconds
        if (secs > 10000)
            secs = 10000;

        AsyncContext asyncCtx = request.startAsync();
        asyncCtx.addListener(new AppAsyncListener());
        asyncCtx.setTimeout(9000);//异步servlet的超时时间,异步Servlet有对应的超时时间,如果在指定的时间内没有执行完操作,response依然会走原来Servlet的结束逻辑,后续的异步操作执行完再写回的时候,可能会遇到异常。

        ThreadPoolExecutor executor = (ThreadPoolExecutor) request
                .getServletContext().getAttribute("executor");

        executor.execute(new AsyncRequestProcessor(asyncCtx, secs));
        long endTime = System.currentTimeMillis();
        System.out.println("AsyncLongRunningServlet End::Name="
                + Thread.currentThread().getName() + "::ID="
                + Thread.currentThread().getId() + "::Time Taken="
                + (endTime - startTime) + " ms.");
    }
}

7.2、AppAsyncListener.java 异步监听器

package com.test.servlet3;

import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebListener;
import java.io.IOException;
import java.io.PrintWriter;

/**
 * Created by wangxindong on 2017/10/19.
 */
@WebListener
public class AppAsyncListener implements AsyncListener {
    @Override
    public void onComplete(AsyncEvent asyncEvent) throws IOException {
        System.out.println("AppAsyncListener onComplete");
        // we can do resource cleanup activity here
    }

    @Override
    public void onError(AsyncEvent asyncEvent) throws IOException {
        System.out.println("AppAsyncListener onError");
        //we can return error response to client
    }

    @Override
    public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
        System.out.println("AppAsyncListener onStartAsync");
        //we can log the event here
    }

    @Override
    public void onTimeout(AsyncEvent asyncEvent) throws IOException {
        System.out.println("AppAsyncListener onTimeout");
        //we can send appropriate response to client
        ServletResponse response = asyncEvent.getAsyncContext().getResponse();
        PrintWriter out = response.getWriter();
        out.write("TimeOut Error in Processing");
    }
}

7.3、AppContextListener.java Servlet 上下文监听器,可以在里面初始化业务线程池

package com.test.servlet3;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by wangxindong on 2017/10/19.
 * 在监听中初始化线程池
 */
@WebListener
public class AppContextListener implements ServletContextListener {
    public void contextInitialized(ServletContextEvent servletContextEvent) {

        // create the thread pool
        ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 200, 50000L,
                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100));
        servletContextEvent.getServletContext().setAttribute("executor",
                executor);

    }

    public void contextDestroyed(ServletContextEvent servletContextEvent) {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) servletContextEvent
                .getServletContext().getAttribute("executor");
        executor.shutdown();
    }
}

7.4、AsyncRequestProcessor.java 业务工作线程

package com.test.servlet3;

import javax.servlet.AsyncContext;
import java.io.IOException;
import java.io.PrintWriter;

/**
 * Created by wangxindong on 2017/10/19.
 * 业务工作线程
 */
public class AsyncRequestProcessor implements Runnable {
    private AsyncContext asyncContext;
    private int secs;

    public AsyncRequestProcessor() {
    }

    public AsyncRequestProcessor(AsyncContext asyncCtx, int secs) {
        this.asyncContext = asyncCtx;
        this.secs = secs;
    }

    @Override
    public void run() {
        System.out.println("Async Supported? "
                + asyncContext.getRequest().isAsyncSupported());
        longProcessing(secs);
        try {
            PrintWriter out = asyncContext.getResponse().getWriter();
            out.write("Processing done for " + secs + " milliseconds!!");
        } catch (IOException e) {
            e.printStackTrace();
        }
        //complete the processing
        asyncContext.complete();
    }

    private void longProcessing(int secs) {
        // wait for given time before finishing
        try {
            Thread.sleep(secs);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

八、Tomcat NIO Connector ,Servlet 3.0 Async,Spring MVC Async 的关系

对于这几个概念往往会混淆, 这里做一个梳理比较, nio 是一种 IO 的模型, 对比与传统的 BIO, 它可以利用较少的线程处理更多的连接从而增加机器的吞吐量, Tomcat NIO Connector 是 Tomcat 的一种 NIO 连接模式。异步, 前面提到他是一种通讯的方式, 它跟 NIO 没有任务关系, 及时没有 NIO 也可以实现异步, Servlet 3.0 Async 是指 Servlet 3 规范以后支持了异步处理 Servlet 请求, 我们可以把请求线程和业务线程分开。Spring MVC Async 是在 Servlet3 异步的基础上做了一层封装。具体的区别如下:

8.1、Tomcat NIO Connector

Tomcat 的 Connector 有三种模式, BIO,NIO,APR,Tomcat NIO Connector 是其中的 NIO 模式, 使得 tomcat 容器可以用较少的线程处理大量的连接请求, 不再是传统的一请求一线程模式。Tomcat 的 server.xml 配置 protocol="org.apache.coyote.http11.Http11NioProtocol", Http11NioProtocol 从 tomcat 6.x 开始支持。NIO 的细节可以参看 NIO 相关技术文章。

8.2、Servlet 3.0 Async

是说 Servlet 3.0 支持了业务请求的异步处理, Servlet3 之前一个请求的处理流程, 请求解析、READ BODY,RESPONSE BODY,以及其中的业务逻辑处理都由 Tomcat 线程池中的一个线程进行处理的。那么 3.0 以后我们可以让请求线程(IO 线程)和业务处理线程分开, 进而对业务进行线程池隔离。我们还可以根据业务重要性进行业务分级, 然后再把线程池分级。还可以根据这些分级做其它操作比如监控和降级处理。servlet 3.0 从 tomcat 7.x 开始支持。

8.3、Spring MVC Async

是 Spring MVC 3.2 以上版本基于 Servlet 3 的基础做的封装, 原理及实现方式同上, 使用方式如下:

@Controller
@RequestMapping("/async/TestController")
public class TestController {
    @ResponseBody
    @RequestMapping("/{testUrl}")
    public DeferredResult<ResponseEntity<String>> testProcess(@PathVariable String testUrl) {
        final DeferredResult<ResponseEntity<String>> deferredResult = new DeferredResult<ResponseEntity<String>>();

        // 业务逻辑异步处理,将处理结果 set 到 DeferredResult
        new Thread(new AsyncTask(deferredResult)).start();

        return deferredResult;
    }

    private static class AsyncTask implements Runnable {

        private DeferredResult result;

        private AsyncTask(DeferredResult result) {
            this.result = result;
        }

        @Override
        public void run() {
            //业务逻辑START
            //...
            //业务逻辑END
            result.setResult(result);
        }
    }
}

九、Servlet3 非阻塞 IO

Servlet3.1 以后增加了非阻塞 IO 实现, 需要 Tomcat8.x 以上支持。根据 Servlet3.1 规范中的描述”非阻塞 IO 仅对在 Servlet 中的异步处理请求有效, 否则, 当调用 ServletInputStream.setReadListener 或ServletOutputStream.setWriteListener 方法时将抛出 IllegalStateException“。可以说 Servlet3 的非阻塞 IO 是对 Servlet3 异步的增强。Servlet3 的非阻塞是利用 java.util.EventListener 的事件驱动机制来实现的。

9.1、AsyncLongRunningServlet.java 接收请求,获取读取请求监听器 ReadListener

package com.test.servlet3Noblock;

import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;

/**
 * Created by wangxindong on 2017/10/23.
 */
@WebServlet(urlPatterns = "/AsyncLongRunningServlet2", asyncSupported = true)
public class AsyncLongRunningServlet extends HttpServlet {
    protected void doGet(HttpServletRequest request,
                         HttpServletResponse response) throws ServletException, IOException {
        request.setCharacterEncoding("UTF-8");
        response.setContentType("text/html;charset=UTF-8");

        AsyncContext actx = request.startAsync();//通过request获得AsyncContent对象

        actx.setTimeout(30*3000);//设置异步调用超时时长

        ServletInputStream in = request.getInputStream();
        //异步读取(实现了非阻塞式读取)
        in.setReadListener(new MyReadListener(in,actx));
        //直接输出到页面的内容(不等异步完成就直接给页面)
        PrintWriter out = response.getWriter();
        out.println("<h1>直接返回页面,](https://mp.weixin.qq.com/s/8Z8vYBkDtwYIeKgQ_d6WHA)[不等异步处理结果了</h1>");
        out.flush();
    }

}

9.2、MyReadListener.java 异步处理

package com.test.servlet3Noblock;

import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import java.io.IOException;
import java.io.PrintWriter;

/**
 * Created by wangxindong on 2017/10/23.
 */
public class MyReadListener implements ReadListener {
    private ServletInputStream inputStream;
    private AsyncContext asyncContext;
    public MyReadListener(ServletInputStream input,AsyncContext context){
        this.inputStream = input;
        this.asyncContext = context;
    }
    //数据可用时触发执行
    @Override
    public void onDataAvailable() throws IOException {
        System.out.println("数据可用时触发执行");
    }

    //数据读完时触发调用
    @Override
    public void onAllDataRead() throws IOException {
        try {
            Thread.sleep(3000);//暂停5秒
            PrintWriter out = asyncContext.getResponse().getWriter();
            out.write("数据读完了");
            out.flush();
            System.out.println("数据读完了");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    //数据出错触发调用
    @Override
    public void onError(Throwable t){
        System.out.println("数据 出错");
        t.printStackTrace();
    }
}

Servlet3.1 的非阻塞 IO 从下面图中可以看出是面对 InputStream 和 OutPutStream 流的, 这里的非阻塞 IO 跟我们常说的 JDK NIO 不是一个概念, Servlet3.1 的非阻塞是同 jdk 的事件驱动机制来实现。

public interface ReadListener extends java.util.EventListener
Servlet3 异步原理与实践

十、总结

通讯模型中的 NIO 可以利用很少的线程处理大量的连接, 提高了机器的吞吐量。Servlet 的异步处理机制使得我们可以将请求异步到独立的业务线程去执行, 使得我们能够将请求线程和业务线程分离。通讯模型的 NIO 跟 Servlet3 的异步没有直接关系。但是我们将两种技术同时使用就更增加了以 tomcat 为容器的系统的处理能力。自从 Servlet3.1 以后增加了非阻塞的 IO, 这里的非阻塞 IO 是面向 inputstream 和 outputstream 流, 通过 jdk 的事件驱动模型来实现, 更一步增强了 Servlet 异步的高性能, 可以认为是一种增强版的异步机制。

今日干货

Servlet3 异步原理与实践

刚刚发表

查看: 13500 回复: 135

公众号后台回复 2TB,免费获取 2TB Java 学习资料。

原文  http://mp.weixin.qq.com/s?__biz=MzI1NDY0MTkzNQ==&mid=2247489147&idx=1&sn=181a9e43818d50518991df6be33c01ff
正文到此结束
Loading...