需求
项目有一个群发短信提醒的模块,这种东西做出来就需要谨慎,要做好单位时间限流,不然因为bug而导致短信发送失控那就不得了了,基于此需求,redis就是个理想的中间件来实现我们的需求,最近也在看掘金小册的《深入理解Redis核心原理与应用实践》,学以致用一下。
漏斗限流
漏斗限流大白话就是 一个漏斗的容量是一定的,根据预先设定好的 流速,漏斗中的水是慢慢增加的,当漏斗中的剩余容量有富余时,你就可以从里面舀水(限流器同意执行想要的操作),反之,漏斗当前时间是空的 或者 很少(小于1),漏斗就会限制你继续操作。
漏斗抽象出来的VO原型就是这样的:
VO定义了漏斗的基本属性,每个字段都有注释,这里只讲一下构造器,这个构造器只有两个参数,capacity漏斗容量就不说了,leakingDuration定义了 漏斗 从空到满 所需要的时间(秒),为什么不直接传 leakingRate(流速) ?
因为直接传流速,有可能值比较小,比如我的需求是 30分钟只执行一次,那你需要算 1/1800(秒)传进去,自己算比较麻烦,代码也不够友好。二来我们到时候这个结构是存进redis里,出于对优化redis空间的考虑,我们需要一个ttl(过期)时间,ttl时间就取这个 (leakingDuration+10)秒,这样既不会影响业务(30分钟内你没有发起过请求,我就没必要一直留着这个结构,下次一切重来即可)。
@Data
@AllArgsConstructor
public class Funnel {
/**
* 漏斗容量
*/
long capacity;
/**
* 漏斗剩余空间
*/
long leftQuota;
/**
* 上一次流水时间(时间戳)
*/
long leakingTs;
/**
* 漏斗流水速率(capacity / leakingDuration)
*/
float leakingRate;
/**
* @param capacity 漏斗容量
* @param leakingDuration 漏斗 从空到满 所需要的时间(秒)
*/
public Funnel(long capacity, float leakingDuration) {
this.capacity = capacity;
this.leakingRate = capacity / leakingDuration;
this.leftQuota = capacity;
this.leakingTs = System.currentTimeMillis();
}
}
然后我们是spring + jedis项目,所以先初始化jedis
@Bean
public JedisPool redisPoolFactory() {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(20);
jedisPoolConfig.setBlockWhenExhausted(true);
jedisPoolConfig.setMaxWaitMillis(redissonProperties.getTimeout());
JedisPool jedisPool = new JedisPool(jedisPoolConfig, redissonProperties.getHost(), Integer.valueOf(redissonProperties.getPort()), redissonProperties.getTimeout(), redissonProperties.getPassword(), null != redissonProperties.getDatabase() ? redissonProperties.getDatabase() : 0);
return jedisPool;
}
先贴一下 完整的代码,里面的isActionAllowed方法就是唯一暴露给调用方的方法,也是漏斗限流的主方法
@Component
@Slf4j
public class FunnelRateLimiter {
@Autowired
private JedisPool jedisPool;
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
public static final String OK = "OK";
private final String LOCK_IS_ACTIONALLOWED = "isActionAllowed";
/**
* @param unionId 限流唯一id
* @param actionKey 限流类型
* @param capacity 漏斗容量
* @param leakingDuration 漏斗 从空到满 所需时间,用于计算漏斗流速(秒)
* @param quota 漏斗配额步长(一次请求多少配额,一般为1次)
* @return 是否允许操作
*/
public boolean isActionAllowed(String unionId, String actionKey, long capacity, float leakingDuration, int quota) {
boolean tryLock = tryLock(LOCK_IS_ACTIONALLOWED, 5, 5);
if (!tryLock) {
log.error("isActionAllowed No lock taken {} {}", unionId, actionKey);
return false;
}
try (Jedis jedis = jedisPool.getResource()) {
String key = String.format("infra-monitor:funnel:%s:%s", actionKey, unionId);
log.info("new Funnel key {}", key);
Funnel funnel = getFunnel(jedis, key);
if (funnel == null) {
funnel = new Funnel(capacity, leakingDuration);
log.info("new Funnel {}", funnel);
}
boolean flag = watering(funnel, quota);
// 对数据进行更新
putFunnel(jedis, key, funnel, leakingDuration);
return flag;
} catch (Exception e) {
log.error("FunnelRateLimiter isActionAllowed error {}", e);
return false;
} finally {
unlock(LOCK_IS_ACTIONALLOWED);
}
}
private Funnel getFunnel(Jedis jedis, String key) {
Map<String, String> funnelMap = jedis.hgetAll(key);
if (null != funnelMap && funnelMap.size() > 0) {
return new Funnel(Long.valueOf(funnelMap.get("capacity")), Long.valueOf(funnelMap.get("leftQuota")), Long.valueOf(funnelMap.get("leakingTs")), Float.valueOf(funnelMap.get("leakingRate")));
}
return null;
}
void putFunnel(Jedis jedis, String key, Funnel funnel, float leakingDuration) {
HashMap<String, String> funnelMap = new HashMap<>(4);
funnelMap.put("capacity", String.valueOf(funnel.getCapacity()));
funnelMap.put("leftQuota", String.valueOf(funnel.getLeftQuota()));
funnelMap.put("leakingTs", String.valueOf(funnel.getLeakingTs()));
funnelMap.put("leakingRate", String.valueOf(funnel.getLeakingRate()));
Pipeline p = jedis.pipelined();
jedis.hmset(key, funnelMap);
p.expire(key, (int) leakingDuration + 10);
p.sync();
}
private void makeSpace(Funnel funnel) {
long nowTs = System.currentTimeMillis();
long deltaTs = TimeUnit.MILLISECONDS.toSeconds(nowTs - funnel.getLeakingTs());
int deltaQuota = (int) (deltaTs * funnel.getLeakingRate());
// 腾出空间太小,最小单位是1
if (deltaQuota < 1) {
return;
}
funnel.setLeftQuota(funnel.getLeftQuota() + deltaQuota);
funnel.setLeakingTs(nowTs);
if (funnel.getLeftQuota() > funnel.getCapacity()) {
funnel.setLeftQuota(funnel.getCapacity());
}
}
private boolean watering(Funnel funnel, int quota) {
makeSpace(funnel);
if (funnel.getLeftQuota() >= quota) {
funnel.setLeftQuota(funnel.getLeftQuota() - quota);
return true;
}
return false;
}
/**
* 尝试获取锁
*
* @param lockKey
* @param waitTime 最多等待时间(秒)
* @param leaseTime 上锁后自动释放锁时间(秒)
* @return
*/
public boolean tryLock(String lockKey, long waitTime, int leaseTime) {
long currentThreadid = Thread.currentThread().getId();
final String finalLockKey = packageLockKey(lockKey);
try (Jedis jedis = jedisPool.getResource()) {
long time;
long begin = System.currentTimeMillis();
log.info("获取锁开始 {} {}", currentThreadid, begin);
String result = jedis.set(finalLockKey, "1", "NX", "EX", leaseTime);
if (OK.equalsIgnoreCase(result)) {
log.info("锁为空直接拿到锁,Thread {} 拿到锁", currentThreadid);
return true;
}
// 到目前为止已经超时,则返回false
time = System.currentTimeMillis() - begin;
if (time > TimeUnit.SECONDS.toMillis(waitTime)) {
return false;
}
CountDownLatch l = new CountDownLatch(1);
ScheduledFuture<?> scheduledFuture = executorService.scheduleAtFixedRate(() -> {
long id = Thread.currentThread().getId();
String waitResult = jedis.set(finalLockKey, "1", "NX", "EX", leaseTime);
if (OK.equalsIgnoreCase(waitResult)) {
log.info("轮询阶段拿到锁,Thread {} 拿到锁", id);
l.countDown();
throw new RuntimeException();
}
}, 0, 500, TimeUnit.MILLISECONDS);
boolean await = l.await(TimeUnit.SECONDS.toMillis(waitTime) - time, TimeUnit.MILLISECONDS);
if (await) {
log.info("拿锁阶段,Thread {} 拿到锁", currentThreadid);
} else {
scheduledFuture.cancel(true);
}
return await;
} catch (InterruptedException e) {
log.error("FunnelRateLimiter InterruptedException {}", e);
return false;
}
}
/**
* 释放锁
*
* @param lockKey
*/
public void unlock(String lockKey) {
final String finalLockKey = packageLockKey(lockKey);
try (Jedis jedis = jedisPool.getResource()) {
Long del = jedis.del(finalLockKey);
if (del > 0) {
long currentThreadid = Thread.currentThread().getId();
log.info("FunnelRateLimiter 锁已经释放 Thread {}", currentThreadid);
}
} catch (Exception e) {
log.error("FunnelRateLimiter unlock error {}", e);
}
}
private String packageLockKey(String lockKey) {
return String.format("infra-monitor:distributedLock:%s", lockKey);
}
}
isActionAllowed方法里面的tryLock()调用 和 finally 里的unlock()调用 属于分布式锁的内容,暂时别看,我们先以一个实际调用来解释代码:
这个方法就等于控制 短信的发送量在 每30分钟/1次的频率上。
- unionId: 手机号自不必说,属于业务id
- actionKey:业务类型id,因为同一个业务类型内 不同的业务id 应该享有相同的业务规则(其实不相同也没关系,我写的规则的细粒度可以精细到业务id),更多做一个标识分类用,方便redis查看同一业务类型的总体情况。
- capacity:漏斗容量
- leakingDuration:漏斗全部漏完所需时间,用于计算漏斗流速(秒),前面也解释了为什么填这个。
- quota:配额步长,这个参数用于一些特殊需求,一般参数都是1,如果每次往漏斗里灌水的量大于1的话,这个参数就起作用。
funnelRateLimiter.isActionAllowed("188xxxxxxxx", "unfinishedAlarm", 1, 1800, 1)
isActionAllowed里面比较重要调用的方法是makeSpace(), 意在重新计算漏斗空间。当一个新的请求过来,我们需要根据
(当前时间 与 leakingTs(上一次流水时间) 的间隔 ) * leakingRate(漏斗流速) = 已经增加的水量;
然后
leftQuota(剩余水量) + 已经增加的水量 = 当前实际剩余水量;
当 当前实际剩余水量 > 漏斗容量 时,以漏斗容量为主
private void makeSpace(Funnel funnel) {
long nowTs = System.currentTimeMillis();
long deltaTs = TimeUnit.MILLISECONDS.toSeconds(nowTs - funnel.getLeakingTs());
int deltaQuota = (int) (deltaTs * funnel.getLeakingRate());
// 腾出空间太小,最小单位是1
if (deltaQuota < 1) {
return;
}
funnel.setLeftQuota(funnel.getLeftQuota() + deltaQuota);
funnel.setLeakingTs(nowTs);
if (funnel.getLeftQuota() > funnel.getCapacity()) {
funnel.setLeftQuota(funnel.getCapacity());
}
}
当执行完makeSpace操作,漏斗只需要 减掉 配额步长 即可
private boolean watering(Funnel funnel, int quota) {
makeSpace(funnel);
if (funnel.getLeftQuota() >= quota) {
funnel.setLeftQuota(funnel.getLeftQuota() - quota);
return true;
}
return false;
}
当然,一次舀水动作完成后 别忘了重新对redis里的hash结构进行更新
// 对数据进行更新
putFunnel(jedis, key, funnel, leakingDuration);
在redis里对vo的存储用了hash结构,并且因为还需要加上ttl设置的操作,我们可以用pipeline(管道)对命令进行批量操作,减少多指令情况下的IO开销。
void putFunnel(Jedis jedis, String key, Funnel funnel, float leakingDuration) {
HashMap<String, String> funnelMap = new HashMap<>(4);
funnelMap.put("capacity", String.valueOf(funnel.getCapacity()));
funnelMap.put("leftQuota", String.valueOf(funnel.getLeftQuota()));
funnelMap.put("leakingTs", String.valueOf(funnel.getLeakingTs()));
funnelMap.put("leakingRate", String.valueOf(funnel.getLeakingRate()));
Pipeline p = jedis.pipelined();
jedis.hmset(key, funnelMap);
p.expire(key, (int) leakingDuration + 10);
p.sync();
}
漏斗限流的逻辑就差不多了,当我们完成后,可能有疑问,这段逻辑在分布式环境下能用吗?答案当然是NO,即使redis指令已经保证原子性了,但是我们无法保证整个过程的原子性,这段代码只能说完成了一半,下一章 我们还得给它加一个"锁"。
最后分享一下掘金的小册,我也是从小册中学习并改变自己原有的代码,可以共同进步。