Redis使用zset和hash实现限流算法

一、基础概念

1.1、限流的定义

所谓限流,在软件系统中,就是指限制某个动作在一段时间内的执行次数。

1.2、限流的好处

限流最直接的好处防止高并发情况下因服务器资源过载导致的系统崩溃情况发生,具体来说,限流可以保证使用有限的资源提供最大化的服务能力,按照预期流量提供服务,超过的部分将会拒绝服务、排队或等待、降级等处理。

1.3、限流的分类

限流按照限流的粒度,可以分为单机限流和分布式限流:
单机
应用级限流方式只是单应用内的请求限流,不能进行全局限流。

  1. 限流总资源数
  2. 限流总并发/连接/请求数
  3. 限流某个接口的总并发/请求数
  4. 限流某个接口的时间窗请求数
  5. 平滑限流某个接口的请求数
  6. Guava RateLimiter

分布式
我们需要分布式限流接入层限流来进行全局限流。

  1. redis+lua实现中的lua脚本
  2. 使用Nginx+Lua实现的Lua脚本
  3. 使用 OpenResty 开源的限流方案
  4. 限流框架,比如Sentinel实现降级限流熔断

二、限流实现

2.1、简单限流

使用zset做限流的基本思想就是对于每次行为,都统计出前一段时间发生的行为数,判断当前行为是否过于频繁;
zset中存储的数据结构:

key: userid:actionKey
value: nowTs
score: nowTs

算法实现:

package main

import (
	"fmt"
	"github.com/go-redis/redis"
	"strconv"
	"sync"
	"time"
)

type RedisLimiter struct {
	client  *redis.Client
	key     string
	limit   int64
	timeout time.Duration
	lock    sync.Mutex
}

func NewRedisLimiter(client *redis.Client, key string, limit int64, timeout time.Duration) *RedisLimiter {
	return &RedisLimiter{
		client:  client,
		key:     key,
		limit:   limit,
		timeout: timeout,
	}
}

func (l *RedisLimiter) Allow() bool {
	l.lock.Lock()
	defer l.lock.Unlock()
	now := time.Now().UnixNano()
	// 删除过期的时间戳
	fmt.Println("before: ")
	l.client.ZRemRangeByScore(l.key, "0", strconv.FormatInt(now-l.timeout.Nanoseconds(), 10))

	// 获取时间窗口内的请求数量
	count := l.client.ZCount(l.key, strconv.FormatInt(now-l.timeout.Nanoseconds(), 10), "+inf").Val()
	fmt.Println("after: ", l.client.ZCard(l.key))
	if count < l.limit {
		// 添加新的时间戳
		l.client.ZAdd(l.key, redis.Z{
			Score:  float64(now),
			Member: now,
		})
		return true
	}
	return false
}

const (
	LimiterKey = "%s:%s"
)

func main() {
	client := redis.NewClient(&redis.Options{
		Addr:     "127.0.0.1:6379",
		DB:       0,
	})
	if client == nil {
		fmt.Println("Err")
		return
	}
	user := "xiaogao"
	action := "addFriend"
	oneSecond, _ := time.ParseDuration("1s")
	limiter := NewRedisLimiter(client, fmt.Sprintf(LimiterKey, user, action), 100, oneSecond)

	milli := time.Now().UnixNano()
	for i := 0; i < 1000; i++ {
		//time.Sleep(time.Millisecond * 5)
		allowed := limiter.Allow()
		if allowed {
			fmt.Println("第", i, "次请求成功: ", time.Now().UnixNano()-milli)
		} else {
			fmt.Println("请求过于频繁,休息10秒")
			time.Sleep(time.Second * 10)
		}
	}

}

2.2、令牌桶实现

下面的算法思想来自《Redis深度历险》中关于漏斗限流的实现,但笔者认为这种算法因为可以容纳“突发流量”,所以更像令牌桶算法。

上面说的使用zset进行限流,有一个明显的缺点,就是需要存放一段时间内的所有用户行为,当限流阈值过大时,比如246060s内操作不超过100w次,那就很耗费内存,这时就可以参考令牌桶算法;
image.png
简单描述令牌桶算法就是,有一个桶,桶中初始有n个令牌,同时令牌会以一定的速率生成,请求到达时,需要先判断桶中是否有足够的令牌,如果有,请求可以执行,否则就需要等待令牌生成。
“桶”结构应包含如下参数

type Funnel struct {
	Capacity     int // 桶的初始容量
	LeakyRate    float64 // 令牌生成速率
	LeftCapacity int // 剩余容量
	LastLeakyTs  int64 // 最后一次更新容量的时间
}

算法实现:

package main

import (
	"fmt"
	"sync"
	"time"
)

type Funnel struct {
	Capacity     int
	LeakyRate    float64
	LeftCapacity int
	LastLeakyTs  int64
}

var funnels map[string]*Funnel

func (funnel *Funnel) makeSpace() {
	nowTs := time.Now().Unix()
	intervalTs := nowTs - funnel.LastLeakyTs
	relasedCapacity := float64(intervalTs) * funnel.LeakyRate
	if relasedCapacity < 0 {
		funnel.LeftCapacity = funnel.Capacity
		funnel.LastLeakyTs = nowTs
		return
	}
	if relasedCapacity < 1 {
		return
	}

	funnel.LeftCapacity += int(relasedCapacity)
	funnel.LastLeakyTs = nowTs
	if funnel.LeftCapacity > funnel.Capacity {
		funnel.LeftCapacity = funnel.Capacity
	}
}

func (funnel *Funnel) waterLeaking(quota int) bool {
	funnel.makeSpace()
	//fmt.Println("LeftCapacity:", funnel.LeftCapacity)
	if funnel.LeftCapacity >= quota {
		funnel.LeftCapacity = funnel.LeftCapacity - quota
		return true
	}
	return false
}

func IsActionAllowed(userId, actionKey string, capacity, quota int, leakyRating float64) bool {
	key := fmt.Sprintf("%s:%s", userId, actionKey)
	funnel, ok := funnels[key]
	if !ok {
		funnel = &Funnel{
			Capacity:     capacity,
			LeftCapacity: capacity,
			LeakyRate:    leakyRating,
			LastLeakyTs:  time.Now().Unix(),
		}
		funnels[key] = funnel
	}
	return funnel.waterLeaking(quota)

}

func main() {
	user := "xiao"
	action := "addFriend"
	funnels = make(map[string]*Funnel, 10)
	group := sync.WaitGroup{}
	for i := 0; i < 1000; i++ {
		time.Sleep(time.Millisecond * 10)
		group.Add(1)
		allowed := IsActionAllowed(user, action, 10, 1, 0.5)
		group.Done()
		if allowed {
			fmt.Println("第", i, "次请求成功")
		} else {
			fmt.Println("请求过于频繁,休息10秒")
			time.Sleep(time.Second * 10)
		}
	}
	group.Wait()
}

参考


版权声明:本文为alpha_xia原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。