限流器是什么
在并发系统中,为了防止对系统资源的频繁访问,造成系统压力增大,影响系统稳定性,而采用的限制流量访问的技术
常见的限流器
计数器限流器:在一定时间范围内,对访问进行计数并判断是否超过限制
漏桶限流器:像漏桶一样,以稳定速率将访问流量流出
令牌桶限流器:每次访问消耗一个令牌,令牌数量没有达到最大时,会以稳定速率产生令牌
分布式限流器:利用 Redis 等中心化存储中间件管理流量,适合对集群进行整体限流
限流器接口
@Getter
public abstract class RateLimiter {
/**
* 限流单位时间,秒
*/
protected final Long time;
/**
* 时间单位
*/
protected final TimeUnit timeUnit;
/**
* 限流时间内可通过的次数
*/
protected final Long limit;
public RateLimiter(Long time, TimeUnit timeUnit, Long limit) {
this.time = time;
this.timeUnit = timeUnit;
this.limit = limit;
}
/**
* 检查是否被限流,true 表示被限流
*
* @return 是否被限流
*/
public abstract boolean rateLimited();
}
固定窗口限流器
维护一个计数器,在一段时间内,对访问数量进行限制,过了该时间段后重置计数器
缺点:临界问题,在时间段的结尾和下个时间段瞬时流量可能达到两倍的限制数
public class CounterRateLimiter extends RateLimiter {
/**
* 计数器
*/
private long counter = 0;
/**
* 时间戳
*/
private volatile long timestamp;
public CounterRateLimiter(Long time, TimeUnit timeUnit, Long limit) {
super(time, timeUnit, limit);
timestamp = System.currentTimeMillis();
}
@Override
public synchronized boolean rateLimited() {
long nowStamp = System.currentTimeMillis();
// 如果已经超过时间窗口
if (nowStamp - timestamp > timeUnit.toMillis(time)) {
// 重置
timestamp = nowStamp;
counter = 0;
return false;
}
if (counter < limit) {
counter++;
return false;
}
return true;
}
}
测试
@SneakyThrows
public static void test() {
// 限制 5s 内最多 5 次访问
RateLimiter limiter = new CounterRateLimiter(5L, TimeUnit.SECONDS, 5L);
// 睡眠 5s 让访问集中在最后一秒
Thread.sleep(4000L);
// 模拟访问 5 次
for (int i = 0; i < 5; i++) {
log.info("{}", limiter.rateLimited());
}
// 睡眠 1s 保证已经到下一个时间周期
Thread.sleep(1000L);
// 模拟访问 5 次
for (int i = 0; i < 5; i++) {
log.info("{}", limiter.rateLimited());
}
}
从结果可以看到固定窗口限流的缺点,在两秒内就通过了十次访问,是限制数量的二倍
滑动窗口限流器
不以固定的时间窗口为界,改为每次访问基于请求时间,向前看一个时间周期,根据周期内的请求数来判断是否达到限制
public class CounterRateLimiter extends RateLimiter {
/**
* 时间戳 访问数量 MAP
*/
private final TreeMap<Long, Long> countMap;
public CounterRateLimiter(Long time, TimeUnit timeUnit, Long limit) {
super(time, timeUnit, limit);
countMap = new TreeMap<>();
}
@Override
public synchronized boolean rateLimited() {
long nowStamp = System.currentTimeMillis();
// 向前找一个时间周期
long startStamp = nowStamp - timeUnit.toMillis(time);
// 获取已经过时的访问
SortedMap<Long, Long> headMap = countMap.headMap(startStamp);
// 清理已经过时的访问
headMap.clear();
// 求和访问次数
Long total = countMap.values().stream().reduce(0L, Long::sum);
if (total < limit) {
// 加入 MAP
countMap.merge(nowStamp, 1L, Long::sum);
return false;
}
return true;
}
}
和固定窗口一样的测试代码,结果如下
漏桶算法
桶容量设为限制数量,访问量类比成水量,每次访问时比较当前水量和桶容量的大小,判断是否可以继续加水,水量达到桶总容量时限流。在每次访问时计算距离上次访问流出的水量以更新当前水量
漏桶算法限流器
通过时间、时间单位以及 limit 可以计算单位时间允许的访问量
根据当前时间计算出距离上次访问应该流出的水量,得出当前水量,与 limit 比较,判断是否限流
public class LeakyBucketRateLimiter extends RateLimiter {
/**
* 上次访问时间戳
*/
private volatile long timestamp;
/**
* 漏桶内剩余水量
*/
private volatile long left;
public LeakyBucketRateLimiter(Long time, TimeUnit timeUnit, Long limit) {
super(time, timeUnit, limit);
timestamp = System.currentTimeMillis();
}
@Override
public synchronized boolean rateLimited() {
// 当前时间
long now = System.currentTimeMillis();
// 计算距离上次访问时间差与单位时间段之比
long interval = (now - timestamp) / timeUnit.toMillis(time);
// 更新剩余水量
// interval * limit 表示本次访问距离上次流出了多少水量
left = Math.max(0, left - interval * limit);
timestamp = now;
// 判断水是否溢出
if (left < limit) {
left++;
return false;
}
return true;
}
}
令牌桶算法
桶容量为令牌数量,每次访问消耗一个令牌,并在每次访问时根据过去的时间计算应该放入多少个令牌,没有令牌时拒绝访问
令牌桶算法限流器
令牌桶算法和漏桶的思维方式相反
令牌桶是减到没令牌时限流,漏桶是水加满时限流
public class TokenBucketRateLimiter extends RateLimiter {
/**
* 上次访问时间戳
*/
private volatile long timestamp;
/**
* 令牌桶桶内剩余令牌数量
*/
private volatile long left;
public TokenBucketRateLimiter(Long time, TimeUnit timeUnit, Long limit) {
super(time, timeUnit, limit);
}
@Override
public synchronized boolean rateLimited() {
// 当前时间
long now = System.currentTimeMillis();
// 计算距离上次访问时间差与单位时间段之比
long interval = (now - timestamp) / timeUnit.toMillis(time);
// 更新令牌数
// interval * limit 表示本次访问距离上次产生了多少令牌
left = Math.min(limit, left + interval * limit);
timestamp = now;
if (left > 0) {
left--;
return false;
}
return true;
}
}
Guava 的 RateLimiter 工具类提供了一个高性能的限流器实现
Guava RateLimiter 基于令牌桶算法
有平滑突发限流 SmoothBursty 和平滑预热限流 SmoothWarmingUp 两种实现
使用示例
// 创建限流器
// 每秒五次访问
RateLimiter limiter = RateLimiter.create(5);
// 调用限流器
// 阻塞式等待获取令牌
limiter.acquire();
// 非阻塞式获取令牌
boolean ok = limiter.tryAcquire();
acquire 和 tryAcquire 可以指定一次请求的令牌数量
tryAcquire 可以指定尝试的时长
Spring AOP + 注解配置限流
限流注解
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RateLimit {
@AliasFor("limitPerSecond")
double value() default 1;
/**
* 每秒限制数量
*/
@AliasFor("value")
double limitPerSecond() default 1;
/**
* 是否阻塞等待
*/
boolean blockWait() default false;
/**
* 等待时间
*/
long time() default 0;
/**
* 等待时间单位
*/
TimeUnit timeUnit() default TimeUnit.SECONDS;
}
限流切面
@Slf4j
@Aspect
@Component
public class RateLimitAspect {
private static final ConcurrentHashMap<String, RateLimiter> LIMITER_MAP = new ConcurrentHashMap<>();
/**
* 配置 RateLimit 注解切入点
*/
@Pointcut("@within(com.example.annotation.RateLimit) " +
"|| @annotation(com.example.annotation.RateLimit)")
public void pointCut() {
}
@Before("pointCut()")
public void rateLimit(JoinPoint point) {
// 获取方法上的注解
MethodSignature methodSignature = (MethodSignature) point.getSignature();
// 获取注解
RateLimit rateLimit = methodSignature.getMethod().getAnnotation(RateLimit.class);
// 获取 RateLimiter
RateLimiter rateLimiter =
LIMITER_MAP.computeIfAbsent(methodSignature.toLongString(), m -> RateLimiter.create(rateLimit.limitPerSecond()));
// 判断是否需要阻塞等待
if (!rateLimit.blockWait()) {
boolean acquireOk = false;
if (rateLimit.time() != 0) {
acquireOk = rateLimiter.tryAcquire(rateLimit.time(), rateLimit.timeUnit());
} else {
acquireOk = rateLimiter.tryAcquire();
}
if (acquireOk) {
return;
}
throw new RuntimeException("请稍后再尝试访问");
}
rateLimiter.acquire();
}
}