任何应用都有一个设计指标,当应用的压力超过了他设计所能承载的能力时,就好比一座只允许行人通过的独木桥,是无法承载一辆坦克的重量的,这个时候,为了让机器能够继续运行,在不宕机的情况下尽其所能的对一部分用户提供服务,保证整个流程能够继续走下去,这个时候,就必须对应用进行流控,丢弃一部分用户的请无法避免。
流控可以从多个维度来进行,比如针对 QPS , 并发线程数 , 黑白名单 , 加权分级 等等,最典型最直接的便是针对 QPS 和 并发线程数 的流控。当然,要进行流控,首先等有一个流控的阀值,这个阀值不是说拍拍脑袋就能够想出来,不同类型的应用,所面临的情况不一样,也没有一个统一的衡量标准,必须经过多轮的压力测试,才能够得出一个比较靠谱的数值。
一、简单的流控
1、使用Semphore进行并发流控
模拟代码如下所示:
Semaphore semphore = new Semaphore(10); if(semphore.getQueueLength() > 10){ //等待队列阀值为10时 return; } try { semphore.acquire(); //干活 } catch (InterruptedException e) { e.printStackTrace(); }finally{ semphore.release();//释放 }
也可以参见:http://ifeve.com/concurrency-practice-1/
2、使用乐观锁加上下文切换进行流控
public void enter(Object obj){ boolean isUpdate = false; int countValue = count.get(); if(countValue > 0){ isUpdate = count.compareAndSet(countValue, countValue -1); if(isUpdate)return; } concurQueue.add(obj); try { obj.wait(); } catch (InterruptedException e) { logger.error("flowcontrol thread was interrupted .......",e); } return ; } public void release(){ synchronized(count){ if(count.get() < VALVE){ count.set(count.get() + 1); } } Object obj = concurQueue.remove(); if(obj != null){ synchronized (obj) { obj.notify(); } } System.out.println("notify ..............."); return ; }
具体采用信号量还是使用上下文切换形式,需要根据临界代码段执行的时间而定
当请求进来时,调用配置的concurrentlock的enter方法,判断是否达到阀值,如果没有达到阀值,则进入,进行处理, 处理完后计数器加1,如果已经达到阀值则放入等待队列,因为等待队列是消耗内存的,因此等待队列也必须有阀值,如果队列超过阀值,请求直接丢弃
二、漏斗算法和桶令牌算法
利用现存的算法,比如:漏斗算法和桶令牌算法进行流量的控制。
常见流控算法及Java实现
现在主要的工作是处理一些中间件,所以流控必然是需要去考虑的东西。
流控更专业的叫法是:流量整形(traffic shaping),典型作用是限制流出某一网络的某一连接的流量与突发,使这类报文以比较均匀的速度向外发送。
常见算法
通常的做法就是通过建立一个缓存区或是令牌桶来实现。更具体的算法是:漏斗算法和桶令牌算法。
漏斗算法就是有一个斗:数据往这个斗中流入,然后开一口,以一定的速度将这个斗中的数据流出,不支持任持续突发和最大突发,至于这个斗满了如何处理再说。
桶令牌算法:一个存放令牌的桶,以一定的速度往这个桶生成令牌,数据流出先从这个桶中拿令牌,若是拿不到令牌就另行处理(具体自己设定)。
桶令牌跟漏斗最大的区别在于可以支撑一个突然的流量变化,就是满桶令牌数的峰值。
具体代码
帮助
package com.netease.datastream.util.flowcontrol;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.base.Preconditions;
import com.netease.datastream.util.framework.LifeCycle;
/**
* <pre>
* Created by inter12 on 15-3-18.
* </pre>
*/
public class TokenBucket implements LifeCycle {
// 默认桶大小个数 即最大瞬间流量是64M
private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;
// 一个桶的单位是1字节
private int everyTokenSize = 1;
// 瞬间最大流量
private int maxFlowRate;
// 平均流量
private int avgFlowRate;
// 队列来缓存桶数量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 64
private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE);
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private volatile boolean isStart = false;
private ReentrantLock lock = new ReentrantLock(true);
private static final byte A_CHAR = ‘a';
public TokenBucket() {
}
public TokenBucket(int maxFlowRate, int avgFlowRate) {
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
}
public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
this.everyTokenSize = everyTokenSize;
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
}
public void addTokens(Integer tokenNum) {
// 若是桶已经满了,就不再家如新的令牌
for (int i = 0; i < tokenNum; i++) {
tokenQueue.offer(Byte.valueOf(A_CHAR));
}
}
public TokenBucket build() {
start();
return this;
}
/**
* 获取足够的令牌个数
*
* @return
*/
public boolean getTokens(byte[] dataSize) {
Preconditions.checkNotNull(dataSize);
Preconditions.checkArgument(isStart, “please invoke start method first !”);
int needTokenNum = dataSize.length / everyTokenSize + 1;// 传输内容大小对应的桶个数
final ReentrantLock lock = this.lock;
lock.lock();
try {
boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足够的桶数量
if (!result) {
return false;
}
int tokenCount = 0;
for (int i = 0; i < needTokenNum; i++) {
Byte poll = tokenQueue.poll();
if (poll != null) {
tokenCount++;
}
}
return tokenCount == needTokenNum;
} finally {
lock.unlock();
}
}
@Override
public void start() {
// 初始化桶队列大小
if (maxFlowRate != 0) {
tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
}
// 初始化令牌生产者
TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);
isStart = true;
}
@Override
public void stop() {
isStart = false;
scheduledExecutorService.shutdown();
}
@Override
public boolean isStarted() {
return isStart;
}
class TokenProducer implements Runnable {
private int avgFlowRate;
private TokenBucket tokenBucket;
public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
this.avgFlowRate = avgFlowRate;
this.tokenBucket = tokenBucket;
}
@Override
public void run() {
tokenBucket.addTokens(avgFlowRate);
}
}
public static TokenBucket newBuilder() {
return new TokenBucket();
}
public TokenBucket everyTokenSize(int everyTokenSize) {
this.everyTokenSize = everyTokenSize;
return this;
}
public TokenBucket maxFlowRate(int maxFlowRate) {
this.maxFlowRate = maxFlowRate;
return this;
}
public TokenBucket avgFlowRate(int avgFlowRate) {
this.avgFlowRate = avgFlowRate;
return this;
}
private String stringCopy(String data, int copyNum) {
StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);
for (int i = 0; i < copyNum; i++) {
sbuilder.append(data);
}
return sbuilder.toString();
}
public static void main(String[] args) throws IOException, InterruptedException {
tokenTest();
}
private static void arrayTest() {
ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10);
tokenQueue.offer(1);
tokenQueue.offer(1);
tokenQueue.offer(1);
System.out.println(tokenQueue.size());
System.out.println(tokenQueue.remainingCapacity());
}
private static void tokenTest() throws InterruptedException, IOException {
TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build();
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(“/tmp/ds_test”)));
String data = “xxxx”;// 四个字节
for (int i = 1; i <= 1000; i++) {
Random random = new Random();
int i1 = random.nextInt(100);
boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes());
TimeUnit.MILLISECONDS.sleep(100);
if (tokens) {
bufferedWriter.write(“token pass — index:” + i1);
System.out.println(“token pass — index:” + i1);
} else {
bufferedWriter.write(“token rejuect — index” + i1);
System.out.println(“token rejuect — index” + i1);
}
bufferedWriter.newLine();
bufferedWriter.flush();
}
bufferedWriter.close();
}
}
后来查了资料:google的guava也有一个类似的流控类:RateLimiter。不过这个不是基于流量的控制,更多是速度的控制,有点像TPS。
RateLimiter的具体使用:http://java.dzone.com/articles/ratelimiter-discovering-google
参考资料:
http://7658423.blog.51cto.com/7648423/1576118
http://colobu.com/2014/11/13/rate-limiting/
http://baike.baidu.com/view/2530454.htm
———————
流量控制与令牌桶算法
GUAVA JAVA ALGORITHM
一年一度的「双 11」又要到了,阿里的码农们进入了一年中最辛苦的时光。各种容量评估、压测、扩容让我们忙得不可开交。洛阳亲友如相问,就说我搞双十一。
如何让系统在汹涌澎湃的流量面前谈笑风生?我们的策略是不要让系统超负荷工作。如果现有的系统扛不住业务目标怎么办?加机器!机器不够怎么办?业务降级,系统限流!
正所谓「他强任他强,清风拂山岗;他横任他横,明月照大江」,降级和限流是大促保障中必不可少的神兵利器,丢卒保车,以暂停边缘业务为代价保障核心业务的资源,以系统不被突发流量压挂为第一要务。
集团的中间件有一个不错的单机限流框架,支持两种限流模式:控制速率和控制并发。限流这种东西,应该是来源于网络里面的「流量整型」,通过控制数据包的传输速率和时机,来实现一些性能、服务质量方面的东西。令牌桶是一种常见的流控算法,属于控制速率类型的。控制并发则相对要常见的多,比如操作系统里的「信号量」就是一种控制并发的方式。
在 Wikipedia 上,令牌桶算法是这么描述的:
每秒会有 r 个令牌放入桶中,或者说,每过 1/r 秒桶中增加一个令牌
桶中最多存放 b 个令牌,如果桶满了,新放入的令牌会被丢弃
当一个 n 字节的数据包到达时,消耗 n 个令牌,然后发送该数据包
如果桶中可用令牌小于 n,则该数据包将被缓存或丢弃
令牌桶控制的是一个时间窗口内的通过的数据量,在 API 层面我们常说的 QPS、TPS,正好是一个时间窗口内的请求量或者事务量,只不过时间窗口限定在 1s 罢了。
现实世界的网络工程中使用的令牌桶,比概念图中的自然是复杂了许多,「令牌桶」的数量也不是一个而是两个,简单的算法描述可用参考中兴的期刊^1或者 RFC。
假如项目使用 Java 语言,我们可以轻松地借助 Guava 的 RateLimiter 来实现基于令牌桶的流控。RateLimiter 令牌桶算法的单桶实现,也许是因为在 Web 应用层面单桶实现就够用了,双筒实现就属于过度设计。
RateLimiter 对简单的令牌桶算法做了一些工程上的优化,具体的实现是 SmoothBursty。需要注意的是,RateLimiter 的另一个实现 SmoothWarmingUp,就不是令牌桶了,而是漏桶算法。也许是出于简单起见,RateLimiter 中的时间窗口能且仅能为 1s,如果想搞其他时间单位的限流,只能另外造轮子。
SmoothBursty 积极响应李克强总理的号召,上个月的流量没用完,可以挪到下个月用。其实就是 SmoothBursty 有一个可以放 N 个时间窗口产生的令牌的桶,系统空闲的时候令牌就一直攒着,最好情况下可以扛 N 倍于限流值的高峰而不影响后续请求。如果不想像三峡大坝一样能扛千年一遇的洪水,可以把 N 设置为 1,这样就只屯一个时间窗口的令牌。
RateLimiter 有一个有趣的特性是「前人挖坑后人跳」,也就是说 RateLimiter 允许某次请求拿走超出剩余令牌数的令牌,但是下一次请求将为此付出代价,一直等到令牌亏空补上,并且桶中有足够本次请求使用的令牌为止[^2]。这里面就涉及到一个权衡,是让前一次请求干等到令牌够用才走掉呢,还是让它先走掉后面的请求等一等呢?Guava 的设计者选择的是后者,先把眼前的活干了,后面的事后面再说。
[^2]: How is the RateLimiter designed, and why?
当我们要实现一个基于速率的单机流控框架的时候,RateLimiter 是一个完善的核心组件,就仿佛 Linux 内核对 GNU 操作系统那样重要。但是我们还需要其他的一些东西才能把一个流控框架跑起来,比如一个通用的 API,一个拦截器,一个在线配置流控阈值的后台等等。
下面随便写了一个简单的流控框架 API,至于拦截器和后台就懒得写了,有时间再自己造一套中间件的轮子吧~
public class TrafficShaper {
public static class RateLimitException extends Exception {
private static final long serialVersionUID = 1L;
private String resource;
public String getResource() {
return resource;
}
public RateLimitException(String resource) {
super(resource + ” should not be visited so frequently”);
this.resource = resource;
}
@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
}
private static final ConcurrentMap<String, RateLimiter>
resourceLimiterMap = Maps.newConcurrentMap();
public static void updateResourceQps(String resource, double qps) {
RateLimiter limiter = resourceLimiterMap.get(resource);
if (limiter == null) {
limiter = RateLimiter.create(qps);
RateLimiter putByOtherThread
= resourceLimiterMap.putIfAbsent(resource, limiter);
if (putByOtherThread != null) {
limiter = putByOtherThread;
}
}
limiter.setRate(qps);
}
public static void removeResource(String resource) {
resourceLimiterMap.remove(resource);
}
public static void enter(String resource) throws RateLimitException {
RateLimiter limiter = resourceLimiterMap.get(resource);
if (limiter == null) {
return;
}
if (!limiter.tryAcquire()) {
throw new RateLimitException(resource);
}
}
public static void exit(String resource) {
//do nothing when use RateLimiter
}
}
——————–
1、漏桶算法(Leaky bucket)
漏桶算法强制一个常量的输出速率而不管输入数据流的突发性,当输入空闲时,该算法不执行任何动作.就像用一个底部开了个洞的漏桶接水一样,水进入到漏桶里,桶里的水通过下面的孔以固定的速率流出,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率.如下图所示:
2、令牌桶(Token bucket)
令牌桶算法的基本过程如下:
每秒会有 r 个令牌放入桶中,或者说,每过 1/r 秒桶中增加一个令牌
“漏桶算法”能够强行限制数据的传输速率,而“令牌桶算法”在能够限制数据的平均传输数据外,还允许某种程度的突发传输。在“令牌桶算法”中,只要令牌桶中存在令牌,那么就允许突发地传输数据直到达到用户配置的上限,因此它适合于具有突发特性的流量。
来源:https://www.cnblogs.com/moonandstar08/p/5440555.html
几个流量控制算法总结
1.1 限流算法—-漏桶算法
漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率,示意图如下(因为漏桶的漏出速率是固定的参数,所以,即使网络中不存在资源冲突(没有发生拥塞),漏桶算法也不能使流突发(burst)到端口速率.因此,漏桶算法对于存在突发特性的流量来说缺乏效率):
1.2 限流算法—-令牌桶算法
令牌桶算法(Token Bucket)和 Leaky Bucket 效果一样但方向相反的算法,更加容易理解。随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了,新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务。
令牌桶的另外一个好处是可以方便的改变速度. 一旦需要提高速率,则按需提高放入桶中的令牌的速率. 一般会定时(比如100毫秒)往桶中增加一定数量的令牌, 有些变种算法则实时的计算应该增加的令牌的数量。
3.3 基于RateLimiter的流速控制
ateLimiter使用的是令牌桶的流控算法,RateLimiter会按照一定的频率往桶里扔令牌,线程拿到令牌才能执行,比如你希望自己的应用程序QPS不要超过1000,那么RateLimiter设置1000的速率后,就会每秒往桶里扔1000个令牌。
RateLimiter经常用于限制对一些物理资源或者逻辑资源的访问速率。
通过设置许可证的速率来定义RateLimiter。在默认配置下,许可证会在固定的速率下被分配,速率单位是每秒多少个许可证。为了确保维护配置的速率,许可会被平稳地分配,许可之间的延迟会做调整。
可能存在配置一个拥有预热期的RateLimiter 的情况,在这段时间内,每秒分配的许可数会稳定地增长直到达到稳定的速率。
有一点很重要,那就是请求的许可数从来不会影响到请求本身的限制(调用acquire(1) 和调用acquire(1000) 将得到相同的限制效果,如果存在这样的调用的话),但会影响下一次请求的限制,也就是说,如果一个高开销的任务抵达一个空闲的RateLimiter,它会被马上许可,但是下一个请求会经历额外的限制,从而来偿付高开销任务。注意:RateLimiter 并不提供公平性的保证。