Redis的用武之处--由短信限流引出的分布式锁和漏斗限流(上)

需求

项目有一个群发短信提醒的模块,这种东西做出来就需要谨慎,要做好单位时间限流,不然因为bug而导致短信发送失控那就不得了了,基于此需求,redis就是个理想的中间件来实现我们的需求,最近也在看掘金小册的《深入理解Redis核心原理与应用实践》,学以致用一下。

漏斗限流

漏斗限流大白话就是 一个漏斗的容量是一定的,根据预先设定好的 流速,漏斗中的水是慢慢增加的,当漏斗中的剩余容量有富余时,你就可以从里面舀水(限流器同意执行想要的操作),反之,漏斗当前时间是空的 或者 很少(小于1),漏斗就会限制你继续操作。

漏斗抽象出来的VO原型就是这样的:

VO定义了漏斗的基本属性,每个字段都有注释,这里只讲一下构造器,这个构造器只有两个参数,capacity漏斗容量就不说了,leakingDuration定义了 漏斗 从空到满 所需要的时间(秒),为什么不直接传 leakingRate(流速) ?

因为直接传流速,有可能值比较小,比如我的需求是 30分钟只执行一次,那你需要算 1/1800(秒)传进去,自己算比较麻烦,代码也不够友好。二来我们到时候这个结构是存进redis里,出于对优化redis空间的考虑,我们需要一个ttl(过期)时间,ttl时间就取这个 (leakingDuration+10)秒,这样既不会影响业务(30分钟内你没有发起过请求,我就没必要一直留着这个结构,下次一切重来即可)。

@Data
@AllArgsConstructor
public class Funnel {
    /**
     * 漏斗容量
     */
    long capacity;
    /**
     * 漏斗剩余空间
     */
    long leftQuota;
    /**
     * 上一次流水时间(时间戳)
     */
    long leakingTs;

    /**
     * 漏斗流水速率(capacity / leakingDuration)
     */
    float leakingRate;


    /**
     * @param capacity        漏斗容量
     * @param leakingDuration 漏斗 从空到满 所需要的时间(秒)
     */
    public Funnel(long capacity, float leakingDuration) {
        this.capacity = capacity;
        this.leakingRate = capacity / leakingDuration;
        this.leftQuota = capacity;
        this.leakingTs = System.currentTimeMillis();
    }
}

然后我们是spring + jedis项目,所以先初始化jedis

    @Bean
    public JedisPool redisPoolFactory() {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(20);
        jedisPoolConfig.setBlockWhenExhausted(true);
        jedisPoolConfig.setMaxWaitMillis(redissonProperties.getTimeout());
        JedisPool jedisPool = new JedisPool(jedisPoolConfig, redissonProperties.getHost(), Integer.valueOf(redissonProperties.getPort()), redissonProperties.getTimeout(), redissonProperties.getPassword(), null != redissonProperties.getDatabase() ? redissonProperties.getDatabase() : 0);
        return jedisPool;
    }

先贴一下 完整的代码,里面的isActionAllowed方法就是唯一暴露给调用方的方法,也是漏斗限流的主方法

@Component
@Slf4j
public class FunnelRateLimiter {

    @Autowired
    private JedisPool jedisPool;

    ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);

    public static final String OK = "OK";

    private final String LOCK_IS_ACTIONALLOWED = "isActionAllowed";

    /**
     * @param unionId         限流唯一id
     * @param actionKey       限流类型
     * @param capacity        漏斗容量
     * @param leakingDuration 漏斗 从空到满 所需时间,用于计算漏斗流速(秒)
     * @param quota           漏斗配额步长(一次请求多少配额,一般为1次)
     * @return 是否允许操作
     */
    public boolean isActionAllowed(String unionId, String actionKey, long capacity, float leakingDuration, int quota) {
        boolean tryLock = tryLock(LOCK_IS_ACTIONALLOWED, 5, 5);
        if (!tryLock) {
            log.error("isActionAllowed No lock taken {} {}", unionId, actionKey);
            return false;
        }

        try (Jedis jedis = jedisPool.getResource()) {
            String key = String.format("infra-monitor:funnel:%s:%s", actionKey, unionId);
            log.info("new Funnel key {}", key);
            Funnel funnel = getFunnel(jedis, key);
            if (funnel == null) {
                funnel = new Funnel(capacity, leakingDuration);
                log.info("new Funnel {}", funnel);
            }
            boolean flag = watering(funnel, quota);
//            对数据进行更新
            putFunnel(jedis, key, funnel, leakingDuration);
            return flag;
        } catch (Exception e) {
            log.error("FunnelRateLimiter isActionAllowed error {}", e);
            return false;
        } finally {
            unlock(LOCK_IS_ACTIONALLOWED);
        }
    }


    private Funnel getFunnel(Jedis jedis, String key) {
        Map<String, String> funnelMap = jedis.hgetAll(key);
        if (null != funnelMap && funnelMap.size() > 0) {
            return new Funnel(Long.valueOf(funnelMap.get("capacity")), Long.valueOf(funnelMap.get("leftQuota")), Long.valueOf(funnelMap.get("leakingTs")), Float.valueOf(funnelMap.get("leakingRate")));
        }
        return null;
    }


    void putFunnel(Jedis jedis, String key, Funnel funnel, float leakingDuration) {
        HashMap<String, String> funnelMap = new HashMap<>(4);
        funnelMap.put("capacity", String.valueOf(funnel.getCapacity()));
        funnelMap.put("leftQuota", String.valueOf(funnel.getLeftQuota()));
        funnelMap.put("leakingTs", String.valueOf(funnel.getLeakingTs()));
        funnelMap.put("leakingRate", String.valueOf(funnel.getLeakingRate()));

        Pipeline p = jedis.pipelined();
        jedis.hmset(key, funnelMap);
        p.expire(key, (int) leakingDuration + 10);
        p.sync();
    }


    private void makeSpace(Funnel funnel) {
        long nowTs = System.currentTimeMillis();
        long deltaTs = TimeUnit.MILLISECONDS.toSeconds(nowTs - funnel.getLeakingTs());
        int deltaQuota = (int) (deltaTs * funnel.getLeakingRate());
        // 腾出空间太小,最小单位是1
        if (deltaQuota < 1) {
            return;
        }
        funnel.setLeftQuota(funnel.getLeftQuota() + deltaQuota);
        funnel.setLeakingTs(nowTs);
        if (funnel.getLeftQuota() > funnel.getCapacity()) {
            funnel.setLeftQuota(funnel.getCapacity());
        }
    }

    private boolean watering(Funnel funnel, int quota) {
        makeSpace(funnel);
        if (funnel.getLeftQuota() >= quota) {
            funnel.setLeftQuota(funnel.getLeftQuota() - quota);
            return true;
        }
        return false;
    }


    /**
     * 尝试获取锁
     *
     * @param lockKey
     * @param waitTime  最多等待时间(秒)
     * @param leaseTime 上锁后自动释放锁时间(秒)
     * @return
     */
    public boolean tryLock(String lockKey, long waitTime, int leaseTime) {
        long currentThreadid = Thread.currentThread().getId();
        final String finalLockKey = packageLockKey(lockKey);
        try (Jedis jedis = jedisPool.getResource()) {
            long time;
            long begin = System.currentTimeMillis();
            log.info("获取锁开始 {} {}", currentThreadid, begin);
            String result = jedis.set(finalLockKey, "1", "NX", "EX", leaseTime);
            if (OK.equalsIgnoreCase(result)) {
                log.info("锁为空直接拿到锁,Thread {} 拿到锁", currentThreadid);
                return true;
            }

//            到目前为止已经超时,则返回false
            time = System.currentTimeMillis() - begin;
            if (time > TimeUnit.SECONDS.toMillis(waitTime)) {
                return false;
            }
            CountDownLatch l = new CountDownLatch(1);
            ScheduledFuture<?> scheduledFuture = executorService.scheduleAtFixedRate(() -> {
                long id = Thread.currentThread().getId();
                String waitResult = jedis.set(finalLockKey, "1", "NX", "EX", leaseTime);
                if (OK.equalsIgnoreCase(waitResult)) {
                    log.info("轮询阶段拿到锁,Thread {} 拿到锁", id);
                    l.countDown();
                    throw new RuntimeException();
                }
            }, 0, 500, TimeUnit.MILLISECONDS);
            boolean await = l.await(TimeUnit.SECONDS.toMillis(waitTime) - time, TimeUnit.MILLISECONDS);
            if (await) {
                log.info("拿锁阶段,Thread {} 拿到锁", currentThreadid);
            } else {
                scheduledFuture.cancel(true);
            }
            return await;
        } catch (InterruptedException e) {
            log.error("FunnelRateLimiter InterruptedException {}", e);
            return false;
        }
    }

    /**
     * 释放锁
     *
     * @param lockKey
     */
    public void unlock(String lockKey) {
        final String finalLockKey = packageLockKey(lockKey);
        try (Jedis jedis = jedisPool.getResource()) {
            Long del = jedis.del(finalLockKey);
            if (del > 0) {
                long currentThreadid = Thread.currentThread().getId();
                log.info("FunnelRateLimiter 锁已经释放 Thread {}", currentThreadid);
            }
        } catch (Exception e) {

            log.error("FunnelRateLimiter unlock error {}", e);
        }
    }

    private String packageLockKey(String lockKey) {
        return String.format("infra-monitor:distributedLock:%s", lockKey);
    }
}

isActionAllowed方法里面的tryLock()调用 和 finally 里的unlock()调用 属于分布式锁的内容,暂时别看,我们先以一个实际调用来解释代码:

这个方法就等于控制 短信的发送量在 每30分钟/1次的频率上。

  • unionId: 手机号自不必说,属于业务id
  • actionKey:业务类型id,因为同一个业务类型内 不同的业务id 应该享有相同的业务规则(其实不相同也没关系,我写的规则的细粒度可以精细到业务id),更多做一个标识分类用,方便redis查看同一业务类型的总体情况。
  • capacity:漏斗容量
  • leakingDuration:漏斗全部漏完所需时间,用于计算漏斗流速(秒),前面也解释了为什么填这个。
  • quota:配额步长,这个参数用于一些特殊需求,一般参数都是1,如果每次往漏斗里灌水的量大于1的话,这个参数就起作用。
funnelRateLimiter.isActionAllowed("188xxxxxxxx", "unfinishedAlarm", 1, 1800, 1)

isActionAllowed里面比较重要调用的方法是makeSpace(), 意在重新计算漏斗空间。当一个新的请求过来,我们需要根据

(当前时间leakingTs(上一次流水时间) 的间隔 ) * leakingRate(漏斗流速) = 已经增加的水量;

然后

leftQuota(剩余水量) + 已经增加的水量 = 当前实际剩余水量;

当前实际剩余水量 > 漏斗容量 时,以漏斗容量为主

private void makeSpace(Funnel funnel) {
        long nowTs = System.currentTimeMillis();
        long deltaTs = TimeUnit.MILLISECONDS.toSeconds(nowTs - funnel.getLeakingTs());
        int deltaQuota = (int) (deltaTs * funnel.getLeakingRate());
        // 腾出空间太小,最小单位是1
        if (deltaQuota < 1) {
            return;
        }
        funnel.setLeftQuota(funnel.getLeftQuota() + deltaQuota);
        funnel.setLeakingTs(nowTs);
        if (funnel.getLeftQuota() > funnel.getCapacity()) {
            funnel.setLeftQuota(funnel.getCapacity());
        }
    }

当执行完makeSpace操作,漏斗只需要 减掉 配额步长 即可

    private boolean watering(Funnel funnel, int quota) {
        makeSpace(funnel);
        if (funnel.getLeftQuota() >= quota) {
            funnel.setLeftQuota(funnel.getLeftQuota() - quota);
            return true;
        }
        return false;
    }

当然,一次舀水动作完成后 别忘了重新对redis里的hash结构进行更新

// 对数据进行更新
putFunnel(jedis, key, funnel, leakingDuration);

在redis里对vo的存储用了hash结构,并且因为还需要加上ttl设置的操作,我们可以用pipeline(管道)对命令进行批量操作,减少多指令情况下的IO开销。

void putFunnel(Jedis jedis, String key, Funnel funnel, float leakingDuration) {
        HashMap<String, String> funnelMap = new HashMap<>(4);
        funnelMap.put("capacity", String.valueOf(funnel.getCapacity()));
        funnelMap.put("leftQuota", String.valueOf(funnel.getLeftQuota()));
        funnelMap.put("leakingTs", String.valueOf(funnel.getLeakingTs()));
        funnelMap.put("leakingRate", String.valueOf(funnel.getLeakingRate()));

        Pipeline p = jedis.pipelined();
        jedis.hmset(key, funnelMap);
        p.expire(key, (int) leakingDuration + 10);
        p.sync();
}

漏斗限流的逻辑就差不多了,当我们完成后,可能有疑问,这段逻辑在分布式环境下能用吗?答案当然是NO,即使redis指令已经保证原子性了,但是我们无法保证整个过程的原子性,这段代码只能说完成了一半,下一章 我们还得给它加一个"锁"。

最后分享一下掘金的小册,我也是从小册中学习并改变自己原有的代码,可以共同进步。
在这里插入图片描述


版权声明:本文为lovejj1994原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。