Redis限流

限流一般是指在一个时间窗口内对某些操作请求的数量进行限制。比如一个论坛限制用户每秒钟只能发一个帖子,每秒钟只能回复 5 个帖子。

限流可以保证系统的稳定,限制恶意请求,防止因为流量暴增导致系统瘫痪宕机。

常用的限流算法有:滑动窗口限流、漏斗限流

得益于 Redis 的数据结构特点,Redis 实现滑动窗口限流和漏斗限流的非常的便捷。

1 滑动窗口限流

滑动窗口限流 就是记录一个滑动的时间窗口内的操作次数,操作次数超过阈值则进行限流。

在 Redis 中,可以用 zset(有序集合)数据结构来实现滑动窗口限流:

用唯一的 id 作为 zset 的 key(可以是 user_id+action_key,标识某个用户的某个操作),score 和 value 可以设置为当前操作的时间戳。

每次新的操作请求进来时,先获取当前时间戳 nowTime,以 nowTime 为 score 和 value,向 zset 中新增一个元素。然后,整理 zset,删除最近 period 范围之外的元素。最后统计 zset 中的元素个数,也就是 period 时间范围内的操作次数。如果操作次数 <= 最大限制次数,说明本次操作是被允许的。否则不被允许。

代码实现如下:

    /**
     * 基于 Redis 的滑动窗口限流实现
     *
     * @param userId    用户 id
     * @param actionKey 操作
     * @param period    时间范围(单位:秒)
     * @param maxCount  允许操作的最大次数
     * @return 是否允许本次操作
     */
    public boolean isActionAllowed(String userId, String actionKey, int period, int maxCount) throws IOException {
        // 生成唯一的 key(唯一标识某个用户的某个操作)
        String key = "hist:" + userId + ":" + actionKey;
        // 获取当前时间戳(毫秒)
        long nowTime = System.currentTimeMillis();
        // 使用管道
        Pipeline pipe = jedis.pipelined();
        pipe.multi();
        // 添加元素到 zset 中,key 的 score 设置为当前时间戳
        pipe.zadd(key, nowTime, "" + nowTime);
        // 整理 zset,删除时间窗口外的数据(删除从 0 到离当前时间 period 范围内的数据,只保留最近 period 范围内的数据)
        pipe.zremrangeByScore(key, 0, nowTime - period * 1000L);
        // 返回集合中元素个数
        Response<Long> count = pipe.zcard(key);
        pipe.expire(key, period + 1);
        pipe.exec();
        pipe.close();
        // 判断是否超过了限制的最大操作次数
        return count.get() <= maxCount;
    }

其实,zset 中的 value 没有什么意义,保存当前时间戳只是为了保证每次操作都是唯一的能够添加到 zset 中的新元素。

2 漏斗限流

漏斗限流 就是利用漏斗的原理,用一个漏斗来记录请求,一边向漏斗中添加请求,一边将请求漏出去。

漏斗限流的基本思路为:

漏斗有一定的容量和恒定的漏水速率。

将请求比作灌入漏斗的水,漏水比作处理请求。如果请求的速率大于漏水的速率,则漏斗会在某一时刻被灌满,则进行限流;如果请求的速率小于漏水的速率,则漏斗永远不会灌满,因此不限流。

代码实现如下:

public class FunnelRateLimiter {

    // 用一个 map 保存不同操作对应的漏斗
    private final Map<String, Funnel> funnels = new HashMap<>();

    /**
     * 计算某个用户的某个动作是否被允许
     *
     * @param userId      用户 id
     * @param actionKey   操作 key
     * @param capacity    漏斗容量
     * @param leakingRate 漏水速率
     * @return 是否被允许
     */
    public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
        // weiyikey,标识某个用户的某个操作
        String key = String.format("%s:%s", userId, actionKey);
        Funnel funnel = funnels.get(key);
        // 如果对应的漏斗不存在,初始化漏斗
        if (funnel == null) {
            funnel = new Funnel(capacity, leakingRate);
            funnels.put(key, funnel);
        }
        // 向漏斗中灌水,单个请求占据容量为 1
        return funnel.watering(1);
    }

    /**
     * 定义一个类,表示漏斗
     */
    static class Funnel {
        // 漏斗容量
        private int capacity;
        // 流出速率
        private float leakingRate;
        // 漏斗剩余空间
        private int leftQuota;
        // 上一次执行漏水的时间
        private long leakingTs;

        /**
         * 构造函数
         *
         * @param capacity    容量
         * @param leakingRate 漏水速率
         */
        public Funnel(int capacity, float leakingRate) {
            this.capacity = capacity;
            this.leakingRate = leakingRate;
            // 初始化时,剩余空间等于漏斗容量
            this.leftQuota = capacity;
            // 上一次漏水时间指定为初始化时间
            this.leakingTs = System.currentTimeMillis();
        }

        /**
         * 腾出漏斗空间(每一次灌水之前都要触发一次)
         */
        public void makeSpace() {
            long nowTs = System.currentTimeMillis();
            // 计算上一次漏水时间和当前时间的时间差
            long deltaTs = nowTs - leakingTs;
            // 计算应该腾出的空间
            int deltaQuota = (int) (deltaTs * leakingRate);
            // 间隔时间太长,整数数字过大溢出
            if (deltaQuota < 0) {
                this.leftQuota = capacity;
                this.leakingTs = nowTs;
                return;
            }
            // 腾出的空间太小,最小单位是 1
            if (deltaQuota < 1) {
                return;
            }
            // 更新剩余空间
            this.leftQuota += deltaQuota;
            // 更新上一次漏水时间为当前时间
            this.leakingTs = nowTs;
            if (this.leftQuota > this.capacity) {
                this.leftQuota = this.capacity;
            }
        }

        /**
         * 向漏斗中灌水
         *
         * @param quota 固定额度
         * @return 是否灌水成功
         */
        public boolean watering(int quota) {
            // 腾出空间
            makeSpace();
            // 如果剩余空间大于要灌入的空间,则更新剩余空间,灌水成功
            if (this.leftQuota >= quota) {
                this.leftQuota -= quota;
                return true;
            }
            // 否则灌水失败
            return false;
        }
    }
}

那么在分布式系统中,如何利用 Redis 实现漏斗限流呢?

可以利用 Redis 中的 hash 字典存储漏斗 Funnel 的结构。每次请求(灌水)的时候,将 Funnel 结构从 Redis 中取出来,然后计算更新后的 Funnel 信息,最后更新到 Redis 中。

但是,整个过程不是原子性的!

2.1 redis-cell 模块

Redis 4.0 提供了一个叫做redis-cell的限流模块。该模块使用了漏斗限流算法,并且提供了原子性的限流指令。

要想体验,可以使用 docker 镜像:hsz1273327/redis-cell

该模块只有一条指令:

cl.throttle [key 名称] [漏斗容量] [指定时间允许的次数] [指定时间,单位:秒] [可选参数,默认为 1]

一共支持 5 个参数,最后一个参数可选,默认为 1。

举例:

cl.throttle laowang:reply 15 30 60 1

意思是:允许 laowang 用户 replay 的频率为 60 秒内最多 30 次,漏斗的初始容量为 15,也就是一开始可以连续 reply 15 次,然后开始受到 reply 的速率影响。

返回值说明:

127.0.0.1:6379> cl.throttle laowang:reply 15 30 60 1
1) (integer) 0    # 0 表示允许,1 表示拒绝
2) (integer) 16   # 漏斗容量 capacity
3) (integer) 15   # 漏斗剩余空间 left_quota
4) (integer) -1   # 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒)
5) (integer) 2    # 多长时间后,漏斗完全空出来 (left_quota == capacity,单位秒)

版权声明:本文为hbtj_1216原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。