【redis进阶(4)】限流策略

限流简述 限流算法在分布式领域是一个经常被提起的话题,当系统的处理能力有限时,如何阻止计划外的请求继续对系统施压,这是一个需要重视的问题。
除了控制流量,限流还有一个应用目的是用于控制用户行为,避免垃圾请求。比如在 UGC 社区,用户的发帖、回复、点赞等行为都要严格受控,一般要严格限定某行为在规定时间内允许的次数,超过了次数那就是非法行为。对非法行为,业务必须规定适当的惩处策略。

# 指定用户 user_id 的某个行为 action_key 在特定的时间内 period 只允许发生一定的次数 max_count def is_action_allowed(user_id, action_key, period, max_count): return True # 调用这个接口 , 一分钟内只允许最多回复 5 个帖子 can_reply = is_action_allowed("110", "reply", 60, 5) if can_reply: do_reply() else: raise ActionThresholdOverflow()

计数限流 pipe 与 zset
【【redis进阶(4)】限流策略】【redis进阶(4)】限流策略
文章图片

如图所示,用一个 zset 结构记录用户的行为历史,每一个行为都会作为 zset 中的一个 key 保存下来。同一个用户同一种行为用一个 zset 记录。
为节省内存,我们只需要保留时间窗口内的行为记录,同时如果用户是冷用户,滑动时间窗口内的行为是空记录,那么这个 zset 就可以从内存中移除,不再占用空间。
通过统计滑动窗口内的行为数量与阈值 max\_count 进行比较就可以得出当前的行为是否允许。用代码表示如下:
function isActionAllowed($userId, $action, $period, $maxCount) { $redis = new Redis(); $redis->connect('127.0.0.1', 6379); $key = sprintf('hist:%s:%s', $userId, $action); $now = msectime(); # 毫秒时间戳$pipe=$redis->multi(Redis::PIPELINE); //使用管道提升性能 $pipe->zadd($key, $now, $now); //value 和 score 都使用毫秒时间戳 $pipe->zremrangebyscore($key, 0, $now - $period); //移除时间窗口之前的行为记录,剩下的都是时间窗口内的 $pipe->zcard($key); //获取窗口内的行为数量 $pipe->expire($key, $period + 1); //多加一秒过期时间 $replies = $pipe->exec(); return $replies[2] <= $maxCount; } for ($i=0; $i<20; $i++){ var_dump(isActionAllowed("110", "reply", 60*1000, 5)); //执行可以发现只有前5次是通过的 }//返回当前的毫秒时间戳 function msectime() { list($msec, $sec) = explode(' ', microtime()); $msectime = (float)sprintf('%.0f', (floatval($msec) + floatval($sec)) * 1000); return $msectime; }

使用lua脚本
10秒内只能访问3次。 后续该脚本可以在nginx或者程序运行脚本中直接使用,判断返回是否为0,就0就不让其继续访问。
-- redis-cli --eval ratelimiting.lua rate.limitingl:127.0.0.1 , 10 3-- rate.limitingl + 1 local times = redis.call('incr',KEYS[1])-- 第一次访问的时候加上过期时间10秒(10秒过后从新计数) if times == 1 then redis.call('expire',KEYS[1], ARGV[1]) end-- 注意,从redis进来的默认为字符串,lua同种数据类型只能和同种数据类型比较 if times > tonumber(ARGV[2]) then return 0 end return 1

漏桶限流 漏斗的容量是有限的,如果将漏嘴堵住,然后一直往里面灌水,它就会变满,直至再也装不进去。如果将漏嘴放开,水就会往下流,流走一部分之后,就又可以继续往里面灌水。如果漏嘴流水的速率大于灌水的速率,那么漏斗永远都装不满。如果漏嘴流水速率小于灌水的速率,那么一旦漏斗满了,灌水就需要暂停并等待漏斗腾空。
所以,漏斗的剩余空间就代表着当前行为可以持续进行的数量,漏嘴的流水速率代表着系统允许该行为的最大频率。下面我们使用代码来描述单机漏斗算法。
capacity = $capacity; //漏斗容量 $this->leakingRate = $leakingRate; //漏斗流水速率 $this->leftQuote = $capacity; //漏斗剩余空间 $this->leakingTs = time(); //上一次漏水时间 }/** * 漏斗算法的核心 * 其在每次灌水前都会被调用以触发漏水,给漏斗腾出空间来。 * 能腾出多少空间取决于过去了多久以及流水的速率。 * Funnel 对象占据的空间大小不再和行为的频率成正比,它的空间占用是一个常量。 */ public function makeSpace() { // 500毫秒 usleep(1000*1000); $now = time(); //距离上一次漏水过去了多久 $deltaTs = $now-$this->leakingTs; //可腾出的空间 = 上次时间 * 流水速率 $deltaQuota = $deltaTs * $this->leakingRate; var_dump('deltaQuota ' . $deltaQuota); if($deltaQuota < 1) { return; } $this->leftQuote += $deltaQuota; //增加剩余空间 $this->leakingTs = time(); //记录漏水时间 if($this->leftQuote > $this->capacity){ $this->leftQuote - $this->capacity; } }/** * @param $quota 漏桶的空间 * @return bool */ public function watering($quota=1) { $this->makeSpace(); //漏水操作 if($this->leftQuote >= $quota) { $this->leftQuote -= $quota; return true; } return false; } }function isActionAllowed($userId, $action, $capacity, $leakingRate) { static $funnels = []; $key = sprintf("%s:%s", $userId, $action); $funnel = $funnels[$key] ?? ''; if (!$funnel) { $funnel= new Funnel($capacity, $leakingRate); $funnels[$key] = $funnel; } return $funnel->watering(); }for ($i=0; $i<20; $i++){ var_dump(isActionAllowed("110", "reply", 5, 0.5)); }

使用redis漏桶插件
Redis 4.0 提供了一个限流 Redis 模块,它叫 redis-cell。该模块也使用了漏斗算法,并提供了原子的限流指令。有了这个模块,限流问题就非常简单了。
该模块只有1条指令cl.throttle,它的参数和返回值都略显复杂,接下来让我们来看看这个指令具体该如何使用。
> cl.throttle key 15(漏斗容量) 3060(30 operations / 60 seconds 这是漏水速率) 1(need 1 quota,可选参数默认是1)

频率为每 60s 最多 30 次(漏水速率),漏斗的初始容量为 15,也就是说一开始可以连续回复 15 个帖子,然后才开始受漏水速率的影响。我们看到这个指令中漏水速率变成了 2 个参数,替代了之前的单个浮点数。用两个参数相除的结果来表达漏水速率相对单个浮点数要更加直观一些。
> cl.throttle key:reply 15 30 60 1) (integer) 0# 0 表示允许,1表示拒绝 2) (integer) 15# 漏斗容量capacity 3) (integer) 14# 漏斗剩余空间left_quota 4) (integer) -1# 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒) 5) (integer) 2# 多长时间后,漏斗完全空出来(left_quota==capacity,单位秒)

令牌桶限流 令牌桶算法(Token Bucket)和 Leaky Bucket 效果一样但方向相反的算法,更加容易理解.随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了.新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务.
令牌桶的另外一个好处是可以方便的改变速度. 一旦需要提高速率,则按需提高放入桶中的令牌的速率. 一般会定时(比如100毫秒)往桶中增加一定数量的令牌, 有些变种算法则实时的计算应该增加的令牌的数量.
具体实现可参考php 基于redis使用令牌桶算法实现流量控制
_config = $config; $this->_queue = $queue; $this->_max = $max; $this->_redis = $this->connect(); }/** * 加入令牌 * @param Int $num 加入的令牌数量 * @return Int 加入的数量 */ public function add($num = 0) { // 当前剩余令牌数 $curnum = intval($this->_redis->lSize($this->_queue)); // 最大令牌数 $maxnum = intval($this->_max); // 计算最大可加入的令牌数量,不能超过最大令牌数 $num = $maxnum >= $curnum + $num ? $num : $maxnum - $curnum; // 加入令牌 if ($num > 0) { $token = array_fill(0 , $num , 1); $this->_redis->lPush($this->_queue , ...$token); return $num; } return 0; }/** * 获取令牌 * @return Boolean */ public function get() { return $this->_redis->rPop($this->_queue) ? true : false; }/** * 重设令牌桶,填满令牌 */ public function reset() { $this->_redis->delete($this->_queue); $this->add($this->_max); }private function connect() { try { $redis = new Redis(); $redis->connect($this->_config['host'] , $this->_config['port'] , $this->_config['timeout'] , $this->_config['reserved'] , $this->_config['retry_interval']); if (empty($this->_config['auth'])) { $redis->auth($this->_config['auth']); } $redis->select($this->_config['index']); } catch (\RedisException $e) { throw new Exception($e->getMessage()); return false; } return $redis; } }$config = [ 'host'=> '172.28.3.157' , 'port'=> 6379 , 'index'=> 0 , 'auth'=> '' , 'timeout'=> 1 , 'reserved'=> NULL , 'retry_interval' => 100 , ]; // 令牌桶容器 $queue = 'mycontainer'; // 最大令牌数 $max = 5; // 创建TrafficShaper对象 $oTrafficShaper = new TrafficShaper($config , $queue , $max); // 重设令牌桶,填满令牌 $oTrafficShaper->reset(); // 循环获取令牌,令牌桶内只有5个令牌,因此最后3次获取失败 for ($i = 0; $i < 8; $i ++) { var_dump($oTrafficShaper->get()); } // 加入10个令牌,最大令牌为5,因此只能加入5个 // 这里可以使用异步脚本来添加令牌。 $add_num = $oTrafficShaper->add(10); var_dump($add_num); // 循环获取令牌,令牌桶内只有5个令牌,因此最后1次获取失败 for ($i = 0; $i < 6; $i ++) { var_dump($oTrafficShaper->get()); }

[info] 分析下来使用lua脚本的计数器,最简单方便。

    推荐阅读