线程隔离主要是针对业务中不同业务场景,按照权重区分使用不同的线程池,以达到某一个业务出现问题,不会将故障扩散到其他的业务线程池,从而达到保证主要业务高可用。
本案例主要讲解基于servlet3的线程隔离术。
首先我们回忆一下在tomcat6,tomcat6只支持BIO,它的处理流程如下:
1).tomcat负责接收servletRequest请求
2).将接收的请求分配给servlet处理业务
3).处理完请求之后,通过servletResponse写会数据
上面这三步都是在一个线程里面完成的,也就是同步进行。
如下图:
tomcat7之后版本引入了servlet3,它基于NIO能处理更大的并发数。
我们可以将整个请求改造成如下步骤:
1).tomcat单线程进行请求解析
2).解析完之后将task放入队列(可以根据不同业务类型放入不同的队列)
3).每个队列指定相应业务线程池对task进行处理
这样改造以后就可以把业务按照重要性发送到不同线程池,两个线程池分开独立配置,互不干扰。当非核心的业务出现问题之后,不会影响核心的业务。另外由于此线程池是有我们创建的,我们可以对该线程池进行监控,处理,灵活了很多。
如下图:
下面是实现代码:
@RestController @RequestMapping("/app") public class NIOCtrl { @Autowired private LocalNewsAsyncContext localNewsAsyncContext; @Autowired private NewsService newsService; @RequestMapping("/news") public void getNews(HttpServletRequest request,@RequestParam(value = "type",required = false) String type){ if("1".equals(type)){ localNewsAsyncContext.submit(request, () -> newsService.getNews()); return; } localNewsAsyncContext.submit(request, () -> newsService.getNewsMap()); } }
@Service public class LocalNewsAsyncContext { private final static Long timeOutSeconds= 5L; @Autowired private CustomAsyncListener asyncListener; @Autowired private ThreadPoolExecutor executor; public void submit(final HttpServletRequest request,final Callable<Object> task){ final String uri= request.getRequestURI(); final Map<String,String[]> params= request.getParameterMap(); //开启异步上下文 final AsyncContext asyncContext= request.startAsync(); asyncContext.getRequest().setAttribute(Constant.URI,uri); asyncContext.getRequest().setAttribute(Constant.PARAMS, params); asyncContext.setTimeout(timeOutSeconds * 1000); if(asyncContext!=null){ asyncContext.addListener(asyncListener); } executor.submit(new CustomCallable(asyncContext, task)); } }
public class CustomCallable implements Callable{ private static final Logger LOG = LoggerFactory.getLogger(CustomCallable.class); public AsyncContext asyncContext; private Callable<Object> task; private String uri; private Map<String,String[]> params; public CustomCallable(AsyncContext asyncContext, Callable<Object> task){ this.asyncContext= asyncContext; this.task= task; this.uri= (String) asyncContext.getRequest().getAttribute(Constant.URI); this.params= (Map<String, String[]>) asyncContext.getRequest().getAttribute(Constant.PARAMS); } @Override public Object call() throws Exception { Object o= task.call(); if(o==null){ callback(asyncContext,o); }else if(o instanceof String){ callback(asyncContext, o); }else if(o instanceof CompletableFuture){ CompletableFuture<Object> future= (CompletableFuture<Object>) o; future.thenAccept(o1 -> callback(asyncContext, o1)) .exceptionally(throwable -> { callback(asyncContext,""); return null; }); }else { callback(asyncContext, o); } return null; } private void callback(AsyncContext asyncContext,Object result){ HttpServletResponse response= (HttpServletResponse) asyncContext.getResponse(); try{ if(result instanceof String){ write(response, (String) result); }else { write(response, JSON.toJSONString(result)); } }catch (Exception e){ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); e.printStackTrace(); try { LOG.error("get info error for uri:{}, params:{}",uri,JSON.toJSONString(params),e); }catch (Exception e1){} }finally { asyncContext.complete(); } } private void write(HttpServletResponse response,String result) throws IOException { response.getOutputStream().write(result.getBytes()); } }
@Configuration public class LocalNewsPoolConfig { private final static Logger LOG= LoggerFactory.getLogger(LocalNewsPoolConfig.class); @Bean public ThreadPoolExecutor init(){ int corePoolSize= 10; int maximumPoolSize= 100; int queueCapacity= 200; LinkedBlockingDeque<Runnable> queue= new LinkedBlockingDeque<>(queueCapacity); ThreadPoolExecutor executor= new ThreadPoolExecutor(corePoolSize,maximumPoolSize,60L, TimeUnit.SECONDS,queue); executor.allowCoreThreadTimeOut(true); executor.setRejectedExecutionHandler((r, executor1) -> { if(r instanceof CustomCallable){ CustomCallable call= (CustomCallable) r; AsyncContext asyncContext= call.asyncContext; if(asyncContext!=null){ handler(asyncContext); } } }); return executor; } private static void handler(AsyncContext asyncContext){ try{ ServletRequest req= asyncContext.getRequest(); String uri= (String) req.getAttribute(Constant.URI); Map params= (Map) req.getAttribute(Constant.PARAMS); LOG.error("async req rejected. uri :{},params:{}",uri, JSON.toJSONString(params)); }catch (Exception e){ e.printStackTrace(); try{ HttpServletResponse response= (HttpServletResponse) asyncContext.getResponse(); response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); }catch (Exception e1){} }finally { asyncContext.complete(); } } }
@Component public class CustomAsyncListener implements AsyncListener { private Logger LOG= LoggerFactory.getLogger(CustomAsyncListener.class); @Override public void onComplete(AsyncEvent asyncEvent) throws IOException { } @Override public void onTimeout(AsyncEvent asyncEvent) throws IOException { AsyncContext asyncContext= asyncEvent.getAsyncContext(); try{ ServletRequest req= asyncContext.getRequest(); String uri= (String) req.getAttribute(Constant.URI); Map params= (Map) req.getAttribute(Constant.PARAMS); LOG.error("async req timeOut. uri :{},params:{}",uri, JSON.toJSONString(params)); }catch (Exception e){ e.printStackTrace(); }finally { try{ HttpServletResponse response= (HttpServletResponse) asyncContext.getResponse(); response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT); }catch (Exception e1){} asyncContext.complete(); } } @Override public void onError(AsyncEvent asyncEvent) throws IOException { AsyncContext asyncContext= asyncEvent.getAsyncContext(); try{ ServletRequest req= asyncContext.getRequest(); String uri= (String) req.getAttribute(Constant.URI); Map params= (Map) req.getAttribute(Constant.PARAMS); LOG.error("async req error. uri :{},params:{}",uri, JSON.toJSONString(params)); }catch (Exception e){ e.printStackTrace(); try{ HttpServletResponse response= (HttpServletResponse) asyncContext.getResponse(); response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); }catch (Exception e1){} }finally { asyncContext.complete(); } } @Override public void onStartAsync(AsyncEvent asyncEvent) throws IOException { } }
@Service public class NewsService { public String getNews(){ return "servlet3 nio test."; } public StringBuilder getNewsMap(){ return new StringBuilder("I do and i understand."); } } public class Constant { public static final String URI= "uri"; public static final String PARAMS= "params"; }
来源:https://www.jianshu.com/p/0529c126e166?utm_campaign