Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实

但使书种多,会有岁稔时。这篇文章主要讲述Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实相关的知识,希望能为你提供帮助。
并发编程的三剑客

  • 缓存 缓存的目的是提升系统访问速度和增大系统处理容量。
  • 降级 降级是当服务出现问题或者影响到核心流程时,需要暂时屏蔽掉,待高峰或者问题解决后再打开。
  • 限流 限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理。
限流的思想
溢出思想:速度控制限流的算法
计数限流算法固定窗口计数
Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实

文章图片

实现原理
  • 固定窗口计数法思想比较简单,只需要确定两个参数:计数周期T及周期内最大访问(调用)数N。请求到达时使用以下流程进行操作:
  • 固定窗口计数实现简单,并且只需要记录上一个周期起始时间与周期内访问总数,几乎不消耗额外的存储空间。
算法缺陷令牌桶算法
Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实

文章图片

  • 令牌桶算法则是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌。
  • 桶中存放的令牌数有最大上限,超出之后就被丢弃或者拒绝。
  • 当流量或者网络请求到达时,每个请求都要获取一个令牌,如果能够获取到,则直接处理,并且令牌桶删除一个令牌。
  • 如果获取不到,该请求就要被限流,要么直接丢弃,要么在缓冲区等待。
优点
Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实

文章图片

漏桶算法
  • 漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。
    Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实

    文章图片

  • 如上图就像一个漏斗一样,进来的水量就好像访问流量一样,而出去的水量就像是我们的系统处理请求一样。
  • 当访问流量过大时,这个漏斗中就会积水,如果水太多了就会溢出。
令牌桶和漏桶对比
  • 令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;
  • 令牌桶限制的是平均流入速率,允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌;
  • 漏桶限制的是常量流出速率,即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2,从而平滑突发流入速率;
  • 令牌桶允许一定程度的突发,而漏桶主要目的是平滑流出速率;
信号量的应用
  • 操作系统的信号量是个很重要的概念,Java 并发库 的Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。
  • 信号量的本质是控制某个资源可被同时访问的个数,在一定程度上可以控制某资源的访问频率,但不能精确控制。
限流的思想
Guava中的RateLimiter可以限制单进程中某个方法的速率,本文主要介绍如何使用,实现原理请参考文档:推荐:超详细的Guava RateLimiter限流原理解析和推荐:RateLimiter 源码分析(Guava 和 Sentinel 实现)。
Guava RateLimiter
原理:Guava RateLimiter基于令牌桶算法,
  • RateLimiter系统限制QPS是多少,那么RateLimiter将以这个速度往桶里面放入令牌。
  • 然后请求的时候,通过tryAcquire()方法向RateLimiter获取许可(令牌)。
Guava RateLimiter 控制操作 Guava RateLimiter 限速手段
  • RateLimiter从概念上来讲,速率限制器会在可配置的速率下分配许可证。如果必要的话,每个acquire() 会阻塞当前线程直到许可证可用后获取该许可证。一旦获取到许可证,不需要再释放许可证。
  • RateLimiter通过限制后面请求的等待时间,来支持一定程度的突发请求(预消费)。
Maven配置
< dependency> < groupId> com.google.guava< /groupId> < artifactId> guava< /artifactId> < version> 31.0-jre< /version> < /dependency>

java简单案例
public class RateLimiterService { // 每秒发出5个令牌 RateLimiter rateLimiter = RateLimiter.create(5); /** * 尝试获取令牌 */ public boolean tryAcquire() { return rateLimiter.tryAcquire(); } publicvoid acquire() { rateLimiter.acquire(); } public static void main(String[] args){ if (accessLimitService.tryAcquire()) { log.info("start"); // 模拟业务执行500毫秒 try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return "access success [" + LocalDateTime.now() + "]"; } else { //log.warn("限流"); return "access limit [" + LocalDateTime.now() + "]"; } } }public void testMethod(){ ExecutorService pool = Executors.newFixedThreadPool(10); RateLimiter rateLimiter = RateLimiter.create(5); // rate is "5 permits per second" IntStream.range(0, 10).forEach(i -> pool.submit(() -> { if (rateLimiter.tryAcquire()) { try { log.info("start"); Thread.sleep(500); } catch (InterruptedException e) { } } else { log.warn("限流"); } })); public void testMethod2(){ ExecutorService pool = Executors.newFixedThreadPool(10); RateLimiter rateLimiter = RateLimiter.create(5); // rate is "5 permits per second" IntStream.range(0, 10).forEach(i -> pool.submit(() -> { rateLimiter.acquire(); log.info("start"); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } })); pool.shutdown(); } } }

public class GuavaRateLimiter { public static ConcurrentHashMap< String, RateLimiter> resourceRateLimiter = new ConcurrentHashMap< String, RateLimiter> (); //初始化限流工具RateLimiter static { createResourceRateLimiter("order", 50); } public static void createResourceRateLimiter(String resource, double qps) { if (resourceRateLimiter.contains(resource)) { resourceRateLimiter.get(resource).setRate(qps); } else { //创建限流工具,每秒发出50个令牌指令 RateLimiter rateLimiter = RateLimiter.create(qps); resourceRateLimiter.putIfAbsent(resource, rateLimiter); } } public static void main(String[] args) { for (int i = 0; i < 5000; i++) { new Thread(new Runnable() { @Override public void run() { //如果获得令牌指令,则执行业务逻辑 if (resourceRateLimiter.get("order").tryAcquire(10, TimeUnit.MICROSECONDS)) { System.out.println("执行业务逻辑"); } else { System.out.println("限流"); } } }).start(); } } }

方法摘要
限流及创建方法
Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实

文章图片

Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实

文章图片

create方法
public static RateLimiter create(double permitsPerSecond)

返回的RateLimiter
  • 确保了在平均情况下,每秒发布的许可数不会超过permitsPerSecond,每秒钟会持续发送请求。
  • 当传入请求速率超过permitsPerSecond,速率限制器会每秒释放一个许可(1.0 / permitsPerSecond 这里是指设定了permitsPerSecond为1.0) 。
  • 当速率限制器闲置时,允许许可数暴增到permitsPerSecond,随后的请求会被平滑地限制在稳定速率permitsPerSecond中。
参数:
  • permitsPerSecond – 返回的RateLimiter的速率,意味着每秒有多少个许可变成有效。
  • 抛出:
  • IllegalArgumentException – 如果permitsPerSecond为负数或者为0
public static RateLimiter create(double permitsPerSecond,long warmupPeriod,TimeUnit unit)

参数:
  • permitsPerSecond – 返回的RateLimiter的速率,意味着每秒有多少个许可变成有效。
  • warmupPeriod – 在这段时间内RateLimiter会增加它的速率,在抵达它的稳定速率或者最大速率之前
  • unit – 参数warmupPeriod 的时间单位
抛出:
  • IllegalArgumentException – 如果permitsPerSecond为负数或者为0
限流及阻塞方法 acquire
Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实

文章图片

Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实

文章图片

Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实

文章图片

public double acquire()

返回:acquire
public double acquire(int permits)

参数:
  • permits – 需要获取的许可数
返回:
  • 执行速率的所需要的睡眠时间,单位为妙;如果没有则返回0
抛出:
  • IllegalArgumentException – 如果请求的许可数为负数或者为0
tryAcquire
public boolean tryAcquire(long timeout,TimeUnit unit)

参数:
  • timeout – 等待许可的最大时间,负数以0处理
  • unit – 参数timeout 的时间单位
返回:
  • true表示获取到许可,反之则是false
抛出:
  • IllegalArgumentException – 如果请求的许可数为负数或者为0
tryAcquire
public boolean tryAcquire(int permits,long timeout,TimeUnit unit)

参数:
  • permits – 需要获取的许可数
  • timeout – 等待许可数的最大时间,负数以0处理
  • unit – 参数timeout 的时间单位
返回:
  • true表示获取到许可,反之则是false
限流及状态设置【Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实】
Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实

文章图片

Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实

文章图片

public final void setRate(double permitsPerSecond)

参数:
  • permitsPerSecond – RateLimiter的新的稳定速率
抛出:
  • IllegalArgumentException – 如果permitsPerSecond为负数或者为0
public final double getRate()


    推荐阅读