在微服务系统中,缓存、限流、熔断是保证系统高可用的三板斧,今天我们就来聊聊限流。
限流是保障系统高可用的方式之一,当然啦也是大厂高频面试题,如果阿里的面试官问一句:“如何实现每秒钟1K个请求的限流?”,你要是分分钟给他写上几种限流方案,那岂不香哉,哈哈:smirk:! 话不多说,我来列几种常用限流实现方式。
Guava是Java领域很优秀的开源项目,包含了日常开发常用的集合、String、缓存等, 其中RateLimiter是常用限流工具。 RateLimiter是基于令牌桶算法实现的,如果每秒10个令牌,内部实现,会每100ms生产1个令牌。
引入pom依赖:
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>23.0</version> </dependency> 复制代码
代码:
public class GuavaRateLimiterTest { //比如每秒生产10个令牌,相当于每100ms生产1个令牌 private RateLimiter rateLimiter = RateLimiter.create(10); /** * 模拟执行业务方法 */ public void exeBiz() { if (rateLimiter.tryAcquire(1)) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程" + Thread.currentThread().getName() + ":执行业务逻辑"); } else { System.out.println("线程" + Thread.currentThread().getName() + ":被限流"); } } public static void main(String[] args) throws InterruptedException { GuavaRateLimiterTest limiterTest = new GuavaRateLimiterTest(); Thread.sleep(500);//等待500ms,让limiter生产一些令牌 //模拟瞬间生产100个线程请求 for (int i = 0; i < 100; i++) { new Thread(limiterTest::exeBiz).start(); } } } 复制代码
打个比方,某接口每秒允许100个请求,设置一个滑窗,窗口中有10个格子,每个格子占100ms,每100ms移动一次。滑动窗口的格子划分的越多,滑动窗口的滚动就越平滑,限流的统计就会越精确。
代码:
/** * 滑窗计数器 */ public class SliderWindowRateLimiter implements Runnable { //每秒允许的最大访问数 private final long maxVisitPerSecond; //将每秒时间划分N个块 private final int block; //每个块存储的数量 private final AtomicLong[] countPerBlock; //滑动窗口划到了哪个块儿,可以理解为滑动窗口的起始下标位置 private volatile int index; //目前总的数量 private AtomicLong allCount; /** * 构造函数 * * @param block,每秒钟划分N个窗口 * @param maxVisitPerSecond 每秒最大访问数量 */ public SliderWindowRateLimiter(int block, long maxVisitPerSecond) { this.block = block; this.maxVisitPerSecond = maxVisitPerSecond; countPerBlock = new AtomicLong[block]; for (int i = 0; i < block; i++) { countPerBlock[i] = new AtomicLong(); } allCount = new AtomicLong(0); } /** * 判断是否超过最大允许数量 * * @return */ public boolean isOverLimit() { return currentQPS() > maxVisitPerSecond; } /** * 获取目前总的访问数 * * @return */ public long currentQPS() { return allCount.get(); } /** * 请求访问进来,判断是否可以执行业务逻辑 */ public void visit() { countPerBlock[index].incrementAndGet(); allCount.incrementAndGet(); if (isOverLimit()) { System.out.println(Thread.currentThread().getName() + "被限流" + ",currentQPS:" + currentQPS() + ",index:" + index); } else { System.out.println(Thread.currentThread().getName() + "执行业务逻辑" + ",currentQPS:" + currentQPS() + ",index:" + index); } } /** * 定时执行器, * 每N毫秒滑块移动一次,然后再设置下新滑块的初始化数字0,然后新的请求会落到新的滑块上 * 同时总数减掉新滑块上的数字,并且重置新的滑块上的数量 */ @Override public void run() { index = (index + 1) % block; long val = countPerBlock[index].getAndSet(0); allCount.addAndGet(-val); } public static void main(String[] args) { SliderWindowRateLimiter sliderWindowRateLimiter = new SliderWindowRateLimiter(10, 100); //固定的速率移动滑块 ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); scheduledExecutorService.scheduleAtFixedRate(sliderWindowRateLimiter, 100, 100, TimeUnit.MILLISECONDS); //模拟不同速度的请求 new Thread(() -> { while (true) { sliderWindowRateLimiter.visit(); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); //模拟不同速度的请求 new Thread(() -> { while (true) { sliderWindowRateLimiter.visit(); try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } } 复制代码
利用Semaphore,每隔固定速率,释放Semaphore的资源。线程获取到资源,则执行业务代码。
代码:
public class SemaphoreOne { private static Semaphore semaphore = new Semaphore(10); public static void bizMethod() throws InterruptedException { if (!semaphore.tryAcquire()) { System.out.println(Thread.currentThread().getName() + "被拒绝"); return; } System.out.println(Thread.currentThread().getName() + "执行业务逻辑"); Thread.sleep(500);//模拟处理业务逻辑需要1秒 semaphore.release(); } public static void main(String[] args) { Timer timer = new Timer(); timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { semaphore.release(10); System.out.println("释放所有锁"); } }, 1000, 1000); for (int i = 0; i < 10000; i++) { try { Thread.sleep(10);//模拟每隔10ms就有1个请求进来 } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { try { SemaphoreOne.bizMethod(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } } 复制代码
令牌桶算法:一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌,如有剩余容量则添加,没有则放弃。如果有请求进来,则需要先从桶里获取令牌,当桶里没有令牌可取时,则拒绝任务。
令牌桶的优点是:可以改变添加令牌的速率,一旦提高速率,则可以处理突发流量。
代码:
public class TokenBucket { /** * 定义的桶 */ public class Bucket { //容量 int capacity; //速率,每秒放多少 int rateCount; //目前token个数 AtomicInteger curCount = new AtomicInteger(0); public Bucket(int capacity, int rateCount) { this.capacity = capacity; this.rateCount = rateCount; } public void put() { if (curCount.get() < capacity) { System.out.println("目前数量==" + curCount.get() + ", 我还可以继续放"); curCount.addAndGet(rateCount); } } public boolean get() { if (curCount.get() >= 1) { curCount.decrementAndGet(); return true; } return false; } } @Test public void testTokenBucket() throws InterruptedException { Bucket bucket = new Bucket(5, 2); //固定线程,固定的速率往桶里放数据,比如每秒N个 ScheduledThreadPoolExecutor scheduledCheck = new ScheduledThreadPoolExecutor(1); scheduledCheck.scheduleAtFixedRate(() -> { bucket.put(); }, 0, 1, TimeUnit.SECONDS); //先等待一会儿,让桶里放点token Thread.sleep(6000); //模拟瞬间10个线程进来拿token for (int i = 0; i < 10; i++) { new Thread(() -> { if (bucket.get()) { System.out.println(Thread.currentThread() + "获取到了资源"); } else { System.out.println(Thread.currentThread() + "被拒绝"); } }).start(); } //等待,往桶里放token Thread.sleep(3000); //继续瞬间10个线程进来拿token for (int i = 0; i < 10; i++) { new Thread(() -> { if (bucket.get()) { System.out.println(Thread.currentThread() + "获取到了资源"); } else { System.out.println(Thread.currentThread() + "被拒绝"); } }).start(); } } } 复制代码