转载

曹工杂谈:Spring boot应用,自己动手用Netty替换底层Tomcat容器

前言

问:标题说的什么意思?

答:简单说,一个spring boot应用(我这里,版本升到2.1.7.Release了,没什么问题),默认使用了tomcat作为底层容器来接收和处理连接。 我这里,在依赖中排除了tomcat,使用Netty作为了替代品。优势在于,启动飞快,线程数量完全可控(多少个netty的boss、worker线程,多少个业务线程),如果能优化得好,效率会很高(我这个还有很多优化空间,见文末总结)

流程图如下(中间的三个handler是自定义的):

曹工杂谈:Spring boot应用,自己动手用Netty替换底层Tomcat容器

这个东西,年初我就弄出来了,然后用在了某个我负责的微服务里,之前一直想写,但是一直没把demo代码从微服务里抽出来,然后就一直拖着。前一阵吧,把代码抽出来了,然后又觉得要优化下,不然有些低级问题怎么办?

前一阵抽了代码出来,然后想着优化下,结果忙起来搞忘了,而且优化无底洞啊,所以先不优化了,略微补了些注释,就发上来了,希望大家看到后,多多批评指正。

先附上代码地址: https://gitee.com/ckl111/Netty_Spring_MVC_Sample/

启动后,访问: http://localhost:8081/test.do 即可。

实现大体思路

  1. 排除掉 tomcat 依赖
  2. 解决掉报错,保证 spring mvc 的上下文正常启动
  3. 启动 netty 容器,最后一个 handler 负责将 servlet request 交给 dispatcherServlet 处理

具体实现

解决dispatcherServlet不能正常工作的问题

问题1:缺少 servletContext 报错

曹工杂谈:Spring boot应用,自己动手用Netty替换底层Tomcat容器

经过追踪发现,这个 servletContext 来源于: org.springframework.web.context.support.GenericWebApplicationContext 中的 servletContext 字段

解决办法:

META-INF/spring.factories 中,定义了一个listener,来参与spring boot启动时的生命周期:

org.springframework.boot.SpringApplicationRunListener=com.ceiec.router.config.MyListener

在我的自定义listener中,实现 org.springframework.boot.SpringApplicationRunListener ,然后重写如下方法:

package com.ceiec.router.config;

import com.ceiec.router.config.servletconfig.MyServletContext;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringApplicationRunListener;
import org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.core.env.MutablePropertySources;
import org.springframework.core.env.PropertySource;

import javax.servlet.ServletContext;
import java.util.Map;

@Data
@Slf4j
public class MyListener implements SpringApplicationRunListener {


    public MyListener(SpringApplication application, String[] args) {
        super();
    }
    ...
    
    @Override
    public void contextPrepared(ConfigurableApplicationContext context) {
        // 这里手动new一个servletContext,然后设置给spring上下文
        ServletContext servletContext = new MyServletContext();
        ServletWebServerApplicationContext applicationContext = (ServletWebServerApplicationContext) context;
        applicationContext.setServletContext(servletContext);
    }
  
  ...

}

自定义实现了 com.ceiec.router.config.servletconfig.MyServletContext ,这个很简单,继承spring test包中的 org.springframework.mock.web.MockServletContext 即可。

package com.ceiec.router.config.servletconfig;

import org.springframework.mock.web.MockServletContext;

import javax.servlet.Filter;
import javax.servlet.FilterRegistration;
import javax.servlet.Servlet;
import javax.servlet.ServletRegistration;

public class MyServletContext extends MockServletContext{

    @Override
    public ServletRegistration.Dynamic addServlet(String servletName, Servlet servlet) {
        return null;
    }

    @Override
    public FilterRegistration.Dynamic addFilter(String filterName, Filter filter){
        return null;
    }
}

问题2:

暂时没有。之前的版本本来有一个问题,升到spring boot 2.1.7后,好像不需要了,先不管。

问题3:

怎么保证少了tomcat后, dispatcherServlet 还能用?准确地说, dispatcherServlet 这个东西和tomcat是两回事,以前写struts 2的时候,也没 dispatcherServlet 这个类,不是吗?

所以,在spring boot启动时,并不强依赖底层容器, dispatcherServlet 这个bean会自动装配,装配代码在

org.springframework.boot.autoconfigure.web.servlet.DispatcherServletAutoConfiguration.DispatcherServletConfiguration

@Configuration
    @Conditional(DefaultDispatcherServletCondition.class)
    @ConditionalOnClass(ServletRegistration.class)
    @EnableConfigurationProperties({ HttpProperties.class, WebMvcProperties.class })
    protected static class DispatcherServletConfiguration {

        private final HttpProperties httpProperties;

        private final WebMvcProperties webMvcProperties;

        //这里自动装配DispatcherServlet
        @Bean(name = DEFAULT_DISPATCHER_SERVLET_BEAN_NAME)
        public DispatcherServlet dispatcherServlet() {
            DispatcherServlet dispatcherServlet = new DispatcherServlet();
            dispatcherServlet.setDispatchOptionsRequest(
                                this.webMvcProperties.isDispatchOptionsRequest());
            dispatcherServlet.setDispatchTraceRequest(
              this.webMvcProperties.isDispatchTraceRequest());
            return dispatcherServlet;
        }

问题4:

自动装配 DispatcherServlet 后,处理请求时报错:

曹工杂谈:Spring boot应用,自己动手用Netty替换底层Tomcat容器

解决方式是,启动完成后,给 dispatcherServlet 设置这个field的值,同时,初始化我们的servlet(这里提一句,还记得 servlet 的生命周期吗,就是那个东西):

import org.springframework.mock.web.MockServletConfig;
/**
 * 从spring上下文获取 DispatcherServlet,设置其字段config为mockServletConfig
 */
DispatcherServlet dispatcherServlet = applicationContext.getBean(DispatcherServlet.class);
MockServletConfig myServletConfig = new MockServletConfig();
MyReflectionUtils.setFieldValue(dispatcherServlet,"config",myServletConfig);

/**
 * 初始化servlet
 */
try {
  dispatcherServlet.init();
} catch (ServletException e) {
  log.error("e:{}",e);
}

netty处理过程

大致流程

这里,我们再将总共流程图贴一下:

曹工杂谈:Spring boot应用,自己动手用Netty替换底层Tomcat容器

中间的三个handler,是我们自定义的。每个handler具体做的事情,写得比较清楚了。具体看下面的 com.ceiec.router.netty.DispatcherServletChannelInitializer :

public class DispatcherServletChannelInitializer extends ChannelInitializer<SocketChannel> {

    //可以使用单独的线程池,来处理业务请求
    private static DefaultEventLoopGroup eventExecutors = new DefaultEventLoopGroup(4,new NamedThreadFactory("business_servlet"));

    @Override
    public void initChannel(SocketChannel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();

        // 对通信数据进行编解码
        pipeline.addLast(new HttpServerCodec());

        // 把多个HTTP请求中的数据组装成一个
        pipeline.addLast(new HttpObjectAggregator(65536));

        // 用于处理大的数据流
        pipeline.addLast(new ChunkedWriteHandler());

        /**
         * 生成servlet使用的request
         */
        pipeline.addLast("GenerateServletRequestHandler", new GenerateServletRequestHandler());

        /**
         * 过滤器处理器,模拟servlet中的 filter 链
         */
        FilterNettyHandler filterNettyHandler = SpringContextUtils.getApplicationContext().getBean(FilterNettyHandler.class);
        pipeline.addLast("FilterNettyHandler", filterNettyHandler);

        /**
         * 真正的业务handler,转交给:spring mvc的dispatcherServlet 处理
         */
        DispatcherServletHandler dispatcherServletHandler = SpringContextUtils.getApplicationContext().getBean(DispatcherServletHandler.class);
        //pipeline.addLast("dispatcherServletHandler", dispatcherServletHandler);
        // 使用下面的重载方法,第一个参数为线程池,则这里会异步执行我们的业务逻辑,正常也应该这样,避免长时间阻塞io线程
        pipeline.addLast(eventExecutors,"handler", new ServletNettyHandler(dispatcherServlet));
    }


}

原始netty的http请求,转成servlet http请求

其中, GenerateServletRequestHandler 完成这部分工作,传递给下一个handler的,就是 MockHttpServletRequest 类型:

@Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
        if (!fullHttpRequest.decoderResult().isSuccess()) {
            sendError(channelHandlerContext, BAD_REQUEST);
            return;
        }

        // 设置请求的会话id
        String token = UUID.randomUUID().toString().replace("-", "");
        MDC.put(SESSION_KEY, token);

        String remoteIP = getRemoteIP(fullHttpRequest, channelHandlerContext);
        MockHttpServletRequest servletRequest = createServletRequest(fullHttpRequest);
        String s = fullHttpRequest.content().toString(CharsetUtil.UTF_8);

        log.info("{},request:{},param:{}", remoteIP, fullHttpRequest.uri(), s);
        try {
            channelHandlerContext.fireChannelRead(servletRequest);
        } finally {
            // 删除SessionId
            MDC.remove(SESSION_KEY);
        }

    }

模拟servlet filter chain对请求进行处理

这里说下,为什么要使用spring来管理它,且类型为prototype,因为:每次请求进来,都会去调用

com.ceiec.router.netty.DispatcherServletChannelInitializer#initChannel ,在那里面是如下的从spring上下文获取的方式来拿到 FilterNettyHandler 的。

@Override
    public void initChannel(SocketChannel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        ... 
        /**
         * 过滤器处理器,模拟servlet中的 filter 链
         */
        FilterNettyHandler filterNettyHandler = SpringContextUtils.getApplicationContext().getBean(FilterNettyHandler.class);
        pipeline.addLast("FilterNettyHandler", filterNettyHandler);
    }
package com.ceiec.router.netty.handler;

import com.ceiec.router.netty.DispatcherServletChannelInitializer;
import com.ceiec.router.netty.filter.ApplicationFilterChain;
import com.ceiec.router.netty.filter.ApplicationFilterFactory;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Scope;
import org.springframework.mock.web.MockHttpServletRequest;
import org.springframework.mock.web.MockHttpServletResponse;
import org.springframework.stereotype.Component;

/**
 * desc: 模拟servlet的filter链
 * netty handler链的初始化在{@link DispatcherServletChannelInitializer#initChannel(io.netty.channel.socket.SocketChannel)}
 * @author: ckl
 * creat_date: 2019/12/10 0010
 * creat_time: 10:14
 **/
@Slf4j
@Component
@Scope(scopeName = "prototype")
public class FilterNettyHandler extends SimpleChannelInboundHandler<MockHttpServletRequest> {


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MockHttpServletRequest httpServletRequest) throws Exception {
        MockHttpServletResponse httpServletResponse = new MockHttpServletResponse();
        ApplicationFilterChain filterChain = ApplicationFilterFactory.createFilterChain(ctx,httpServletRequest);
        if (filterChain == null) {
            return;
        }

        filterChain.doFilter(httpServletRequest, httpServletResponse);
    }
}

handler最后一棒:将请求交给dispatcherServlet处理

package com.ceiec.router.netty.handler;

import com.ceiec.router.netty.DispatcherServletChannelInitializer;
import com.ceiec.router.netty.filter.RequestResponseWrapper;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.stream.ChunkedStream;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.mock.web.MockHttpServletRequest;
import org.springframework.mock.web.MockHttpServletResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.DispatcherServlet;

import java.io.ByteArrayInputStream;
import java.io.InputStream;

import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

/**
 *
 * desc:
 * 请求交给,Spring的dispatcherServlet处理
 * netty handler链的初始化在{@link DispatcherServletChannelInitializer#initChannel(io.netty.channel.socket.SocketChannel)}
 * @author: caokunliang
 * creat_date: 2019/8/21 0021
 * creat_time: 15:46
 **/
@Slf4j
@Component
@Scope(scopeName = "prototype")
public class DispatcherServletHandler extends SimpleChannelInboundHandler<RequestResponseWrapper> {

    @Autowired
    private DispatcherServlet dispatcherServlet;


    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RequestResponseWrapper requestResponseWrapper) throws Exception {
        MockHttpServletRequest servletRequest = (MockHttpServletRequest) requestResponseWrapper.getServletRequest();
        MockHttpServletResponse servletResponse = (MockHttpServletResponse) requestResponseWrapper.getServletResponse();
        //这里调用dispatcherServlet的service,最终会调用controller的方法,响应流会写入到servletResponse中
        dispatcherServlet.service(servletRequest, servletResponse);

        HttpResponseStatus status = HttpResponseStatus.valueOf(servletResponse.getStatus());
        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);

        for (String name : servletResponse.getHeaderNames()) {
            response.headers().add(name, servletResponse.getHeader(name));
        }

        response.headers().add("Content-Type","application/json;charset=UTF-8");

        // Write the initial line and the header.
        channelHandlerContext.write(response);

        InputStream contentStream = new ByteArrayInputStream(servletResponse.getContentAsByteArray());

        ChunkedStream stream = new ChunkedStream(contentStream);
        ChannelFuture writeFuture = channelHandlerContext.writeAndFlush(stream);
        writeFuture.addListener(ChannelFutureListener.CLOSE);
    }


}

总结

大概就上面这些东西了,整体来说,有很多需要优化的东西。但我本身对netty的使用,只能算相对勉强,很多细节性的东西没考虑。

比如:

  1. 我这里,是很粗暴地每次请求后,关闭了连接;
  2. 请求id在从worker线程,传给dispatcherServlet的业务线程时,丢失了(主要是直接使用了netty的api,来生成线程池,难以控制);
  3. 我使用了这个技术的微服务,qps不算高,高了之后,会不会有大问题,暂时未知,需要进一步测试,但最近也忙,时间有限。
  4. channel的handler这里,现在用的prototype的bean,如果换成单例bean,在高并发下会不会有问题呢,待验证。

虽然问题很多,但是我觉得很难等到我全部完善了再分享,因为我个人能力有限( netty 功力不行,哈哈)。我能做的是,先分享,抛砖引玉,后续有时间了我也会慢慢优化。

代码地址:https://gitee.com/ckl111/Netty_Spring_MVC_Sample

原文  http://www.cnblogs.com/grey-wolf/p/12017818.html
正文到此结束
Loading...