限流场景一般基于硬件资源的使用负载,包括CPU,内存,IO。例如某个报表服务需要消耗大量内存,如果并发数增加就会拖慢整个应用,甚至内存溢出导致应用挂掉。
限流适用于会动态增加的资源,已经池化的资源不一定需要限流,例如数据库连接池,它是已经确定的资源,池的大小固定(即使可以动态伸缩池大小),这种场景下并不需要通过限流来实现,只要能做到如果池内链接已经使用完,则无法再获取新的连接则可。
因此,使用限流的前提是:
1.防止资源使用过载产生不良影响。
2.使用的资源会动态增加,例如一个站点的请求。
1.只针对Controller限流
2.根据url请求路径限流
3.可根据正则表达式匹配url来限流 4.可定义多个限流规则,每个规则的最大流量不同
1.CurrentLimiteAspect是一个拦截器,在controller执行前后执行后拦截
2.CurrentLimiter是限流器,可以添加限流规则,根据限流规则获取流量通行证,释放流量通行证;如果获取通行证失败则抛出异常。
3.LimiteRule是限流规则,限流规则可设置匹配url的正则表达式和最大流量值,同时获取该规则的流量通信证和释放流量通信证。
4.AcquireResult是获取流量通信证的结果,结果有3种:获取成功,获取失败,不需要获取。
5.Application是Spring的启动类,简单起见,在启动类种添加限流规则。
public class AcquireResult { /** 获取通行证成功 */ public static final int ACQUIRE_SUCCESS = 0; /** 获取通行证失败 */ public static final int ACQUIRE_FAILED = 1; /** 不需要获取通行证 */ public static final int ACQUIRE_NONEED = 2; /** 获取通行证结果 */ private int result; /** 可用通行证数量 */ private int availablePermits; public int getResult() { return result; } public void setResult(int result) { this.result = result; } public int getAvailablePermits() { return availablePermits; } public void setAvailablePermits(int availablePermits) { this.availablePermits = availablePermits; } } 复制代码
/** * @ClassName LimiteRule * @Description TODO * @Author 铿然一叶 * @Date 2019/10/4 20:18 * @Version 1.0 * javashizhan.com **/ public class LimiteRule { /** 信号量 */ private final Semaphore sema; /** 请求URL匹配规则 */ private final String pattern; /** 最大并发数 */ private final int maxConcurrent; public LimiteRule(String pattern, int maxConcurrent) { this.sema = new Semaphore(maxConcurrent); this.pattern = pattern; this.maxConcurrent = maxConcurrent; } /** * 获取通行证 * @param urlPath 请求Url * @return 0-获取成功,1-没有获取到通行证,2-不需要获取通行证 */ public synchronized AcquireResult tryAcquire(String urlPath) { AcquireResult acquireResult = new AcquireResult(); acquireResult.setAvailablePermits(this.sema.availablePermits()); try { //Url请求匹配规则则获取通行证 if (Pattern.matches(pattern, urlPath)) { boolean acquire = this.sema.tryAcquire(50, TimeUnit.MILLISECONDS); if (acquire) { acquireResult.setResult(AcquireResult.ACQUIRE_SUCCESS); print(urlPath); } else { acquireResult.setResult(AcquireResult.ACQUIRE_FAILED); } } else { acquireResult.setResult(AcquireResult.ACQUIRE_NONEED); } } catch (InterruptedException e) { e.printStackTrace(); } return acquireResult; } /** * 释放通行证 */ public synchronized void release() { this.sema.release(); print(null); } /** * 得到最大并发数 * @return */ public int getMaxConcurrent() { return this.maxConcurrent; } /** * 得到匹配表达式 * @return */ public String getPattern() { return this.pattern; } /** * 打印日志 * @param urlPath */ private void print(String urlPath) { StringBuffer buffer = new StringBuffer(); buffer.append("Pattern: ").append(pattern).append(", "); if (null != urlPath) { buffer.append("urlPath: ").append(urlPath).append(", "); } buffer.append("Available Permits:").append(this.sema.availablePermits()); System.out.println(buffer.toString()); } } 复制代码
/** * @ClassName CurrentLimiter * @Description TODO * @Author 铿然一叶 * @Date 2019/10/4 20:18 * @Version 1.0 * javashizhan.com **/ public class CurrentLimiter { /** 本地线程变量,存储一次请求获取到的通行证,和其他并发请求隔离开,在controller执行完后释放本次请求获得的通行证 */ private static ThreadLocal<Vector<LimiteRule>> localAcquiredLimiteRules = new ThreadLocal<Vector<LimiteRule>>(); /** 所有限流规则 */ private static Vector<LimiteRule> allLimiteRules = new Vector<LimiteRule>(); /** 私有构造器,避免实例化 */ private CurrentLimiter() {} /** * 添加限流规则,在spring启动时添加,不需要加锁,如果在运行中动态添加,需要加锁 * @param rule */ public static void addRule(LimiteRule rule) { printRule(rule); allLimiteRules.add(rule); } /** * 获取流量通信证,所有流量规则都要获取后才能通过,如果一个不能获取则抛出异常 * 多线程并发,需要加锁 * @param urlPath */ public synchronized static void tryAcquire(String urlPath) throws Exception { //有限流规则则处理 if (allLimiteRules.size() > 0) { //能获取到通行证的流量规则要保存下来,在Controller执行完后要释放 Vector<LimiteRule> acquiredLimitRules = new Vector<LimiteRule>(); for(LimiteRule rule:allLimiteRules) { //获取通行证 AcquireResult acquireResult = rule.tryAcquire(urlPath); if (acquireResult.getResult() == AcquireResult.ACQUIRE_SUCCESS) { acquiredLimitRules.add(rule); //获取到通行证的流量规则添加到本地线程变量 localAcquiredLimiteRules.set(acquiredLimitRules); } else if (acquireResult.getResult() == AcquireResult.ACQUIRE_FAILED) { //如果获取不到通行证则抛出异常 StringBuffer buffer = new StringBuffer(); buffer.append("The request [").append(urlPath).append("] exceeds maximum traffic limit, the limit is ").append(rule.getMaxConcurrent()) .append(", available permit is").append(acquireResult.getAvailablePermits()).append("."); System.out.println(buffer); throw new Exception(buffer.toString()); } else { StringBuffer buffer = new StringBuffer(); buffer.append("This path does not match the limit rule, path is [").append(urlPath) .append("], pattern is [").append(rule.getPattern()).append("]."); System.out.println(buffer.toString()); } } } } /** * 释放获取到的通行证。在controller执行完后掉调用(抛出异常也需要调用) */ public synchronized static void release() { Vector<LimiteRule> acquiredLimitRules = localAcquiredLimiteRules.get(); if (null != acquiredLimitRules && acquiredLimitRules.size() > 0) { acquiredLimitRules.forEach(rule->{ rule.release(); }); } //destory本地线程变量,避免内存泄漏 localAcquiredLimiteRules.remove(); } /** * 打印限流规则信息 * @param rule */ private static void printRule(LimiteRule rule) { StringBuffer buffer = new StringBuffer(); buffer.append("Add Limit Rule, Max Concurrent: ").append(rule.getMaxConcurrent()) .append(", Pattern: ").append(rule.getPattern()); System.out.println(buffer.toString()); } } 复制代码
/** * @ClassName CurrentLimiteAspect * @Description TODO * @Author 铿然一叶 * @Date 2019/10/4 20:15 * @Version 1.0 * javashizhan.com **/ @Aspect @Component public class CurrentLimiteAspect { /** * 拦截controller,自行修改路径 */ @Pointcut("execution(* com.javashizhan.controller..*(..))") public void controller() { } @Before("controller()") public void controller(JoinPoint point) throws Exception { HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest(); //获取通行证,urlPath的格式如:/limit CurrentLimiter.tryAcquire(request.getRequestURI()); } /** * controller执行完后调用,即使controller抛出异常这个拦截方法也会被调用 * @param joinPoint */ @After("controller()") public void after(JoinPoint joinPoint) { //释放获取到的通行证 CurrentLimiter.release(); } } 复制代码
@SpringBootApplication public class Application { public static void main(String[] args) { new SpringApplicationBuilder(Application.class).run(args); //添加限流规则 LimiteRule rule = new LimiteRule("/limit", 4); CurrentLimiter.addRule(rule); } } 复制代码
测试验证碰到的两个坑:
1.人工通过浏览器刷新请求发现controller是串行的
2.通过postman设置了并发测试也还是串行的,即便设置了并发数,如下图:
百度无果,只能自行写代码验证了,代码如下:
/** * @ClassName TestClient * @Description TODO * @Author 铿然一叶 * @Date 2019/10/5 0:51 * @Version 1.0 * javashizhan.com **/ public class CurrentLimiteTest { public static void main(String[] args) { final String limitUrlPath = "http://localhost:8080/limit"; final String noLimitUrlPath = "http://localhost:8080/nolimit"; //限流测试 test(limitUrlPath); //休眠一会,等上一批线程执行完,方便查看日志 sleep(5000); //不限流测试 test(noLimitUrlPath); } private static void test(String urlPath) { Thread[] requesters = new Thread[10]; for (int i = 0; i < requesters.length; i++) { requesters[i] = new Thread(new Requester(urlPath)); requesters[i].start(); } } private static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } } class Requester implements Runnable { private final String urlPath; private final RestTemplate restTemplate = new RestTemplate(); public Requester(String urlPath) { this.urlPath = urlPath; } @Override public void run() { String response = restTemplate.getForEntity(urlPath, String.class).getBody(); System.out.println("response: " + response); } } 复制代码
输出日志如下:
Pattern: /limit, urlPath: /limit, Available Permits:3 Pattern: /limit, urlPath: /limit, Available Permits:2 Pattern: /limit, urlPath: /limit, Available Permits:1 Pattern: /limit, urlPath: /limit, Available Permits:0 The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0. The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0. The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0. The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0. The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0. The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0. Pattern: /limit, Available Permits:1 Pattern: /limit, Available Permits:2 Pattern: /limit, Available Permits:3 Pattern: /limit, Available Permits:4 This path does not match the limit rule, path is [/nolimit] pattern is [/limit]. This path does not match the limit rule, path is [/nolimit] pattern is [/limit]. This path does not match the limit rule, path is [/nolimit] pattern is [/limit]. This path does not match the limit rule, path is [/nolimit] pattern is [/limit]. This path does not match the limit rule, path is [/nolimit] pattern is [/limit]. This path does not match the limit rule, path is [/nolimit] pattern is [/limit]. This path does not match the limit rule, path is [/nolimit] pattern is [/limit]. This path does not match the limit rule, path is [/nolimit] pattern is [/limit]. This path does not match the limit rule, path is [/nolimit] pattern is [/limit]. This path does not match the limit rule, path is [/nolimit] pattern is [/limit]. 复制代码
可以看到日志输出信息为:
1.第1个测试url最大并发为4,一次10个并发请求,有4个获取通行证后,剩余6个获取通行证失败。
2.获取到通行证的4个请求在controller执行完后释放了通行证。
3.第2个测试url没有限制并发,10个请求均执行成功。
至此,限流器验证成功。
end.
相关阅读:
Java并发编程(一)知识地图
Java并发编程(二)原子性
Java并发编程(三)可见性
Java并发编程(四)有序性
Java并发编程(五)创建线程方式概览
Java并发编程入门(六)synchronized用法
Java并发编程入门(七)轻松理解wait和notify以及使用场景
Java并发编程入门(八)线程生命周期
Java并发编程入门(九)死锁和死锁定位
Java并发编程入门(十)锁优化
Java并发编程入门(十二)生产者和消费者模式-代码模板
Java并发编程入门(十三)读写锁和缓存模板