但使书种多,会有岁稔时。这篇文章主要讲述Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实相关的知识,希望能为你提供帮助。
并发编程的三剑客
- 缓存 缓存的目的是提升系统访问速度和增大系统处理容量。
- 降级 降级是当服务出现问题或者影响到核心流程时,需要暂时屏蔽掉,待高峰或者问题解决后再打开。
- 限流 限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理。
溢出思想:速度控制限流的算法
计数限流算法固定窗口计数
文章图片
实现原理
- 固定窗口计数法思想比较简单,只需要确定两个参数:计数周期T及周期内最大访问(调用)数N。请求到达时使用以下流程进行操作:
- 固定窗口计数实现简单,并且只需要记录上一个周期起始时间与周期内访问总数,几乎不消耗额外的存储空间。
文章图片
- 令牌桶算法则是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌。
- 桶中存放的令牌数有最大上限,超出之后就被丢弃或者拒绝。
- 当流量或者网络请求到达时,每个请求都要获取一个令牌,如果能够获取到,则直接处理,并且令牌桶删除一个令牌。
- 如果获取不到,该请求就要被限流,要么直接丢弃,要么在缓冲区等待。
文章图片
漏桶算法
- 漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。
文章图片
- 如上图就像一个漏斗一样,进来的水量就好像访问流量一样,而出去的水量就像是我们的系统处理请求一样。
- 当访问流量过大时,这个漏斗中就会积水,如果水太多了就会溢出。
- 令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;
- 令牌桶限制的是平均流入速率,允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌;
- 漏桶限制的是常量流出速率,即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2,从而平滑突发流入速率;
- 令牌桶允许一定程度的突发,而漏桶主要目的是平滑流出速率;
- 操作系统的信号量是个很重要的概念,Java 并发库 的Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。
- 信号量的本质是控制某个资源可被同时访问的个数,在一定程度上可以控制某资源的访问频率,但不能精确控制。
Guava中的RateLimiter可以限制单进程中某个方法的速率,本文主要介绍如何使用,实现原理请参考文档:推荐:超详细的Guava RateLimiter限流原理解析和推荐:RateLimiter 源码分析(Guava 和 Sentinel 实现)。
Guava RateLimiter
原理:Guava RateLimiter基于令牌桶算法,
- RateLimiter系统限制QPS是多少,那么RateLimiter将以这个速度往桶里面放入令牌。
- 然后请求的时候,通过tryAcquire()方法向RateLimiter获取许可(令牌)。
- RateLimiter从概念上来讲,速率限制器会在可配置的速率下分配许可证。如果必要的话,每个acquire() 会阻塞当前线程直到许可证可用后获取该许可证。一旦获取到许可证,不需要再释放许可证。
- RateLimiter通过限制后面请求的等待时间,来支持一定程度的突发请求(预消费)。
<
dependency>
<
groupId>
com.google.guava<
/groupId>
<
artifactId>
guava<
/artifactId>
<
version>
31.0-jre<
/version>
<
/dependency>
java简单案例
public class RateLimiterService {
// 每秒发出5个令牌
RateLimiter rateLimiter = RateLimiter.create(5);
/**
* 尝试获取令牌
*/
public boolean tryAcquire() {
return rateLimiter.tryAcquire();
}
publicvoid acquire() {
rateLimiter.acquire();
}
public static void main(String[] args){
if (accessLimitService.tryAcquire()) {
log.info("start");
// 模拟业务执行500毫秒
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "access success [" + LocalDateTime.now() + "]";
} else {
//log.warn("限流");
return "access limit [" + LocalDateTime.now() + "]";
}
}
}public void testMethod(){
ExecutorService pool = Executors.newFixedThreadPool(10);
RateLimiter rateLimiter = RateLimiter.create(5);
// rate is "5 permits per second"
IntStream.range(0, 10).forEach(i ->
pool.submit(() ->
{
if (rateLimiter.tryAcquire()) {
try {
log.info("start");
Thread.sleep(500);
} catch (InterruptedException e) {
}
} else {
log.warn("限流");
}
}));
public void testMethod2(){
ExecutorService pool = Executors.newFixedThreadPool(10);
RateLimiter rateLimiter = RateLimiter.create(5);
// rate is "5 permits per second"
IntStream.range(0, 10).forEach(i ->
pool.submit(() ->
{
rateLimiter.acquire();
log.info("start");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
pool.shutdown();
}
}
}
public class GuavaRateLimiter {
public static ConcurrentHashMap<
String, RateLimiter>
resourceRateLimiter = new ConcurrentHashMap<
String, RateLimiter>
();
//初始化限流工具RateLimiter
static {
createResourceRateLimiter("order", 50);
}
public static void createResourceRateLimiter(String resource, double qps) {
if (resourceRateLimiter.contains(resource)) {
resourceRateLimiter.get(resource).setRate(qps);
} else {
//创建限流工具,每秒发出50个令牌指令
RateLimiter rateLimiter = RateLimiter.create(qps);
resourceRateLimiter.putIfAbsent(resource, rateLimiter);
}
}
public static void main(String[] args) {
for (int i = 0;
i <
5000;
i++) {
new Thread(new Runnable() {
@Override
public void run() {
//如果获得令牌指令,则执行业务逻辑
if (resourceRateLimiter.get("order").tryAcquire(10, TimeUnit.MICROSECONDS)) {
System.out.println("执行业务逻辑");
} else {
System.out.println("限流");
}
}
}).start();
}
}
}
方法摘要
限流及创建方法
文章图片
文章图片
create方法
public static RateLimiter create(double permitsPerSecond)
返回的RateLimiter
- 确保了在平均情况下,每秒发布的许可数不会超过permitsPerSecond,每秒钟会持续发送请求。
- 当传入请求速率超过permitsPerSecond,速率限制器会每秒释放一个许可(1.0 / permitsPerSecond 这里是指设定了permitsPerSecond为1.0) 。
- 当速率限制器闲置时,允许许可数暴增到permitsPerSecond,随后的请求会被平滑地限制在稳定速率permitsPerSecond中。
- permitsPerSecond – 返回的RateLimiter的速率,意味着每秒有多少个许可变成有效。
- 抛出:
- IllegalArgumentException – 如果permitsPerSecond为负数或者为0
public static RateLimiter create(double permitsPerSecond,long warmupPeriod,TimeUnit unit)
参数:
- permitsPerSecond – 返回的RateLimiter的速率,意味着每秒有多少个许可变成有效。
- warmupPeriod – 在这段时间内RateLimiter会增加它的速率,在抵达它的稳定速率或者最大速率之前
- unit – 参数warmupPeriod 的时间单位
- IllegalArgumentException – 如果permitsPerSecond为负数或者为0
文章图片
文章图片
文章图片
public double acquire()
返回:acquire
public double acquire(int permits)
参数:
- permits – 需要获取的许可数
- 执行速率的所需要的睡眠时间,单位为妙;如果没有则返回0
- IllegalArgumentException – 如果请求的许可数为负数或者为0
public boolean tryAcquire(long timeout,TimeUnit unit)
参数:
- timeout – 等待许可的最大时间,负数以0处理
- unit – 参数timeout 的时间单位
- true表示获取到许可,反之则是false
- IllegalArgumentException – 如果请求的许可数为负数或者为0
public boolean tryAcquire(int permits,long timeout,TimeUnit unit)
参数:
- permits – 需要获取的许可数
- timeout – 等待许可数的最大时间,负数以0处理
- unit – 参数timeout 的时间单位
- true表示获取到许可,反之则是false
文章图片
文章图片
public final void setRate(double permitsPerSecond)
参数:
- permitsPerSecond – RateLimiter的新的稳定速率
- IllegalArgumentException – 如果permitsPerSecond为负数或者为0
public final double getRate()
推荐阅读
- 时序数据可视化的降采样算法
- k8s____pod调度
- HashMapHashTableTreeMap的底层源码分析和对比
- 这21个网络工程师必备工具,都是老杨的私人珍藏。
- samba服务器安装和配置
- Python数据分析基础-1二元操作符(又全又清晰!)
- 相关行知03--项目类型
- linux之fping命令
- MySQL之Update生命周期最详细的解剖