Redisson 中 RRateLimiter 的令牌桶算法

令牌桶算法在 Redisson 的 RRateLimiter 中通过三个 Redis 数据结构巧妙实现:Hash 存储基础配置,Zset 记录请求时间戳,String 跟踪可用令牌数。其核心优势在于懒惰删除过期请求、精确时间窗口控制和非阻塞限流机制,既保证分布式一致性又实现高效限流,尽管不保证请求的公平处理顺序。

令牌桶 (Token Bucket) 是一个常用的限流算法。想象一个固定大小的桶,系统会以固定的速率往桶里放入令牌。当请求到来时,需要从桶中获取一个令牌才能继续处理,如果桶中没有令牌,请求就会被拒绝或等待。这种机制可以很好地控制系统的请求处理速率。

虽然底层使用的是令牌桶算法,但当 limit 设置为 1 时,它的行为更像是滑动窗口限流:在固定时间窗口内只允许一个请求通过。这是因为:

  1. 令牌产生速率被设置为每 60 秒 1 个
  2. 桶的容量实际上也是 1
  3. 每次请求都需要消耗一个令牌

下面详细解释一下 Redisson 中 RRateLimiter 的令牌桶算法实现原理。

定义一个通用的限流器:

/**  
 * 限流服务  
 */
 public class SlidingWindowRateLimiter {  
  
    private RedissonClient redissonClient;  
  
    private static final String LIMIT_KEY_PREFIX = "nft:turbo:limit:";  
  
    public SlidingWindowRateLimiter(RedissonClient redissonClient) {  
        this.redissonClient = redissonClient;  
    }  
  
    public Boolean tryAcquire(String key, int limit, int windowSize) {  
        RRateLimiter rRateLimiter = redissonClient.getRateLimiter(LIMIT_KEY_PREFIX + key);  
  
        if (!rRateLimiter.isExists()) {  
            rRateLimiter.trySetRate(RateType.OVERALL, limit, windowSize, RateIntervalUnit.SECONDS);  
        }  
  
        return rRateLimiter.tryAcquire();  
    }  
}

测试:

@Test
public void tryAcquire1() {
    Boolean result = slidingWindowRateLimiter.tryAcquire("testLock997", 3, 10);
    Assert.assertTrue(result);
    result = slidingWindowRateLimiter.tryAcquire("testLock997", 3, 10);
    Assert.assertTrue(result);
    result = slidingWindowRateLimiter.tryAcquire("testLock997", 3, 10);
    Assert.assertTrue(result);
    result = slidingWindowRateLimiter.tryAcquire("testLock997", 3, 10);
    Assert.assertFalse(result);

    try {
        Thread.currentThread().sleep(10000);
    }catch (Exception e){

    }
    result = slidingWindowRateLimiter.tryAcquire("testLock997", 3, 10);
    Assert.assertTrue(result);

}

Redisson 的 RRateLimiter 在 Redis 中会生成三个数据结构:

  1. Hash 结构(基础配置)

键名:nft:turbo:limit:testLock997

HMSET "nft:turbo:limit:testLock997" 
    "rate" "3"           # 令牌产生速率:每个时间间隔产生3个令牌
    "interval" "10000"   # 时间间隔:10000毫秒(10秒)
    "type" "0"          # 限流类型:0表示集群限流模式

这个 Hash 结构存储了限流器的基本配置信息。通过这些参数,我们可以知道这个限流器被配置为每 10 秒允许 3 个请求通过,并且是在集群范围内生效的。

  1. Zset 结构(令牌分配记录)

键名:{nft:turbo:limit:testLock997}:permits

这个有序集合用来记录令牌的分配情况。其中:

  • score(分数)是时间戳:1737885857942
  • member(成员)是一个二进制值:"\x10\x0e)/\x91y"\x1f\xb8j\xd2\x06*.\xd6|N\x01\x00\x00\x00"

这个结构的巧妙之处在于:

  • 使用时间戳作为分数,可以方便地清理过期的令牌记录
  • 二进制值包含了令牌的标识信息,确保每个令牌的唯一性
  • 有序集合的特性让我们可以高效地按时间范围查询和清理令牌
  1. String 结构(当前可用令牌数)

键名:{nft:turbo:limit:testLock997}:value

值:2

这个简单的字符串值记录了当前可用的令牌数量。在这个例子中,还剩 2 个令牌可以使用。这个计数器会随着:

  • 时间推移而增加(但不超过配置的速率限制)
  • 令牌被使用而减少

源码分析

  1. 基础配置获取和验证
local rate = redis.call("hget", KEYS[1], "rate")
local interval = redis.call("hget", KEYS[1], "interval")
local type = redis.call("hget", KEYS[1], "type")
assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized")

这部分代码从 Redis hash 中获取限流器的基本配置。这些配置在初始化时就已经设置好了:

  • rate:令牌生成速率
  • interval:时间窗口大小(毫秒)
  • type:运行模式(单机/集群)
  1. 键名准备
local valueName = KEYS[2]
local permitsName = KEYS[4]
if type == "1" then
    valueName = KEYS[3]
    permitsName = KEYS[5]
end

这里设置了两个重要的 Redis 键:

  • valueName:存储当前可用令牌数的键
  • permitsName:存储请求历史的有序集合键
    type == "1" 的判断是为了支持单机模式,这种情况下使用不同的键名。
  1. 令牌桶状态检查和更新
local currentValue = redis.call("get", valueName)
if currentValue ~= false then
    local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)

这是一个关键部分。当获取到当前令牌数后,系统会检查是否有已经过期的请求。过期的判断标准是:

请求时间 < (当前时间 - 时间窗口)

举个例子:

假设时间窗口是 1 秒(1000ms),当前时间是 15000ms:

  • 系统会查找 score < (15000 - 1000) = 14000 的所有请求
  • 这些请求都被认为是 " 过期 " 的,它们占用的令牌可以被释放了
  1. 令牌释放机制
local released = 0
for i, v in ipairs(expiredValues) do
    local random, permits = struct.unpack("fI", v)
    released = released + permits
end

if released > 0 then
    redis.call("zrem", permitsName, unpack(expiredValues))
    currentValue = tonumber(currentValue) + released
    redis.call("set", valueName, currentValue)
end

这段代码实现了令牌的回收机制:

  • 遍历所有过期的请求,累加它们占用的令牌数
  • 如果有令牌要释放(released > 0):
    • 从有序集合中删除这些过期请求
    • 将释放的令牌数添加到当前可用令牌数中

例如:

如果 rate=5,有三个过期请求各使用了 1 个令牌,那么:

  • released = 3
  • currentValue 会增加 3
  • 这三个请求记录会从 ZSET 中删除
  1. 令牌分配决策
if tonumber(currentValue) < tonumber(ARGV[1]) then
    local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), 'withscores', 'limit', 0, 1)
    local random, permits = struct.unpack("fI", nearest[1])
    return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval)
else
    redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
    redis.call("decrby", valueName, ARGV[1])
    return nil
end

这是决策部分:

  • 如果令牌不够(currentValue < 请求数量):
    • 找到最近一次请求的时间
    • 计算还需要等待多久才能获得足够的令牌
    • 返回等待时间
  • 如果令牌充足:
    • 记录本次请求到有序集合
    • 减少可用令牌数
    • 返回 nil(表示可以立即获取令牌)
  1. 首次请求处理
else
    redis.call("set", valueName, rate)
    redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
    redis.call("decrby", valueName, ARGV[1])
    return nil
end

这是处理首次请求的逻辑:

  • 初始化令牌桶,设置满额令牌
  • 记录首次请求信息
  • 扣减使用的令牌数

这个实现的巧妙之处在于:

  1. 使用 ZSET 的 score 作为时间戳,可以精确控制时间窗口
  2. 懒惰删除策略:只在新请求来临时才清理过期数据
  3. 通过返回等待时间而不是直接阻塞,实现了非阻塞的限流
  4. 使用打包的数据结构(struct.pack)高效存储请求信息

这样的设计既保证了分布式环境下的一致性,又实现了高效的限流控制。需要注意的是,这个实现是非公平的,因为它不保证等待中的请求按照到达顺序获取令牌。

完整源码解读

-- 基础配置获取部分
-- 示例:假设配置的限流规则是每秒允许5个请求(rate=5, interval=1000ms)

-- 从Redis的hash结构中获取限流器的基本配置
-- rate:令牌产生速率,例如 rate = 5 表示每个时间窗口产生5个令牌
local rate = redis.call("hget", KEYS[1], "rate")
-- interval:时间窗口大小(毫秒),例如 interval = 1000 表示1秒
local interval = redis.call("hget", KEYS[1], "interval")
-- type:运行模式(1=单机,其他=集群)
local type = redis.call("hget", KEYS[1], "type")
-- 确保限流器已经正确初始化
assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized")

-- 令牌桶相关的Redis键名设置
-- valueName:存储当前可用令牌数的键,格式为 {limitername}:value
-- 例如:myRateLimiter:value = 3 表示当前有3个可用令牌
local valueName = KEYS[2]
-- permitsName:存储请求历史的有序集合键,格式为 {limitername}:permits
-- 使用ZSET结构,score为请求时间戳,value为打包的请求信息
local permitsName = KEYS[4]

-- 单机模式特殊处理:使用不同的键名
if type == "1" then
    valueName = KEYS[3]
    permitsName = KEYS[5]
end

-- 验证请求的令牌数不超过配置的速率
-- 例如:如果rate=5,那么单次请求不能要求超过5个令牌
assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate")

-- 获取当前可用的令牌数
-- 首次请求时为null,后续请求会返回当前剩余的令牌数
local currentValue = redis.call("get", valueName)

-- 主要处理逻辑
if currentValue ~= false then
    -- === 清理过期请求,释放令牌 ===
    -- 示例:当前时间为15000ms,interval=1000ms
    -- 则查找score在[0, 14000]范围内的请求,这些都是过期请求
    local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)
    
    -- 计算要释放的令牌总数
    local released = 0
    -- 遍历过期的请求记录,累加它们占用的令牌数
    -- struct.unpack("fI", v) 解包:f=随机数(float),I=请求的令牌数(integer)
    for i, v in ipairs(expiredValues) do
        local random, permits = struct.unpack("fI", v)
        released = released + permits
    end

    -- 如果有过期的令牌需要释放
    -- 示例:如果找到两个过期请求,每个占用1个令牌,则released=2
    if released > 0 then
        -- 从ZSET中删除这些过期请求的记录
        redis.call("zrem", permitsName, unpack(expiredValues))
        -- 将释放的令牌加回到可用令牌池中
        -- 例如:如果当前有1个令牌,释放2个,则更新后有3个可用令牌
        currentValue = tonumber(currentValue) + released
        redis.call("set", valueName, currentValue)
    end

    -- === 处理当前请求 ===
    -- 检查令牌是否足够
    if tonumber(currentValue) < tonumber(ARGV[1]) then
        -- 令牌不足,计算需要等待的时间
        -- 查找最近一次请求的时间,用于计算下一次可以获取令牌的时间点
        -- 示例:如果最近一次请求在14500ms,当前在15000ms,interval=1000ms
        -- 则需要等待到15500ms才能获取新令牌
        local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), 'withscores', 'limit', 0, 1)
        local random, permits = struct.unpack("fI", nearest[1])
        -- 返回需要等待的毫秒数
        return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval)
    else
        -- 令牌充足,记录本次请求并扣减令牌
        -- ARGV[2]:当前时间戳
        -- ARGV[3]:用于生成随机数的种子
        -- ARGV[1]:请求的令牌数
        redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
        -- 扣减使用的令牌数
        redis.call("decrby", valueName, ARGV[1])
        -- 返回nil表示可以立即获取令牌
        return nil
    end
else
    -- === 首次请求的特殊处理 ===
    -- 初始化令牌桶,设置满额令牌
    -- 例如:rate=5时,设置5个初始令牌
    redis.call("set", valueName, rate)
    -- 记录首次请求信息到ZSET
    redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
    -- 扣减本次使用的令牌数
    redis.call("decrby", valueName, ARGV[1])
    -- 返回nil表示可以立即获取令牌
    return nil
end

具体示例场景

假设配置为每秒 5 个令牌(rate=5, interval=1000ms),模拟以下请求序列:

  1. 首次请求(t=1000ms,请求 1 个令牌):

    • 初始化令牌桶:5 个令牌
    • 记录请求到 ZSET:score=1000
    • 扣减 1 个令牌,剩余 4 个
    • 返回 nil(立即通过)
  2. 快速连续请求(t=1100ms,请求 2 个令牌):

    • 当前有 4 个令牌,足够使用
    • 记录请求到 ZSET:score=1100
    • 扣减 2 个令牌,剩余 2 个
    • 返回 nil(立即通过)
  3. 令牌不足请求(t=1200ms,请求 3 个令牌):

    • 当前只有 2 个令牌,不够使用
    • 查找最近请求时间(1100ms)
    • 计算等待时间:需要等到 2000ms 才能获得新的令牌
    • 返回 800ms 的等待时间
  4. 新的时间窗口请求(t=2100ms,请求 1 个令牌):

    • 检查过期请求:找到 1000ms 和 1100ms 的请求
    • 释放这些请求占用的令牌(1+2=3 个)
    • 当前可用令牌:2+3=5 个
    • 记录新请求,扣减 1 个令牌
    • 返回 nil(立即通过)

这个实现通过巧妙运用 Redis 的数据结构和 Lua 脚本的原子性,实现了高效的分布式限流控制。它的关键特点是:

  1. 懒惰删除:只在必要时才清理过期请求
  2. 精确控制:通过时间戳和 ZSET 实现精确的时间窗口控制
  3. 非阻塞:通过返回等待时间而不是直接阻塞来处理令牌不足的情况
LICENSED UNDER CC BY-NC-SA 4.0
Comment