RateLimiter: Algorithms and Usage

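Service degradation is a way for a service to protect itself, or to protect its downstream services. Its goal is to keep the service available when requests spike, or at the very least to keep it from crashing. Common ways to implement service degradation are switch-based degradation, rate limiting, and circuit breaking.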

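Basic rate-limiting algorithms

  • Token Bucket
  • Leaky Bucket
  • Sliding Window
  • Fixed Window Counter

Token Bucket

The system adds tokens to a bucket at a fixed rate, and a request must obtain a token before it can be processed:

  • Can absorb bursts of traffic, up to the bucket capacity
  • Provides buffering
  • Relatively simple to implement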
public class TokenBucketLimiter {
    private final long capacity;                 // bucket capacity (maximum burst size)
    private final double refillTokensPerSecond;  // refill rate
    private double availableTokens;              // tokens currently in the bucket
    private long lastRefillTimestamp;            // System.nanoTime() of the last refill

    public TokenBucketLimiter(long capacity, double refillTokensPerSecond) {
        this.capacity = capacity;
        this.refillTokensPerSecond = refillTokensPerSecond;
        this.availableTokens = capacity;         // start with a full bucket
        this.lastRefillTimestamp = System.nanoTime();
    }

    public synchronized boolean tryAcquire() {
        refill();
        if (availableTokens >= 1) {
            availableTokens -= 1;
            return true;
        }
        return false;
    }

    private void refill() {
        long now = System.nanoTime();
        // Tokens accrued since the last refill (elapsed nanoseconds converted to seconds).
        double tokensToAdd = (now - lastRefillTimestamp) * refillTokensPerSecond / 1e9;
        availableTokens = Math.min(capacity, availableTokens + tokensToAdd);
        lastRefillTimestamp = now;
    }
}
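
To make the "absorbs bursts" property concrete, here is a minimal sketch of the same algorithm with an injectable clock, so the behaviour can be checked without sleeping. The class name `ClockedTokenBucket` and the `LongSupplier` clock parameter are assumptions made for this illustration and are not part of the original code.

```java
import java.util.function.LongSupplier;

// Hypothetical variant of the token bucket above: the nanosecond clock is injected
// so that burst and refill behaviour can be exercised deterministically.
class ClockedTokenBucket {
    private final long capacity;
    private final double refillTokensPerSecond;
    private final LongSupplier nanoClock;   // assumption: returns monotonic nanoseconds
    private double availableTokens;
    private long lastRefillNanos;

    ClockedTokenBucket(long capacity, double refillTokensPerSecond, LongSupplier nanoClock) {
        this.capacity = capacity;
        this.refillTokensPerSecond = refillTokensPerSecond;
        this.nanoClock = nanoClock;
        this.availableTokens = capacity;    // start full so an initial burst is allowed
        this.lastRefillNanos = nanoClock.getAsLong();
    }

    synchronized boolean tryAcquire() {
        long now = nanoClock.getAsLong();
        double tokensToAdd = (now - lastRefillNanos) * refillTokensPerSecond / 1e9;
        availableTokens = Math.min(capacity, availableTokens + tokensToAdd);
        lastRefillNanos = now;
        if (availableTokens >= 1) {
            availableTokens -= 1;
            return true;
        }
        return false;
    }
}

class TokenBucketDemo {
    public static void main(String[] args) {
        long[] now = {0L};                  // fake clock, advanced by hand
        ClockedTokenBucket bucket = new ClockedTokenBucket(3, 1.0, () -> now[0]);

        int accepted = 0;
        for (int i = 0; i < 5; i++) {
            if (bucket.tryAcquire()) {
                accepted++;
            }
        }
        System.out.println("accepted in burst: " + accepted); // 3 of 5: the bucket held 3 tokens

        now[0] += 2_000_000_000L;           // two seconds later, two tokens have been refilled
        System.out.println(bucket.tryAcquire()); // true
        System.out.println(bucket.tryAcquire()); // true
        System.out.println(bucket.tryAcquire()); // false
    }
}
```

The burst size is bounded by `capacity` and the sustained rate by `refillTokensPerSecond`, so the two parameters can be tuned independently.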

Leaky Bucket

Requests first enter the leaky bucket, and the bucket processes them at a fixed rate.

  • Smooths traffic well
  • Cannot absorb bursts of traffic
public class LeakyBucketLimiter {
    private final long capacity;             // bucket capacity
    private final double leakRatePerSecond;  // outflow rate
    private double currentVolume;            // current water level
    private long lastLeakTimestamp;          // System.nanoTime() of the last leak

    public LeakyBucketLimiter(long capacity, double leakRatePerSecond) {
        this.capacity = capacity;
        this.leakRatePerSecond = leakRatePerSecond;
        this.currentVolume = 0;
        this.lastLeakTimestamp = System.nanoTime();
    }

    // This version uses the bucket as a meter: a request is admitted if there is room
    // for one more unit of water, and rejected otherwise. It does not queue requests.
    public synchronized boolean tryAcquire() {
        leak();
        if (currentVolume < capacity) {
            currentVolume++;
            return true;
        }
        return false;
    }

    private void leak() {
        long now = System.nanoTime();
        double timeElapsed = (now - lastLeakTimestamp) / 1e9;  // seconds
        double leakedVolume = timeElapsed * leakRatePerSecond;
        currentVolume = Math.max(0, currentVolume - leakedVolume);
        lastLeakTimestamp = now;
    }
}
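
The class above admits or rejects requests but does not hold them. The description "requests enter the bucket and are processed at a fixed rate" can also be read as a bounded queue that is drained by a timer. The following is a minimal sketch of that reading; `QueueingLeakyBucket`, `offer`, and `leakOne` are hypothetical names chosen for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical queue-based leaky bucket: tasks wait in a bounded queue and
// leave one at a time whenever leakOne() is called.
class QueueingLeakyBucket {
    private final int capacity;
    private final Queue<Runnable> bucket = new ArrayDeque<>();

    QueueingLeakyBucket(int capacity) {
        this.capacity = capacity;
    }

    /** Queues the task, or returns false when the bucket is full (overflow). */
    synchronized boolean offer(Runnable task) {
        if (bucket.size() >= capacity) {
            return false;
        }
        bucket.add(task);
        return true;
    }

    /** Runs the oldest queued task, if any. Calling this at a fixed rate sets the outflow. */
    boolean leakOne() {
        Runnable task;
        synchronized (this) {
            task = bucket.poll();
        }
        if (task == null) {
            return false;
        }
        task.run();   // run outside the lock so a slow task does not block offer()
        return true;
    }
}

class LeakyBucketDemo {
    public static void main(String[] args) {
        QueueingLeakyBucket bucket = new QueueingLeakyBucket(2);
        List<String> handled = new ArrayList<>();

        System.out.println(bucket.offer(() -> handled.add("a"))); // true
        System.out.println(bucket.offer(() -> handled.add("b"))); // true
        System.out.println(bucket.offer(() -> handled.add("c"))); // false: the bucket is full

        // In a service, a ScheduledExecutorService could call leakOne() at a fixed rate, e.g.
        // scheduler.scheduleAtFixedRate(bucket::leakOne, 0, 100, TimeUnit.MILLISECONDS);
        bucket.leakOne();
        bucket.leakOne();
        System.out.println(handled); // [a, b]
    }
}
```

Because output is paced by the timer rather than by arrivals, downstream load stays flat, at the cost of added latency for queued requests.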

Sliding Window

Divides time into multiple small windows and counts the requests in each one.

  • Smoother than a fixed window
  • Avoids the boundary problem
import java.util.LinkedList;
import java.util.Queue;

public class SlidingWindowLimiter {
    private final int windowSizeInSeconds;
    private final int maxRequestsPerWindow;
    // Timestamps (ms) of accepted requests, oldest first.
    private final Queue<Long> requestTimestamps;

    public SlidingWindowLimiter(int windowSizeInSeconds, int maxRequestsPerWindow) {
        this.windowSizeInSeconds = windowSizeInSeconds;
        this.maxRequestsPerWindow = maxRequestsPerWindow;
        this.requestTimestamps = new LinkedList<>();
    }

    public synchronized boolean tryAcquire() {
        long currentTime = System.currentTimeMillis();
        // 1000L forces long arithmetic so that large windows cannot overflow int.
        long windowStart = currentTime - (windowSizeInSeconds * 1000L);

        // Drop request records that have fallen out of the window.
        while (!requestTimestamps.isEmpty() && requestTimestamps.peek() <= windowStart) {
            requestTimestamps.poll();
        }

        if (requestTimestamps.size() < maxRequestsPerWindow) {
            requestTimestamps.offer(currentTime);
            return true;
        }
        return false;
    }
}
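
The class above keeps one timestamp per accepted request. The description "divide time into small windows and count the requests in each" can also be implemented with a fixed array of counters, which bounds memory regardless of traffic. Here is a minimal sketch of that variant; the class name `BucketedSlidingWindow`, the `buckets` parameter, and passing the current time as an argument are assumptions made to keep the example deterministic.

```java
import java.util.Arrays;

// Hypothetical sub-window counter: the sliding window is split into `buckets`
// slots that are reused in a ring as time advances.
class BucketedSlidingWindow {
    private final int maxRequests;
    private final long bucketMillis;
    private final int[] counts;       // request count per sub-window
    private final long[] bucketIds;   // absolute sub-window index currently held by each slot

    BucketedSlidingWindow(int maxRequests, long windowMillis, int buckets) {
        this.maxRequests = maxRequests;
        this.bucketMillis = windowMillis / buckets;  // assumes windowMillis divides evenly
        this.counts = new int[buckets];
        this.bucketIds = new long[buckets];
        Arrays.fill(bucketIds, -1L);
    }

    /** nowMillis must be non-negative; it is a parameter only to make the sketch testable. */
    synchronized boolean tryAcquire(long nowMillis) {
        long currentId = nowMillis / bucketMillis;
        int slot = (int) (currentId % counts.length);
        if (bucketIds[slot] != currentId) {          // slot holds an expired sub-window: reuse it
            bucketIds[slot] = currentId;
            counts[slot] = 0;
        }
        int total = 0;
        for (int i = 0; i < counts.length; i++) {
            // Count only slots whose sub-window is still inside the sliding window.
            if (bucketIds[i] > currentId - counts.length) {
                total += counts[i];
            }
        }
        if (total < maxRequests) {
            counts[slot]++;
            return true;
        }
        return false;
    }
}

class SlidingWindowDemo {
    public static void main(String[] args) {
        // At most 2 requests per 1000 ms, tracked in 4 sub-windows of 250 ms each.
        BucketedSlidingWindow limiter = new BucketedSlidingWindow(2, 1000, 4);
        System.out.println(limiter.tryAcquire(0));    // true
        System.out.println(limiter.tryAcquire(900));  // true
        System.out.println(limiter.tryAcquire(950));  // false: two requests already in the window
        System.out.println(limiter.tryAcquire(1100)); // true: the request at 0 ms has expired
    }
}
```

The trade-off is precision: a request expires when its whole sub-window leaves the window, so more sub-windows give a closer approximation of an exact sliding log.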

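Fixed Window Counter

Counts requests within a fixed time window.

  • Simplest to implement
  • Subject to the boundary problem: up to twice the limit can pass in a short span that straddles two windows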
public class FixedWindowLimiter {
    private final int maxRequestsPerWindow;
    private final long windowSizeInMillis;
    private long windowStart;    // start of the current window (ms)
    private int currentCount;    // requests accepted in the current window

    public FixedWindowLimiter(int maxRequestsPerWindow, long windowSizeInMillis) {
        this.maxRequestsPerWindow = maxRequestsPerWindow;
        this.windowSizeInMillis = windowSizeInMillis;
        this.windowStart = System.currentTimeMillis();
        this.currentCount = 0;
    }

    public synchronized boolean tryAcquire() {
        long currentTime = System.currentTimeMillis();
        if (currentTime - windowStart >= windowSizeInMillis) {
            // A new window begins.
            currentCount = 0;
            windowStart = currentTime;
        }

        if (currentCount < maxRequestsPerWindow) {
            currentCount++;
            return true;
        }
        return false;
    }
}
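
The boundary problem is easiest to see with numbers. The sketch below repeats the fixed-window logic with the current time passed in as an argument (an assumption made so the example is deterministic; `ClockedFixedWindow` and `BoundaryDemo` are hypothetical names) and fires two bursts 20 ms apart, one on each side of a window boundary.

```java
// Hypothetical fixed-window counter with the clock supplied by the caller.
class ClockedFixedWindow {
    private final int maxRequestsPerWindow;
    private final long windowSizeInMillis;
    private long windowStart;
    private int currentCount;

    ClockedFixedWindow(int maxRequestsPerWindow, long windowSizeInMillis, long startMillis) {
        this.maxRequestsPerWindow = maxRequestsPerWindow;
        this.windowSizeInMillis = windowSizeInMillis;
        this.windowStart = startMillis;
    }

    synchronized boolean tryAcquire(long nowMillis) {
        if (nowMillis - windowStart >= windowSizeInMillis) {
            currentCount = 0;            // a new window begins
            windowStart = nowMillis;
        }
        if (currentCount < maxRequestsPerWindow) {
            currentCount++;
            return true;
        }
        return false;
    }
}

class BoundaryDemo {
    /** Sends `attempts` requests at the same instant and returns how many were accepted. */
    static int burst(ClockedFixedWindow limiter, long atMillis, int attempts) {
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            if (limiter.tryAcquire(atMillis)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Limit: 100 requests per 1000 ms window, first window starting at t = 0.
        ClockedFixedWindow limiter = new ClockedFixedWindow(100, 1000, 0);

        int before = burst(limiter, 990, 150);   // just before the window ends
        int after = burst(limiter, 1010, 150);   // just after the next window starts

        // 200 requests pass within 20 ms, twice the nominal per-second limit.
        System.out.println(before + after);
    }
}
```

A sliding window avoids this because the count always covers the most recent full window length rather than resetting at a fixed instant.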

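Algorithm comparison

Algorithm | Key strength | Weakness | Typical use
Fixed window counter | Simple to implement | Boundary problem | Low-frequency verification codes
Sliding window | High precision | Higher computational cost | QPS limits on API endpoints
Token bucket | Supports bursts | Must maintain a token pool | Flash sales, live streaming, and other high-concurrency cases
Leaky bucket | Enforces smooth output | Cannot absorb bursts | Traffic shaping at an API gateway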

Spring Boot integration

  • GuavaRateLimiter
  • Resilience4j
  • RedisRateLimiter
  • SlidingWindowRateLimiter

Defining the configuration properties and types

  • RateLimiterProperties
@Setter
@Getter
@ConfigurationProperties(prefix = "rate-limiter")
public class RateLimiterProperties {

    /**
     * Whether rate limiting is enabled.
     */
    private boolean enabled = true;

    /**
     * Default limiter type: GUAVA, REDIS, RESILIENCE4J, SLIDING_WINDOW.
     */
    private String defaultType = RateLimiterType.GUAVA.name();

    /**
     * Default number of requests allowed per time window.
     */
    private int defaultPermits = 100;

    /**
     * Default time window size, in seconds.
     */
    private int defaultTimeWindowSeconds = 1;

    /**
     * Default response message when a request is rate limited.
     */
    private String defaultMessage = "Too many requests, please try again later";

    /**
     * Per-API rate limit overrides.
     * The key is an API path or method name; the value is its rate limit configuration.
     */
    private Map<String, ApiRateLimit> apis = new HashMap<>();

    @Setter
    @Getter
    public static class ApiRateLimit {
        private String type;
        private int permits;
        private int timeWindowSeconds;
        private String message;
    }
}
  • RateLimiterType
public enum RateLimiterType {
    /**
     * Limiter based on Guava's RateLimiter.
     */
    GUAVA,

    /**
     * Distributed limiter based on Redis.
     */
    REDIS,

    /**
     * Limiter based on Resilience4j.
     */
    RESILIENCE4J,

    /**
     * Custom sliding window limiter.
     */
    SLIDING_WINDOW
}

AutoConfiguration

@AutoConfiguration
@ConditionalOnClass(CustomRateLimiter.class)
@ConditionalOnProperty(prefix = "rate-limiter", name = "enabled", havingValue = "true",
        matchIfMissing = true)
@EnableConfigurationProperties(RateLimiterProperties.class)
@Import({RateLimiterRedisConfiguration.class, RateLimiterResilience4jConfiguration.class})
@ComponentScan("com.ares.factory.ratelimt")
public class RateLimiterAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public RateLimiterFactory rateLimiterFactory(RateLimiterProperties properties,
            ObjectProvider<StringRedisTemplate> redisTemplate) {
        // Redis is optional: the factory receives null when no StringRedisTemplate bean
        // exists and only fails if a REDIS limiter is actually requested.
        return new RateLimiterFactory(properties, redisTemplate.getIfAvailable());
    }

    @Bean
    @ConditionalOnMissingBean
    public RateLimiterAspect rateLimiterAspect(RateLimiterProperties properties,
            RateLimiterFactory rateLimiterFactory) {
        return new RateLimiterAspect(properties, rateLimiterFactory);
    }
}

@Configuration
@ConditionalOnClass(RateLimiterRegistry.class)
public class RateLimiterResilience4jConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public RateLimiterRegistry rateLimiterRegistry() {
        return RateLimiterRegistry.ofDefaults();
    }
}

@Configuration
@ConditionalOnClass(StringRedisTemplate.class)
public class RateLimiterRedisConfiguration {

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnBean(RedisConnectionFactory.class)
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }
}

RateLimiterFactory

public class RateLimiterFactory {

    // One limiter instance per key, created lazily.
    private final Map<String, CustomRateLimiter> rateLimiterCache = new ConcurrentHashMap<>();

    private final RateLimiterProperties properties;
    private final StringRedisTemplate stringRedisTemplate; // may be null when Redis is absent

    public RateLimiterFactory(RateLimiterProperties properties,
            StringRedisTemplate stringRedisTemplate) {
        this.properties = properties;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    /**
     * Creates (or returns the cached) limiter for a rate limit annotation.
     * RateLimiter here is the annotation defined in this article.
     */
    public CustomRateLimiter createRateLimiter(RateLimiter rateLimit, String fallbackKey) {
        String key = rateLimit.key().isEmpty() ? fallbackKey : rateLimit.key();

        return rateLimiterCache.computeIfAbsent(key, k -> {
            RateLimiterType type = rateLimit.type();

            // If this key has its own configuration, that configuration takes precedence.
            if (properties.getApis().containsKey(key)) {

                RateLimiterProperties.ApiRateLimit apiConfig = properties.getApis().get(key);

                if (apiConfig.getType() != null) {
                    type = RateLimiterType.valueOf(apiConfig.getType());
                }

                int permits = apiConfig.getPermits() > 0 ? apiConfig.getPermits() : rateLimit.permits();
                // Sub-second annotation windows truncate to 0 seconds; clamp to 1.
                int timeWindow = apiConfig.getTimeWindowSeconds() > 0 ? apiConfig.getTimeWindowSeconds()
                        : Math.max(1, (int) rateLimit.timeUnit().toSeconds(rateLimit.timeWindow()));
                return switch (type) {
                    case REDIS -> createRedisRateLimiter(permits, timeWindow);
                    case RESILIENCE4J -> createResilience4jRateLimiter(permits, timeWindow);
                    case SLIDING_WINDOW -> createSlidingWindowRateLimiter(permits, timeWindow);
                    default -> createGuavaRateLimiter(permits, timeWindow);
                };
            }

            // Otherwise use the values on the annotation.
            return switch (type) {
                case REDIS -> createRedisRateLimiter(rateLimit);
                case RESILIENCE4J -> createResilience4jRateLimiter(rateLimit);
                case SLIDING_WINDOW -> createSlidingWindowRateLimiter(rateLimit);
                default -> createGuavaRateLimiter(rateLimit);
            };
        });
    }

    /**
     * Creates a limiter from the configuration properties alone.
     */
    public CustomRateLimiter createRateLimiterFromProperties(String key) {

        RateLimiterType type = RateLimiterType.valueOf(properties.getDefaultType());
        int permits = properties.getDefaultPermits();
        int timeWindow = properties.getDefaultTimeWindowSeconds();

        if (properties.getApis().containsKey(key)) {
            RateLimiterProperties.ApiRateLimit apiConfig = properties.getApis().get(key);
            if (apiConfig.getType() != null) {
                type = RateLimiterType.valueOf(apiConfig.getType());
            }
            if (apiConfig.getPermits() > 0) {
                permits = apiConfig.getPermits();
            }
            if (apiConfig.getTimeWindowSeconds() > 0) {
                timeWindow = apiConfig.getTimeWindowSeconds();
            }
        }
        return switch (type) {
            case REDIS -> createRedisRateLimiter(permits, timeWindow);
            case RESILIENCE4J -> createResilience4jRateLimiter(permits, timeWindow);
            case SLIDING_WINDOW -> createSlidingWindowRateLimiter(permits, timeWindow);
            default -> createGuavaRateLimiter(permits, timeWindow);
        };
    }

    private CustomRateLimiter createGuavaRateLimiter(RateLimiter rateLimiter) {
        double permitsPerSecond = calculatePermitsPerSecond(rateLimiter);
        return new GuavaRateLimiter(permitsPerSecond);
    }

    private CustomRateLimiter createGuavaRateLimiter(int permits, int timeWindowSeconds) {
        // Clamp the window to at least one second to avoid dividing by zero.
        double permitsPerSecond = (double) permits / Math.max(1, timeWindowSeconds);
        return new GuavaRateLimiter(permitsPerSecond);
    }

    private CustomRateLimiter createRedisRateLimiter(RateLimiter rateLimit) {
        if (stringRedisTemplate == null) {
            throw new IllegalStateException("Redis template is not available for Redis rate limiter");
        }
        // The Redis key TTL is in whole seconds, so the window is at least 1 second.
        int windowSeconds = Math.max(1, (int) rateLimit.timeUnit().toSeconds(rateLimit.timeWindow()));
        return new RedisRateLimiter(stringRedisTemplate, rateLimit.permits(), windowSeconds);
    }

    private CustomRateLimiter createRedisRateLimiter(int permits, int timeWindowSeconds) {
        if (stringRedisTemplate == null) {
            throw new IllegalStateException("Redis template is not available for Redis rate limiter");
        }
        return new RedisRateLimiter(stringRedisTemplate, permits, timeWindowSeconds);
    }

    private CustomRateLimiter createResilience4jRateLimiter(RateLimiter rateLimit) {
        return new Resilience4jRateLimiter(
                rateLimit.permits(),
                rateLimit.timeWindow(),
                rateLimit.timeUnit());
    }

    private CustomRateLimiter createResilience4jRateLimiter(int permits, int timeWindowSeconds) {
        return new Resilience4jRateLimiter(
                permits,
                timeWindowSeconds,
                TimeUnit.SECONDS);
    }

    private CustomRateLimiter createSlidingWindowRateLimiter(RateLimiter rateLimit) {
        return new SlidingWindowRateLimiter(
                rateLimit.permits(),
                rateLimit.timeWindow(),
                rateLimit.timeUnit());
    }

    private CustomRateLimiter createSlidingWindowRateLimiter(int permits, int timeWindowSeconds) {
        return new SlidingWindowRateLimiter(
                permits,
                timeWindowSeconds,
                TimeUnit.SECONDS);
    }

    private double calculatePermitsPerSecond(RateLimiter rateLimit) {
        // Convert the configured window and permit count into permits per second.
        // Millisecond precision keeps sub-second windows from being truncated to zero.
        double timeWindowInSeconds = rateLimit.timeUnit().toMillis(rateLimit.timeWindow()) / 1000.0;
        if (timeWindowInSeconds <= 0) {
            timeWindowInSeconds = 1; // guard against division by zero
        }
        return rateLimit.permits() / timeWindowInSeconds;
    }
}

CustomRateLimiter interface and implementations

Interface

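public interface CustomRateLimiter {
    /**
     * Tries to acquire a permit.
     *
     * @param key rate limit identifier
     * @return whether the permit was acquired
     */
    boolean tryAcquire(String key);

    /**
     * Releases a permit, if the implementation needs it.
     *
     * @param key rate limit identifier
     */
    default void release(String key) {
        // no-op by default
    }
}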

Implementations

  • SlidingWindowRateLimiter
public class SlidingWindowRateLimiter implements CustomRateLimiter {
    // Per-key queue of accepted request timestamps (ms), oldest first.
    private final Map<String, Queue<Long>> windowMap = new ConcurrentHashMap<>();
    private final int maxPermits;
    private final long windowMillis;

    public SlidingWindowRateLimiter(int maxPermits, int timeWindow, TimeUnit timeUnit) {
        this.maxPermits = maxPermits;
        this.windowMillis = timeUnit.toMillis(timeWindow);
    }

    // synchronized makes the check-then-add below atomic, at the cost of one lock for all keys.
    @Override
    public synchronized boolean tryAcquire(String key) {
        long currentTime = Instant.now().toEpochMilli();
        Queue<Long> window = windowMap.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>());
        // Remove expired timestamps.
        while (!window.isEmpty() && currentTime - window.peek() > windowMillis) {
            window.poll();
        }
        // Admit the request only if the window is not yet full.
        if (window.size() < maxPermits) {
            window.offer(currentTime);
            return true;
        }
        return false;
    }
}
  • Resilience4jRateLimiter
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;

public class Resilience4jRateLimiter implements CustomRateLimiter {

    // Resilience4j's RateLimiter, not the @RateLimiter annotation defined in this article.
    private final Map<String, RateLimiter> limiters = new ConcurrentHashMap<>();
    private final int limitForPeriod;
    private final int timeWindow;
    private final TimeUnit timeUnit;

    public Resilience4jRateLimiter(int limitForPeriod, int timeWindow, TimeUnit timeUnit) {
        this.timeUnit = timeUnit;
        this.timeWindow = timeWindow;
        this.limitForPeriod = limitForPeriod;
    }

    @Override
    public boolean tryAcquire(String key) {
        RateLimiter rateLimiter = limiters.computeIfAbsent(key, k -> {
            RateLimiterConfig config = RateLimiterConfig.custom()
                    .limitForPeriod(limitForPeriod)
                    .limitRefreshPeriod(Duration.ofMillis(timeUnit.toMillis(timeWindow)))
                    .timeoutDuration(Duration.ZERO) // non-blocking
                    .build();
            // Create the limiter directly instead of building a registry per key.
            return RateLimiter.of(k, config);
        });
        return rateLimiter.acquirePermission();
    }
}
  • RedisRateLimiter
public class RedisRateLimiter implements CustomRateLimiter {

    private final StringRedisTemplate stringRedisTemplate;
    private final int maxPermits;
    private final int windowSeconds;

    // Fixed-window counter: INCR the key, set its TTL on the first hit, then compare
    // the count with the limit. KEYS[1] = counter key, ARGV[1] = window in seconds,
    // ARGV[2] = maximum permits. The script runs atomically on the Redis server.
    private static final String REDIS_SCRIPT =
            "local key = KEYS[1] " +
            "local currentCount = redis.call('incr', key) " +
            "if tonumber(currentCount) == 1 then " +
            " redis.call('expire', key, ARGV[1]) " +
            "end " +
            "return currentCount <= tonumber(ARGV[2])";

    // Built once and shared instead of being re-created on every call.
    private static final DefaultRedisScript<Boolean> SCRIPT =
            new DefaultRedisScript<>(REDIS_SCRIPT, Boolean.class);

    public RedisRateLimiter(StringRedisTemplate stringRedisTemplate, int maxPermits,
            int windowSeconds) {
        this.maxPermits = maxPermits;
        this.windowSeconds = windowSeconds;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean tryAcquire(String key) {
        Boolean result =
                stringRedisTemplate.execute(SCRIPT, Collections.singletonList("rate:limit:" + key),
                        String.valueOf(windowSeconds), String.valueOf(maxPermits));
        // A null result is treated as a rejection.
        return result != null && result;
    }
}
  • GuavaRateLimiter
import com.google.common.util.concurrent.RateLimiter;

public class GuavaRateLimiter implements CustomRateLimiter {

    // Guava's RateLimiter, not the @RateLimiter annotation defined in this article.
    private final Map<String, RateLimiter> limiters = new ConcurrentHashMap<>();
    private final double permitsPerSecond;

    public GuavaRateLimiter(double permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
    }

    @Override
    public boolean tryAcquire(String key) {
        RateLimiter rateLimiter =
                limiters.computeIfAbsent(key, k -> RateLimiter.create(permitsPerSecond));
        return rateLimiter.tryAcquire(); // returns immediately, without waiting
    }
}

RateLimiter annotation implementation

  • @RateLimiter
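@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RateLimiter {

    /**
     * Unique rate limit identifier; defaults to the fully qualified method name.
     */
    String key() default "";

    /**
     * Rate limiting strategy; selects which kind of limiter is used.
     */
    RateLimiterType type() default RateLimiterType.GUAVA;

    /**
     * Length of the rate limit time window.
     */
    int timeWindow() default 1;

    /**
     * Unit of the time window.
     */
    TimeUnit timeUnit() default TimeUnit.SECONDS;

    /**
     * Number of requests allowed through within the time window.
     */
    int permits() default 50;

    /**
     * Maximum time to wait for a permit; 0 means non-blocking.
     * The aspect shown in this article does not read this value.
     */
    long timeout() default 0;

    /**
     * Message returned when the rate limit is triggered.
     */
    String message() default "Too many requests, please try again later";
}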
  • RateLimiterAspect
@Aspect
public class RateLimiterAspect {

    private final RateLimiterProperties properties;
    private final RateLimiterFactory rateLimiterFactory;

    public RateLimiterAspect(RateLimiterProperties properties,
            RateLimiterFactory rateLimiterFactory) {
        this.properties = properties;
        this.rateLimiterFactory = rateLimiterFactory;
    }

    @Around("@annotation(com.ares.ratelimiter.annotation.RateLimiter)")
    public Object rateLimit(ProceedingJoinPoint point) throws Throwable {

        if (!properties.isEnabled()) {
            return point.proceed();
        }

        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();

        RateLimiter rateLimit = method.getAnnotation(RateLimiter.class);

        String methodName = method.getDeclaringClass().getName() + "." + method.getName();
        // Use the same key as the factory: the annotation's key, or the method name as a fallback.
        String key = rateLimit.key().isEmpty() ? methodName : rateLimit.key();
        CustomRateLimiter limiter = rateLimiterFactory.createRateLimiter(rateLimit, methodName);

        if (!limiter.tryAcquire(key)) {
            // Message precedence: per-API configuration, then the annotation, then the default.
            String message = rateLimit.message();
            if (message.isEmpty()) {
                message = properties.getDefaultMessage();
            }
            RateLimiterProperties.ApiRateLimit apiConfig = properties.getApis().get(key);
            if (apiConfig != null && apiConfig.getMessage() != null) {
                message = apiConfig.getMessage();
            }
            throw new ResponseStatusException(HttpStatus.TOO_MANY_REQUESTS, message);
        }

        try {
            return point.proceed();
        } finally {
            limiter.release(key); // release only a permit that was actually acquired
        }
    }
}

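Possible improvements and extensions

  • Dynamic configuration
  • Multi-level rate limiting
  • Fallback handling when a request is limited
  • Per-user or per-IP rate limiting
  • Metrics and monitoring for rate limiting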
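
As an illustration of the per-user or per-IP direction: since `tryAcquire` already takes a key, one possible approach is to derive the key from both the method and the caller. The sketch below is an assumption about how that could look, not part of the project. `KeyedLimiter` mirrors the shape of `CustomRateLimiter` so the example compiles on its own, and `LimitKeys`, `CountingLimiter`, and the method key string are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Same shape as the CustomRateLimiter interface in this article, redeclared here
// so the sketch is self-contained.
interface KeyedLimiter {
    boolean tryAcquire(String key);
}

// Hypothetical helper: derives a per-caller key from the method key and a caller id
// (a user id or a client IP, whichever the application trusts).
final class LimitKeys {
    private LimitKeys() {
    }

    static String perCaller(String methodKey, String callerId) {
        return methodKey + ":caller:" + callerId;
    }
}

// Toy limiter that only counts per key and never resets. A real one would use one of
// the windowed algorithms above; this one only shows that keys are isolated.
class CountingLimiter implements KeyedLimiter {
    private final int maxPerKey;
    private final Map<String, AtomicInteger> counts = new ConcurrentHashMap<>();

    CountingLimiter(int maxPerKey) {
        this.maxPerKey = maxPerKey;
    }

    @Override
    public boolean tryAcquire(String key) {
        return counts.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet() <= maxPerKey;
    }
}

class PerCallerDemo {
    public static void main(String[] args) {
        KeyedLimiter limiter = new CountingLimiter(1);
        String method = "com.example.OrderController.create"; // hypothetical method key

        System.out.println(limiter.tryAcquire(LimitKeys.perCaller(method, "alice"))); // true
        System.out.println(limiter.tryAcquire(LimitKeys.perCaller(method, "alice"))); // false
        System.out.println(limiter.tryAcquire(LimitKeys.perCaller(method, "bob")));   // true
    }
}
```

With caller-derived keys the number of distinct keys is unbounded, so any per-key map would likely need expiry or a size cap.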

http://example.com/2025/06/10/架构-RateLimiter基础/
Author: ares
Published: June 10, 2025