跳到主要内容

Rate Limiter 中间件

Rate Limiter 中间件用于限制 API 的访问速率,防止滥用和保护服务器资源。基于令牌桶算法实现。

使用方法

基础用法

import (
"go-slim.dev/slim"
"go-slim.dev/slim/middleware"
"golang.org/x/time/rate"
)

func main() {
s := slim.New()

// 创建速率限制器存储(20 请求/秒)
limiterStore := middleware.NewRateLimiterMemoryStore(20)

s.GET("/api/limited", func(c slim.Context) error {
return c.String(200, "success")
}, middleware.RateLimiter(limiterStore))

s.Start(":8080")
}

全局速率限制

s := slim.New()

// 为所有路由应用速率限制
limiterStore := middleware.NewRateLimiterMemoryStore(20)
s.Use(middleware.RateLimiter(limiterStore))

s.GET("/api/users", handler)
s.POST("/api/users", handler)

自定义配置

s := slim.New()

limiterStore := middleware.NewRateLimiterMemoryStoreWithConfig(
middleware.RateLimiterMemoryStoreConfig{
Rate: 10, // 10 请求/秒
Burst: 30, // 突发容量
ExpiresIn: 3 * time.Minute, // 限制器过期时间
},
)

s.Use(middleware.RateLimiterWithConfig(middleware.RateLimiterConfig{
Store: limiterStore,
IdentifierExtractor: func(c slim.Context) (string, error) {
// 基于 IP 地址限流
return c.RealIP(), nil
},
ErrorHandler: func(c slim.Context, err error) error {
return c.JSON(403, map[string]string{
"error": "无法识别客户端",
})
},
DenyHandler: func(c slim.Context, identifier string, err error) error {
return c.JSON(429, map[string]string{
"error": "请求过于频繁,请稍后再试",
})
},
}))

配置选项

RateLimiterConfig

type RateLimiterConfig struct {
// BeforeFunc 在速率限制检查之前执行
// 可选
BeforeFunc BeforeFunc

// IdentifierExtractor 从上下文中提取访客标识符
// 可选。默认使用 IP 地址
IdentifierExtractor Extractor

// Store 定义速率限制器的存储实现
// 必需
Store RateLimiterStore

// ErrorHandler 当 IdentifierExtractor 返回错误时调用
// 可选。默认返回 403 Forbidden
ErrorHandler func(context slim.Context, err error) error

// DenyHandler 当速率限制器拒绝访问时调用
// 可选。默认返回 429 Too Many Requests
DenyHandler func(context slim.Context, identifier string, err error) error
}

RateLimiterMemoryStoreConfig

type RateLimiterMemoryStoreConfig struct {
// Rate 允许通过的请求速率(请求/秒)
// 必需
Rate rate.Limit

// Burst 最大突发请求数
// 可选。默认为 Rate 的向下取整值
Burst int

// ExpiresIn 限制器清理的过期时间
// 可选。默认值 3 分钟
ExpiresIn time.Duration
}

使用示例

1. 基于 IP 地址限流

// 每个 IP 每秒最多 20 个请求
limiterStore := middleware.NewRateLimiterMemoryStore(20)

s.Use(middleware.RateLimiter(limiterStore))

2. 基于用户 ID 限流

limiterStore := middleware.NewRateLimiterMemoryStore(10)

s.Use(middleware.RateLimiterWithConfig(middleware.RateLimiterConfig{
Store: limiterStore,
IdentifierExtractor: func(c slim.Context) (string, error) {
// 从 JWT 或 session 中获取用户 ID
userID := c.Get("user_id")
if userID == nil {
return "", errors.New("未登录")
}
return fmt.Sprintf("user:%v", userID), nil
},
}))

3. 基于 API Key 限流

limiterStore := middleware.NewRateLimiterMemoryStore(100)

s.Use(middleware.RateLimiterWithConfig(middleware.RateLimiterConfig{
Store: limiterStore,
IdentifierExtractor: func(c slim.Context) (string, error) {
apiKey := c.Request().Header.Get("X-API-Key")
if apiKey == "" {
return "", errors.New("缺少 API Key")
}
return fmt.Sprintf("apikey:%s", apiKey), nil
},
ErrorHandler: func(c slim.Context, err error) error {
return c.JSON(401, map[string]string{
"error": "无效的 API Key",
})
},
}))

4. 不同路由不同限流策略

// 公开 API:每秒 10 个请求
publicLimiter := middleware.NewRateLimiterMemoryStore(10)

// 认证 API:每秒 50 个请求
authLimiter := middleware.NewRateLimiterMemoryStore(50)

// 公开路由组
public := s.Group("/api/public", middleware.RateLimiter(publicLimiter))
public.GET("/posts", listPosts)

// 认证路由组
auth := s.Group("/api/auth", middleware.RateLimiter(authLimiter))
auth.GET("/users", listUsers)
auth.POST("/users", createUser)

5. 配置突发容量

// 允许 10 请求/秒,但可以突发到 30 个请求
limiterStore := middleware.NewRateLimiterMemoryStoreWithConfig(
middleware.RateLimiterMemoryStoreConfig{
Rate: 10, // 10 请求/秒
Burst: 30, // 突发容量 30
},
)

s.Use(middleware.RateLimiter(limiterStore))

说明:当短时间内有大量请求时,突发容量允许在速率限制达到之前通过更多请求。

6. 自定义错误响应

s.Use(middleware.RateLimiterWithConfig(middleware.RateLimiterConfig{
Store: middleware.NewRateLimiterMemoryStore(20),
DenyHandler: func(c slim.Context, identifier string, err error) error {
// 返回自定义响应
c.Response().Header().Set("X-RateLimit-Limit", "20")
c.Response().Header().Set("X-RateLimit-Remaining", "0")
c.Response().Header().Set("Retry-After", "60")

return c.JSON(429, map[string]any{
"error": "速率限制",
"message": "您的请求过于频繁,请稍后再试",
"retry_after": 60,
})
},
}))

7. 跳过特定 IP

whitelistedIPs := map[string]bool{
"127.0.0.1": true,
"10.0.0.1": true,
}

s.Use(middleware.RateLimiterWithConfig(middleware.RateLimiterConfig{
Store: middleware.NewRateLimiterMemoryStore(20),
BeforeFunc: func(c slim.Context) {
ip := c.RealIP()
if whitelistedIPs[ip] {
// 跳过速率限制检查
c.Set("skip_rate_limit", true)
}
},
IdentifierExtractor: func(c slim.Context) (string, error) {
if skip := c.Get("skip_rate_limit"); skip != nil {
return "", errors.New("whitelisted")
}
return c.RealIP(), nil
},
ErrorHandler: func(c slim.Context, err error) error {
if err.Error() == "whitelisted" {
return nil // 放行
}
return middleware.ErrExtractorError
},
}))

令牌桶算法

Rate Limiter 使用令牌桶算法(Token Bucket Algorithm):

  1. 桶容量(Burst):桶中可以存储的最大令牌数
  2. 填充速率(Rate):每秒向桶中添加的令牌数
  3. 请求处理
    • 每个请求需要消耗一个令牌
    • 如果桶中有令牌,请求通过,消耗一个令牌
    • 如果桶中没有令牌,请求被拒绝

示例

// Rate: 10, Burst: 30
// - 初始桶中有 30 个令牌
// - 可以立即处理 30 个请求
// - 之后每秒可以处理 10 个请求
limiterStore := middleware.NewRateLimiterMemoryStoreWithConfig(
middleware.RateLimiterMemoryStoreConfig{
Rate: 10,
Burst: 30,
},
)

内存存储特性

特点

  • 适用于中等负载的场景
  • 简单易用,无需外部依赖
  • 自动清理过期的限制器

限制

  • 并发性能:100+ 并发请求可能会出现锁竞争
  • 内存使用:16000+ 不同 IP 地址可能影响性能
  • 单机限制:不支持分布式限流

何时需要自定义存储

对于高负载场景,建议实现自定义存储(如 Redis):

type RedisRateLimiterStore struct {
client *redis.Client
rate rate.Limit
burst int
}

func (r *RedisRateLimiterStore) Allow(identifier string) (bool, error) {
// 使用 Redis 实现分布式限流
// 可以使用 Redis 的 INCR + EXPIRE 或 Lua 脚本
// ...
}

// 使用自定义存储
s.Use(middleware.RateLimiterWithConfig(middleware.RateLimiterConfig{
Store: &RedisRateLimiterStore{
client: redisClient,
rate: 10,
burst: 30,
},
}))

默认配置

DefaultRateLimiterConfig = RateLimiterConfig{
IdentifierExtractor: func(ctx slim.Context) (string, error) {
return ctx.RealIP(), nil
},
ErrorHandler: func(c slim.Context, err error) error {
return &slim.HTTPError{
Code: 403,
Message: "error while extracting identifier",
Internal: err,
}
},
DenyHandler: func(c slim.Context, identifier string, err error) error {
return &slim.HTTPError{
Code: 429,
Message: "rate limit exceeded",
Internal: err,
}
},
}

DefaultRateLimiterMemoryStoreConfig = RateLimiterMemoryStoreConfig{
ExpiresIn: 3 * time.Minute,
}

最佳实践

1. 根据资源类型设置不同限制

// 读操作:100 请求/秒
readLimiter := middleware.NewRateLimiterMemoryStore(100)

// 写操作:10 请求/秒
writeLimiter := middleware.NewRateLimiterMemoryStore(10)

s.GET("/api/posts", handler, middleware.RateLimiter(readLimiter))
s.POST("/api/posts", handler, middleware.RateLimiter(writeLimiter))

2. 返回限流信息

s.Use(func(c slim.Context, next slim.HandlerFunc) error {
err := next(c)

// 在响应头中添加限流信息
c.Response().Header().Set("X-RateLimit-Limit", "100")
c.Response().Header().Set("X-RateLimit-Remaining", "95")
c.Response().Header().Set("X-RateLimit-Reset", "1699430400")

return err
})

s.Use(middleware.RateLimiter(limiterStore))

3. 监控限流情况

var rateLimitCounter int64

s.Use(middleware.RateLimiterWithConfig(middleware.RateLimiterConfig{
Store: middleware.NewRateLimiterMemoryStore(20),
DenyHandler: func(c slim.Context, identifier string, err error) error {
// 记录被限流的请求
atomic.AddInt64(&rateLimitCounter, 1)

// 记录日志
log.Printf("Rate limit exceeded for %s", identifier)

return c.JSON(429, map[string]string{
"error": "rate limit exceeded",
})
},
}))

// 定期导出指标
go func() {
ticker := time.NewTicker(1 * time.Minute)
for range ticker.C {
count := atomic.SwapInt64(&rateLimitCounter, 0)
log.Printf("Rate limited requests in last minute: %d", count)
}
}()

4. 渐进式限流

// 根据负载动态调整限流策略
func getDynamicRateLimit() rate.Limit {
load := getSystemLoad()
if load > 0.8 {
return 10 // 高负载时降低限制
} else if load > 0.5 {
return 50
}
return 100 // 低负载时放宽限制
}

5. 为登录用户提供更高限额

s.Use(middleware.RateLimiterWithConfig(middleware.RateLimiterConfig{
Store: middleware.NewRateLimiterMemoryStore(20), // 匿名用户:20/秒
IdentifierExtractor: func(c slim.Context) (string, error) {
// 已登录用户使用单独的限制器
if userID := c.Get("user_id"); userID != nil {
return fmt.Sprintf("user:%v", userID), nil
}
// 匿名用户基于 IP
return fmt.Sprintf("ip:%s", c.RealIP()), nil
},
}))

// 为登录用户添加更高的限额
authenticatedLimiter := middleware.NewRateLimiterMemoryStore(100) // 100/秒
s.Use(func(c slim.Context, next slim.HandlerFunc) error {
if c.Get("user_id") != nil {
return authenticatedLimiter.ToMiddleware()(c, next)
}
return next(c)
})

错误处理

预定义错误

// 速率限制超出
var ErrRateLimitExceeded = slim.NewHTTPError(http.StatusTooManyRequests, "rate limit exceeded")

// 提取器错误
var ErrExtractorError = slim.NewHTTPError(http.StatusForbidden, "error while extracting identifier")

性能考虑

内存存储

  • 适用于单机部署
  • 适用于中小型应用(< 10000 个不同标识符)
  • 并发性能在 100 请求/秒以下表现良好

分布式存储

对于分布式部署,建议使用 Redis 或其他分布式存储:

// 实现 RateLimiterStore 接口
type RateLimiterStore interface {
Allow(identifier string) (bool, error)
}

参考资料