转载

每日一博 | 使用 Guava 的 RateLimiter 做限流

场景:

1.

在日常生活中,我们肯定收到过不少不少这样的短信,“京东最新优惠卷…”,“天猫送您…”。这种类型的短信是属于推广性质的短信。这种短信一般群发量会到千万级别。然而,要完成这些短信发送,我们是需要调用服务商的接口来完成的。倘若一次发送的量在200万条,而我们的服务商接口每秒能处理的短信发送量有限,只能达到200条每秒。那么这个时候就会产生问题了,我们如何能控制好程序发送短信时的速度昵?于是限流器就得用上了。

2.

提供服务接口的人或多或少遇到这样的问题,业务负载能力有限,为了防止过多请求涌入造成系统崩溃,如何进行流量控制?

流量控制策略有:分流,降级,限流等。这里我们讨论限流策略,他的作用是限制请求访问频率,换取系统高可用,是比较保守方便的策略。

3.常用的限流算法由: 漏桶算法令牌桶算法

一、漏桶算法

漏桶作为计量工具(The Leaky Bucket Algorithm as a Meter)时,可以用于流量整形(Traffic Shaping)和流量控制(TrafficPolicing),漏桶算法的描述如下:

  • 一个固定容量的漏桶,按照常量固定速率流出水滴;
  • 如果桶是空的,则不需流出水滴;
  • 可以以任意速率流入水滴到漏桶;
  • 如果流入水滴超出了桶的容量,则流入的水滴溢出了(被丢弃),而漏桶容量是不变的。

漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率.示意图如下:

每日一博 | 使用 Guava 的 RateLimiter 做限流

可见这里有两个变量,一个是桶的大小,支持流量突发增多时可以存多少的水(burst),另一个是水桶漏洞的大小(rate)。

因为漏桶的漏出速率是固定的参数,所以,即使网络中不存在资源冲突(没有发生拥塞),漏桶算法也不能使流突发(burst)到端口速率.因此,漏桶算法对于存在突发特性的流量来说缺乏效率.

二、令牌桶算法

令牌桶算法是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌。令牌桶算法的描述如下:

  • 假设限制2r/s,则按照500毫秒的固定速率往桶中添加令牌;
  • 桶中最多存放b个令牌,当桶满时,新添加的令牌被丢弃或拒绝;
  • 当一个n个字节大小的数据包到达,将从桶中删除n个令牌,接着数据包被发送到网络上;
  • 如果桶中的令牌不足n个,则不会删除令牌,且该数据包将被限流(要么丢弃,要么缓冲区等待)。

令牌桶算法(Token Bucket)和 Leaky Bucket 效果一样但方向相反的算法,更加容易理解.随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了.新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务

每日一博 | 使用 Guava 的 RateLimiter 做限流

令牌桶的另外一个好处是可以方便的改变速度. 一旦需要提高速率,则按需提高放入桶中的令牌的速率. 一般会定时(比如100毫秒)往桶中增加一定数量的令牌, 有些变种算法则实时的计算应该增加的令牌的数量.

三、测试代码

测试代码1

package com.xx;

import com.google.common.util.concurrent.RateLimiter;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author hanliwei
 * @create 2018-06-21 17:10
 */
public class Test2 {

    public static void main(String[] args) {

        //新建一个每秒限制3个的令牌桶
        RateLimiter rateLimiter = RateLimiter.create(3.0);

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(100);
        for (int i = 0; i < 10; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    //获取令牌桶中一个令牌,最多等待10秒
                    if (rateLimiter.tryAcquire(1, 10, TimeUnit.SECONDS)) {
                        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                    }
                }
            });
        }

        executor.shutdown();
    }
}

结果:

2018-06-21 17:53:31

2018-06-21 17:53:31

2018-06-21 17:53:32

2018-06-21 17:53:32

2018-06-21 17:53:32

2018-06-21 17:53:33

2018-06-21 17:53:33

2018-06-21 17:53:33

2018-06-21 17:53:34

2018-06-21 17:53:34

测试代码2

package com.xx;

import com.google.common.util.concurrent.RateLimiter;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author hanliwei
 * @create 2018-06-21 17:57
 */
public class Test3 {

    public static void main(String[] args) {
        //线程池
        ExecutorService exec = Executors.newCachedThreadPool();
        //速率是每秒只有5个许可
        final RateLimiter rateLimiter = RateLimiter.create(3.0);

        for (int i = 0; i < 10; i++) {
            final int no = i;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        //获取许可
                        rateLimiter.acquire();
                        System.out.println("Accessing: " + no + ",time:"
                                + new SimpleDateFormat("yy-MM-dd HH:mm:ss").format(new Date()));

                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                }
            };
            //执行线程
            exec.execute(runnable);
        }
        //退出线程池
        exec.shutdown();
    }
}

结果:

Accessing: 0,time:18-06-21 17:58:41

Accessing: 1,time:18-06-21 17:58:41

Accessing: 4,time:18-06-21 17:58:41

Accessing: 8,time:18-06-21 17:58:42

Accessing: 5,time:18-06-21 17:58:42

Accessing: 3,time:18-06-21 17:58:43

Accessing: 7,time:18-06-21 17:58:43

Accessing: 6,time:18-06-21 17:58:44

测试代码3

package com.xx;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;

import java.util.concurrent.ConcurrentMap;

/**
 * 单机限速demo
 *
 * @author hanliwei
 * @create 2018-06-21 18:51
 */
public class Test6 {

    //key-value (service,Qps) ,接口服务的限制速率
    private static final ConcurrentMap<String,Double> resourceMap = Maps.newConcurrentMap();
    //userkey-service,limiter ,限制用户对接口的访问速率
    private static final ConcurrentMap<String,RateLimiter> userResourceLimiterMap = Maps.newConcurrentMap();

    static {
        //init ,初始化方法A的Qps为50
        resourceMap.putIfAbsent("methodA",10.0);
    }

    public static void updateResourceQps(String resource,double qps) {
        resourceMap.put(resource,qps);
    }

    public static void removeResource(String resource) {
        resourceMap.remove(resource);
    }

    public static int enter(String resource, String userKey) {
        long t1 = System.currentTimeMillis();
        Double qps = resourceMap.get(resource);

        //不限流
        if (qps == null || qps.doubleValue() == 0.0) {
            return 0;
        }

        String keySer = resource + userKey;
        RateLimiter rateLimiter = userResourceLimiterMap.get(keySer);
        //if null , new limit
        if (rateLimiter == null) {
            rateLimiter = RateLimiter.create(qps);

            RateLimiter putByOtherThread = userResourceLimiterMap.putIfAbsent(keySer,rateLimiter);
            if (putByOtherThread != null) {
                rateLimiter = putByOtherThread;
            }
            rateLimiter.setRate(qps);
        }

        //非阻塞
        if (!rateLimiter.tryAcquire()) {
            //限速中,提示用户
            System.out.println("use :" + (System.currentTimeMillis() - t1) + "ms ; "
                    + resource + " visited too frequently by key:" + userKey);
            return 99;
        } else {
            //正常访问
            System.out.println("use :" + (System.currentTimeMillis() - t1) + "ms ; " );
            return 0;
        }

    }

    public static void main(String[] args) throws InterruptedException {
//        testA();

        Test6.updateResourceQps("methodB",5.0);
        testB();

    }
    private static void testA() throws InterruptedException {
        int i = 0;
        while (true) {
            i++;
            long t2 = System.currentTimeMillis();
            System.out.println(" begin:" + t2 + " , hanchao:" + i);

            int  result = Test6.enter("methodA","hanchao");
            if (result == 99) {
                i = 0;
                Thread.sleep(1000);
            }
        }
    }

    private static void testB() throws InterruptedException {
        //测试other
        int y = 0;
        while (true) {
            y++;
            long t3 = System.currentTimeMillis();
            System.out.println(" begin:" + t3 + " , tom:" + y);

            int  result2 = Test6.enter("methodB","tom");
            if (result2 == 99) {
                y = 0;
                Thread.sleep(1000);
            }
        }
    }

}

测试结果:

每日一博 | 使用 Guava 的 RateLimiter 做限流 每日一博 | 使用 Guava 的 RateLimiter 做限流

四、方法摘要

修饰符和类型 方法和描述
double acquire()
从RateLimiter获取一个许可,该方法会被阻塞直到获取到请求
double acquire(int permits)
从RateLimiter获取指定许可数,该方法会被阻塞直到获取到请求
static RateLimiter create(double permitsPerSecond)
根据指定的稳定吞吐率创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询)
static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)
根据指定的稳定吞吐率和预热期来创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少个请求量),在这段预热时间内,RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。(只要存在足够请求数来使其饱和)
double getRate()
返回RateLimiter 配置中的稳定速率,该速率单位是每秒多少许可数
void setRate(double permitsPerSecond)
更新RateLimite的稳定速率,参数permitsPerSecond 由构造RateLimiter的工厂方法提供。
String toString()
返回对象的字符表现形式
boolean tryAcquire()
从RateLimiter 获取许可,如果该许可可以在无延迟下的情况下立即获取得到的话
boolean tryAcquire(int permits)
从RateLimiter 获取许可数,如果该许可数可以在无延迟下的情况下立即获取得到的话
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
从RateLimiter 获取指定许可数如果该许可数可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可数的话,那么立即返回false (无需等待)
boolean tryAcquire(long timeout, TimeUnit unit)
从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待)

五、Semaphore

public static void main(String[] args) {
        //线程池
        ExecutorService exec = Executors.newCachedThreadPool();
        //只能5个线程同时访问
        final Semaphore semp = new Semaphore(3);

        for (int i = 0; i < 10; i++) {
            final int no = i;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        //获取许可
                        semp.acquire();
                        System.out.println("Accessing: " + no
                                + " --- " + new SimpleDateFormat("yy-MM-dd HH:mm:ss").format(new Date()));
                        //睡5s
                        Thread.sleep(5000);
                        //访问完后,释放许可,如果注释掉下面的语句,则控制台只能打印3条记录,之后线程一直阻塞
                        semp.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                }
            };
            //执行线程
            exec.execute(runnable);
        }
        //退出线程池
        exec.shutdown();
    }

结果:

Accessing: 3 --- 18-06-21 18:21:07

Accessing: 1 --- 18-06-21 18:21:07

Accessing: 2 --- 18-06-21 18:21:07

Accessing: 0 --- 18-06-21 18:21:12

Accessing: 4 --- 18-06-21 18:21:12

Accessing: 8 --- 18-06-21 18:21:17

Accessing: 5 --- 18-06-21 18:21:17

Accessing: 7 --- 18-06-21 18:21:22

六、Semaphore和RateLimiter的区别

Semaphore:信号量,直译很难理解。作用是限定只有抢到信号的线程才能执行,其他的都得等待!你可以设置N个信号,这样最多可以有N个线程同时执行。 注意,其他的线程也在,只是挂起了

RateLimiter:这是guava的,直译是速率限制器。其作用是 限制一秒内只能有N个线程执行,超过了就只能等待下一秒。注意,N是double类型。

========================================

Semaphore:从线程个数限流

RateLimiter:从速率限流  目前常见的算法是 漏桶算法和令牌算法

令牌桶算法:相比漏桶算法而言区别在于,令牌桶是会去匀速的生成令牌,拿到令牌才能够进行处理,类似于匀速往桶里放令牌

漏桶算法是:生产者消费者模型,生产者往木桶里生产数据,消费者按照定义的速度去消费数据

应用场景:

漏桶算法:必须读写分流的情况下,限制读取的速度

令牌桶算法:必须读写分离的情况下,限制写的速率或者小米手机饥饿营销的场景  只卖1分种抢购1000

实现的方法都是一样。RateLimiter来实现

七、参考链接

Guava官方文档-RateLimiter类==☆

【Guava】使用Guava的RateLimiter做限流

Guava RateLimiter源码解析

参考文章-便于理解

Guava Rate Limiter实现分析-有时间了可以读一下

其他基础知识参考(我的其他的博客):

多线程之Callable的简单学习

springmvc初始化数据

原文  https://my.oschina.net/hanchao/blog/1833612
正文到此结束
Loading...