限流一般是指在一个时间窗口内对某些操作请求的数量进行限制。比如一个论坛限制用户每秒钟只能发一个帖子,每秒钟只能回复 5 个帖子。
限流可以保证系统的稳定,限制恶意请求,防止因为流量暴增导致系统瘫痪宕机。
常用的限流算法有:滑动窗口限流、漏斗限流。
得益于 Redis 的数据结构特点,Redis 实现滑动窗口限流和漏斗限流的非常的便捷。
1 滑动窗口限流
滑动窗口限流 就是记录一个滑动的时间窗口内的操作次数,操作次数超过阈值则进行限流。
在 Redis 中,可以用 zset(有序集合)数据结构来实现滑动窗口限流:
用唯一的 id 作为 zset 的 key(可以是 user_id+action_key,标识某个用户的某个操作),score 和 value 可以设置为当前操作的时间戳。
每次新的操作请求进来时,先获取当前时间戳 nowTime,以 nowTime 为 score 和 value,向 zset 中新增一个元素。然后,整理 zset,删除最近 period 范围之外的元素。最后统计 zset 中的元素个数,也就是 period 时间范围内的操作次数。如果操作次数 <= 最大限制次数,说明本次操作是被允许的。否则不被允许。
代码实现如下:
/**
* 基于 Redis 的滑动窗口限流实现
*
* @param userId 用户 id
* @param actionKey 操作
* @param period 时间范围(单位:秒)
* @param maxCount 允许操作的最大次数
* @return 是否允许本次操作
*/
public boolean isActionAllowed(String userId, String actionKey, int period, int maxCount) throws IOException {
// 生成唯一的 key(唯一标识某个用户的某个操作)
String key = "hist:" + userId + ":" + actionKey;
// 获取当前时间戳(毫秒)
long nowTime = System.currentTimeMillis();
// 使用管道
Pipeline pipe = jedis.pipelined();
pipe.multi();
// 添加元素到 zset 中,key 的 score 设置为当前时间戳
pipe.zadd(key, nowTime, "" + nowTime);
// 整理 zset,删除时间窗口外的数据(删除从 0 到离当前时间 period 范围内的数据,只保留最近 period 范围内的数据)
pipe.zremrangeByScore(key, 0, nowTime - period * 1000L);
// 返回集合中元素个数
Response<Long> count = pipe.zcard(key);
pipe.expire(key, period + 1);
pipe.exec();
pipe.close();
// 判断是否超过了限制的最大操作次数
return count.get() <= maxCount;
}
其实,zset 中的 value 没有什么意义,保存当前时间戳只是为了保证每次操作都是唯一的能够添加到 zset 中的新元素。
2 漏斗限流
漏斗限流 就是利用漏斗的原理,用一个漏斗来记录请求,一边向漏斗中添加请求,一边将请求漏出去。
漏斗限流的基本思路为:
漏斗有一定的容量和恒定的漏水速率。
将请求比作灌入漏斗的水,漏水比作处理请求。如果请求的速率大于漏水的速率,则漏斗会在某一时刻被灌满,则进行限流;如果请求的速率小于漏水的速率,则漏斗永远不会灌满,因此不限流。
代码实现如下:
public class FunnelRateLimiter {
// 用一个 map 保存不同操作对应的漏斗
private final Map<String, Funnel> funnels = new HashMap<>();
/**
* 计算某个用户的某个动作是否被允许
*
* @param userId 用户 id
* @param actionKey 操作 key
* @param capacity 漏斗容量
* @param leakingRate 漏水速率
* @return 是否被允许
*/
public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
// weiyikey,标识某个用户的某个操作
String key = String.format("%s:%s", userId, actionKey);
Funnel funnel = funnels.get(key);
// 如果对应的漏斗不存在,初始化漏斗
if (funnel == null) {
funnel = new Funnel(capacity, leakingRate);
funnels.put(key, funnel);
}
// 向漏斗中灌水,单个请求占据容量为 1
return funnel.watering(1);
}
/**
* 定义一个类,表示漏斗
*/
static class Funnel {
// 漏斗容量
private int capacity;
// 流出速率
private float leakingRate;
// 漏斗剩余空间
private int leftQuota;
// 上一次执行漏水的时间
private long leakingTs;
/**
* 构造函数
*
* @param capacity 容量
* @param leakingRate 漏水速率
*/
public Funnel(int capacity, float leakingRate) {
this.capacity = capacity;
this.leakingRate = leakingRate;
// 初始化时,剩余空间等于漏斗容量
this.leftQuota = capacity;
// 上一次漏水时间指定为初始化时间
this.leakingTs = System.currentTimeMillis();
}
/**
* 腾出漏斗空间(每一次灌水之前都要触发一次)
*/
public void makeSpace() {
long nowTs = System.currentTimeMillis();
// 计算上一次漏水时间和当前时间的时间差
long deltaTs = nowTs - leakingTs;
// 计算应该腾出的空间
int deltaQuota = (int) (deltaTs * leakingRate);
// 间隔时间太长,整数数字过大溢出
if (deltaQuota < 0) {
this.leftQuota = capacity;
this.leakingTs = nowTs;
return;
}
// 腾出的空间太小,最小单位是 1
if (deltaQuota < 1) {
return;
}
// 更新剩余空间
this.leftQuota += deltaQuota;
// 更新上一次漏水时间为当前时间
this.leakingTs = nowTs;
if (this.leftQuota > this.capacity) {
this.leftQuota = this.capacity;
}
}
/**
* 向漏斗中灌水
*
* @param quota 固定额度
* @return 是否灌水成功
*/
public boolean watering(int quota) {
// 腾出空间
makeSpace();
// 如果剩余空间大于要灌入的空间,则更新剩余空间,灌水成功
if (this.leftQuota >= quota) {
this.leftQuota -= quota;
return true;
}
// 否则灌水失败
return false;
}
}
}
那么在分布式系统中,如何利用 Redis 实现漏斗限流呢?
可以利用 Redis 中的 hash 字典存储漏斗 Funnel 的结构。每次请求(灌水)的时候,将 Funnel 结构从 Redis 中取出来,然后计算更新后的 Funnel 信息,最后更新到 Redis 中。
但是,整个过程不是原子性的!
2.1 redis-cell 模块
Redis 4.0 提供了一个叫做redis-cell的限流模块。该模块使用了漏斗限流算法,并且提供了原子性的限流指令。
要想体验,可以使用 docker 镜像:
hsz1273327/redis-cell
该模块只有一条指令:
cl.throttle [key 名称] [漏斗容量] [指定时间允许的次数] [指定时间,单位:秒] [可选参数,默认为 1]
一共支持 5 个参数,最后一个参数可选,默认为 1。
举例:
cl.throttle laowang:reply 15 30 60 1
意思是:允许 laowang 用户 replay 的频率为 60 秒内最多 30 次,漏斗的初始容量为 15,也就是一开始可以连续 reply 15 次,然后开始受到 reply 的速率影响。
返回值说明:
127.0.0.1:6379> cl.throttle laowang:reply 15 30 60 1
1) (integer) 0 # 0 表示允许,1 表示拒绝
2) (integer) 16 # 漏斗容量 capacity
3) (integer) 15 # 漏斗剩余空间 left_quota
4) (integer) -1 # 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒)
5) (integer) 2 # 多长时间后,漏斗完全空出来 (left_quota == capacity,单位秒)