本文基于 Guava-18.0.jar 。
RateLimiter
是令牌桶思想的一个实现,可实现流量整形、资源访问速率控制。
与信号量对比:
RateLimiter RateLimiter
对 RateLimiter
请求许可的数量不会对请求本身产生抑制影响,但会对下一个请求产生抑制。例如一个请求很大数量许可的请求到达空闲 RateLimiter
时,它将马上获得许可,但下一个请求会被抑制,从而为前面昂贵的请求付出代价。
从 RateLimiter
申请许可时,可能会阻塞、也可能不会,是否阻塞取决于上一次分配许可的时间和当前请求的许可数量。
RateLimiter
可以配置一个 warnup 热身周期,在这个周期内每秒发出的许可数量稳步增长直至达到稳定的速率。简单说是慢启动吧。
// 每秒2个的速率提交任务 final RateLimiter rateLimiter = RateLimiter.create(2.0); void submitTasks(List tasks, Executor executor) { for (Runnable task : tasks) { rateLimiter.acquire(); // may wait executor.execute(task); } }
RateLimiter -- SmoothRateLimiter -- SmoothBursty -- SmoothWarmingUp
RateLimiter
是个抽象类,提供静态方法来创建具体的子类实例。
每个实例持有一个 SleepingStopwatch
实例,用来计时、提供睡眠等待的功能。
RateLimiter
是线程安全的,通过 synchronized
关键字来保证。
public abstract class RateLimiter { public static RateLimiter create(double permitsPerSecond) { return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond); } static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) { RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); rateLimiter.setRate(permitsPerSecond); return rateLimiter; } public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) { checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod); return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond, warmupPeriod, unit); } static RateLimiter create( SleepingStopwatch stopwatch, double permitsPerSecond, long warmupPeriod, TimeUnit unit) { RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit); rateLimiter.setRate(permitsPerSecond); return rateLimiter; } private final SleepingStopwatch stopwatch; // 用来提供监视器锁的对象 private volatile Object mutexDoNotUseDirectly; }
public double acquire(int permits) { // 申请给定数量的许可,返回成功获得许可需要等待的时间。 long microsToWait = reserve(permits); // 进行等待 stopwatch.sleepMicrosUninterruptibly(microsToWait); return 1.0 * microsToWait / SECONDS.toMicros(1L); } final long reserve(int permits) { checkPermits(permits); // 利用监视器锁保证线程安全性 synchronized (mutex()) { return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } } final long reserveAndGetWaitLength(int permits, long nowMicros) { // 返回可获得许可的时刻 long momentAvailable = reserveEarliestAvailable(permits, nowMicros); // 与当前时间做差才是要等待的时长 return max(momentAvailable - nowMicros, 0); } // 具体的分配许可的行为由子类实现,在 SmoothRateLimiter 类里 abstract long reserveEarliestAvailable(int permits, long nowMicros);
下面的代码位于 SmoothRateLimiter
类里。
属性:
// 当前已存储的许可,令牌桶桶里攒的 double storedPermits; // 令牌桶能容纳的最大许可数量 double maxPermits; // 稳定速率下、生成两个许可之间的时间间隔。 // 例如稳定速率是每秒 5 个许可,那么稳定的间隔是 200ms double stableIntervalMicros; // 下一次请求(不管申请的大小)被授予的时间。 // 在授权一个请求后,会把这个值推向更远的未来。更大的请求比小的请求 推的更远。 // 此值可能在过去或未来 private long nextFreeTicketMicros = 0L;
令牌分配核心算法:
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { // 同步时间和桶里的令牌数量 resync(nowMicros); // 当前请求获取成功的时间为 nextFreeTicketMicros // 与请求数量 requiredPermits 无关 long returnValue = nextFreeTicketMicros; // 计算当前立即可得的许可数量 double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 当前请求需要新生成的许可数量 double freshPermits = requiredPermits - storedPermitsToSpend; // 生成上述数量新许可需要的时间 long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); // 新许可需要的时间由下一个请求承担 this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros; this.storedPermits -= storedPermitsToSpend; return returnValue; } private void resync(long nowMicros) { // nextFreeTicket 处于过去,说明当前请求可以立即申请成功 if (nowMicros > nextFreeTicketMicros) { // 更新桶里的令牌数量、但不能超过桶的容量 storedPermits = min(maxPermits, storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros); nextFreeTicketMicros = nowMicros; } }
该算法的核心逻辑为:
nextFreeTicketMicros
处于过去,当前请求可以立即成功,不管它申请的许可数量是多少。 nextFreeTicketMicros
处于未来,说明当前请求受前面请求的影响,需要等待 (nowMicros - nextFreeTicketMicros)
。 freshPermits = requiredPermits - min(requiredPermits, this.storedPermits)
。 waitMicros = X + (long) (freshPermits * stableIntervalMicros)
,这个等待时间由下一个请求承受。其中 X = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
由子类的具体实现决定, SmoothWarmingUp
子类用该方法实现慢启动。 this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros
。 上面的逻辑贯彻了前面所说的:
对 RateLimiter
请求许可的数量不会对请求本身产生抑制影响,但会对下一个请求产生抑制。例如一个请求很大数量许可的请求到达空闲 RateLimiter
时,它将马上获得许可,但下一个请求会被抑制,从而为前面昂贵的请求付出代价。
RateLimiter
的核心思路是优先满足当前请求。
如果桶里的令牌数量不能满足当前请求,为了保证速率要求,可以让当前请求等待令牌生成,也就是让当前请求等待,也可以让后面的请求延迟执行。
不管是让谁等都可能要等待,那不如让当前的请求优先满足、后面的请求去等待。如果后面的请求来得很晚或者根本就不来,那就不需要等待、赚到了。
与漏桶算法的对比:
欢迎关注我的微信公众号: coderbee笔记 。