令牌桶 (Token Bucket) 是一个常用的限流算法。想象一个固定大小的桶,系统会以固定的速率往桶里放入令牌。当请求到来时,需要从桶中获取一个令牌才能继续处理,如果桶中没有令牌,请求就会被拒绝或等待。这种机制可以很好地控制系统的请求处理速率。
虽然底层使用的是令牌桶算法,但当 limit 设置为 1 时,它的行为更像是滑动窗口限流:在固定时间窗口内只允许一个请求通过。这是因为:
- 令牌产生速率被设置为每 60 秒 1 个
- 桶的容量实际上也是 1
- 每次请求都需要消耗一个令牌
下面详细解释一下 Redisson 中 RRateLimiter 的令牌桶算法实现原理。
定义一个通用的限流器:
/**
* 限流服务
*/
public class SlidingWindowRateLimiter {
private RedissonClient redissonClient;
private static final String LIMIT_KEY_PREFIX = "nft:turbo:limit:";
public SlidingWindowRateLimiter(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
public Boolean tryAcquire(String key, int limit, int windowSize) {
RRateLimiter rRateLimiter = redissonClient.getRateLimiter(LIMIT_KEY_PREFIX + key);
if (!rRateLimiter.isExists()) {
rRateLimiter.trySetRate(RateType.OVERALL, limit, windowSize, RateIntervalUnit.SECONDS);
}
return rRateLimiter.tryAcquire();
}
}
测试:
@Test
public void tryAcquire1() {
Boolean result = slidingWindowRateLimiter.tryAcquire("testLock997", 3, 10);
Assert.assertTrue(result);
result = slidingWindowRateLimiter.tryAcquire("testLock997", 3, 10);
Assert.assertTrue(result);
result = slidingWindowRateLimiter.tryAcquire("testLock997", 3, 10);
Assert.assertTrue(result);
result = slidingWindowRateLimiter.tryAcquire("testLock997", 3, 10);
Assert.assertFalse(result);
try {
Thread.currentThread().sleep(10000);
}catch (Exception e){
}
result = slidingWindowRateLimiter.tryAcquire("testLock997", 3, 10);
Assert.assertTrue(result);
}
Redisson 的 RRateLimiter 在 Redis 中会生成三个数据结构:
- Hash 结构(基础配置)
键名:nft:turbo:limit:testLock997
HMSET "nft:turbo:limit:testLock997"
"rate" "3" # 令牌产生速率:每个时间间隔产生3个令牌
"interval" "10000" # 时间间隔:10000毫秒(10秒)
"type" "0" # 限流类型:0表示集群限流模式
这个 Hash 结构存储了限流器的基本配置信息。通过这些参数,我们可以知道这个限流器被配置为每 10 秒允许 3 个请求通过,并且是在集群范围内生效的。
- Zset 结构(令牌分配记录)
键名:{nft:turbo:limit:testLock997}:permits
这个有序集合用来记录令牌的分配情况。其中:
- score(分数)是时间戳:1737885857942
- member(成员)是一个二进制值:"\x10\x0e)/\x91y"\x1f\xb8j\xd2\x06*.\xd6|N\x01\x00\x00\x00"
这个结构的巧妙之处在于:
- 使用时间戳作为分数,可以方便地清理过期的令牌记录
- 二进制值包含了令牌的标识信息,确保每个令牌的唯一性
- 有序集合的特性让我们可以高效地按时间范围查询和清理令牌
- String 结构(当前可用令牌数)
键名:{nft:turbo:limit:testLock997}:value
值:2
这个简单的字符串值记录了当前可用的令牌数量。在这个例子中,还剩 2 个令牌可以使用。这个计数器会随着:
- 时间推移而增加(但不超过配置的速率限制)
- 令牌被使用而减少
源码分析
- 基础配置获取和验证
local rate = redis.call("hget", KEYS[1], "rate")
local interval = redis.call("hget", KEYS[1], "interval")
local type = redis.call("hget", KEYS[1], "type")
assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized")
这部分代码从 Redis hash 中获取限流器的基本配置。这些配置在初始化时就已经设置好了:
- rate:令牌生成速率
- interval:时间窗口大小(毫秒)
- type:运行模式(单机/集群)
- 键名准备
local valueName = KEYS[2]
local permitsName = KEYS[4]
if type == "1" then
valueName = KEYS[3]
permitsName = KEYS[5]
end
这里设置了两个重要的 Redis 键:
- valueName:存储当前可用令牌数的键
- permitsName:存储请求历史的有序集合键
type == "1" 的判断是为了支持单机模式,这种情况下使用不同的键名。
- 令牌桶状态检查和更新
local currentValue = redis.call("get", valueName)
if currentValue ~= false then
local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)
这是一个关键部分。当获取到当前令牌数后,系统会检查是否有已经过期的请求。过期的判断标准是:
请求时间 < (当前时间 - 时间窗口)
举个例子:
假设时间窗口是 1 秒(1000ms),当前时间是 15000ms:
- 系统会查找 score < (15000 - 1000) = 14000 的所有请求
- 这些请求都被认为是 " 过期 " 的,它们占用的令牌可以被释放了
- 令牌释放机制
local released = 0
for i, v in ipairs(expiredValues) do
local random, permits = struct.unpack("fI", v)
released = released + permits
end
if released > 0 then
redis.call("zrem", permitsName, unpack(expiredValues))
currentValue = tonumber(currentValue) + released
redis.call("set", valueName, currentValue)
end
这段代码实现了令牌的回收机制:
- 遍历所有过期的请求,累加它们占用的令牌数
- 如果有令牌要释放(released > 0):
- 从有序集合中删除这些过期请求
- 将释放的令牌数添加到当前可用令牌数中
例如:
如果 rate=5,有三个过期请求各使用了 1 个令牌,那么:
- released = 3
- currentValue 会增加 3
- 这三个请求记录会从 ZSET 中删除
- 令牌分配决策
if tonumber(currentValue) < tonumber(ARGV[1]) then
local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), 'withscores', 'limit', 0, 1)
local random, permits = struct.unpack("fI", nearest[1])
return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval)
else
redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
redis.call("decrby", valueName, ARGV[1])
return nil
end
这是决策部分:
- 如果令牌不够(currentValue < 请求数量):
- 找到最近一次请求的时间
- 计算还需要等待多久才能获得足够的令牌
- 返回等待时间
- 如果令牌充足:
- 记录本次请求到有序集合
- 减少可用令牌数
- 返回 nil(表示可以立即获取令牌)
- 首次请求处理
else
redis.call("set", valueName, rate)
redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
redis.call("decrby", valueName, ARGV[1])
return nil
end
这是处理首次请求的逻辑:
- 初始化令牌桶,设置满额令牌
- 记录首次请求信息
- 扣减使用的令牌数
这个实现的巧妙之处在于:
- 使用 ZSET 的 score 作为时间戳,可以精确控制时间窗口
- 懒惰删除策略:只在新请求来临时才清理过期数据
- 通过返回等待时间而不是直接阻塞,实现了非阻塞的限流
- 使用打包的数据结构(struct.pack)高效存储请求信息
这样的设计既保证了分布式环境下的一致性,又实现了高效的限流控制。需要注意的是,这个实现是非公平的,因为它不保证等待中的请求按照到达顺序获取令牌。
完整源码解读
-- 基础配置获取部分
-- 示例:假设配置的限流规则是每秒允许5个请求(rate=5, interval=1000ms)
-- 从Redis的hash结构中获取限流器的基本配置
-- rate:令牌产生速率,例如 rate = 5 表示每个时间窗口产生5个令牌
local rate = redis.call("hget", KEYS[1], "rate")
-- interval:时间窗口大小(毫秒),例如 interval = 1000 表示1秒
local interval = redis.call("hget", KEYS[1], "interval")
-- type:运行模式(1=单机,其他=集群)
local type = redis.call("hget", KEYS[1], "type")
-- 确保限流器已经正确初始化
assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized")
-- 令牌桶相关的Redis键名设置
-- valueName:存储当前可用令牌数的键,格式为 {limitername}:value
-- 例如:myRateLimiter:value = 3 表示当前有3个可用令牌
local valueName = KEYS[2]
-- permitsName:存储请求历史的有序集合键,格式为 {limitername}:permits
-- 使用ZSET结构,score为请求时间戳,value为打包的请求信息
local permitsName = KEYS[4]
-- 单机模式特殊处理:使用不同的键名
if type == "1" then
valueName = KEYS[3]
permitsName = KEYS[5]
end
-- 验证请求的令牌数不超过配置的速率
-- 例如:如果rate=5,那么单次请求不能要求超过5个令牌
assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate")
-- 获取当前可用的令牌数
-- 首次请求时为null,后续请求会返回当前剩余的令牌数
local currentValue = redis.call("get", valueName)
-- 主要处理逻辑
if currentValue ~= false then
-- === 清理过期请求,释放令牌 ===
-- 示例:当前时间为15000ms,interval=1000ms
-- 则查找score在[0, 14000]范围内的请求,这些都是过期请求
local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)
-- 计算要释放的令牌总数
local released = 0
-- 遍历过期的请求记录,累加它们占用的令牌数
-- struct.unpack("fI", v) 解包:f=随机数(float),I=请求的令牌数(integer)
for i, v in ipairs(expiredValues) do
local random, permits = struct.unpack("fI", v)
released = released + permits
end
-- 如果有过期的令牌需要释放
-- 示例:如果找到两个过期请求,每个占用1个令牌,则released=2
if released > 0 then
-- 从ZSET中删除这些过期请求的记录
redis.call("zrem", permitsName, unpack(expiredValues))
-- 将释放的令牌加回到可用令牌池中
-- 例如:如果当前有1个令牌,释放2个,则更新后有3个可用令牌
currentValue = tonumber(currentValue) + released
redis.call("set", valueName, currentValue)
end
-- === 处理当前请求 ===
-- 检查令牌是否足够
if tonumber(currentValue) < tonumber(ARGV[1]) then
-- 令牌不足,计算需要等待的时间
-- 查找最近一次请求的时间,用于计算下一次可以获取令牌的时间点
-- 示例:如果最近一次请求在14500ms,当前在15000ms,interval=1000ms
-- 则需要等待到15500ms才能获取新令牌
local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), 'withscores', 'limit', 0, 1)
local random, permits = struct.unpack("fI", nearest[1])
-- 返回需要等待的毫秒数
return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval)
else
-- 令牌充足,记录本次请求并扣减令牌
-- ARGV[2]:当前时间戳
-- ARGV[3]:用于生成随机数的种子
-- ARGV[1]:请求的令牌数
redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
-- 扣减使用的令牌数
redis.call("decrby", valueName, ARGV[1])
-- 返回nil表示可以立即获取令牌
return nil
end
else
-- === 首次请求的特殊处理 ===
-- 初始化令牌桶,设置满额令牌
-- 例如:rate=5时,设置5个初始令牌
redis.call("set", valueName, rate)
-- 记录首次请求信息到ZSET
redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
-- 扣减本次使用的令牌数
redis.call("decrby", valueName, ARGV[1])
-- 返回nil表示可以立即获取令牌
return nil
end
具体示例场景
假设配置为每秒 5 个令牌(rate=5, interval=1000ms),模拟以下请求序列:
-
首次请求(t=1000ms,请求 1 个令牌):
- 初始化令牌桶:5 个令牌
- 记录请求到 ZSET:score=1000
- 扣减 1 个令牌,剩余 4 个
- 返回 nil(立即通过)
-
快速连续请求(t=1100ms,请求 2 个令牌):
- 当前有 4 个令牌,足够使用
- 记录请求到 ZSET:score=1100
- 扣减 2 个令牌,剩余 2 个
- 返回 nil(立即通过)
-
令牌不足请求(t=1200ms,请求 3 个令牌):
- 当前只有 2 个令牌,不够使用
- 查找最近请求时间(1100ms)
- 计算等待时间:需要等到 2000ms 才能获得新的令牌
- 返回 800ms 的等待时间
-
新的时间窗口请求(t=2100ms,请求 1 个令牌):
- 检查过期请求:找到 1000ms 和 1100ms 的请求
- 释放这些请求占用的令牌(1+2=3 个)
- 当前可用令牌:2+3=5 个
- 记录新请求,扣减 1 个令牌
- 返回 nil(立即通过)
这个实现通过巧妙运用 Redis 的数据结构和 Lua 脚本的原子性,实现了高效的分布式限流控制。它的关键特点是:
- 懒惰删除:只在必要时才清理过期请求
- 精确控制:通过时间戳和 ZSET 实现精确的时间窗口控制
- 非阻塞:通过返回等待时间而不是直接阻塞来处理令牌不足的情况