mirror of
https://github.com/go-kratos/kratos.git
synced 2025-01-24 03:46:37 +02:00
Merge pull request #105 from faycheng/pkg/bbr-rate-limiter
Pkg/bbr rate limiter
This commit is contained in:
commit
6d2a0f4b3f
BIN
doc/img/ratelimit-benchmark-up-1.png
Normal file
BIN
doc/img/ratelimit-benchmark-up-1.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 661 KiB |
BIN
doc/img/ratelimit-rolling-window.png
Normal file
BIN
doc/img/ratelimit-rolling-window.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 19 KiB |
@ -103,6 +103,32 @@ func Example() {
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
# 内置中间件
|
||||||
|
|
||||||
|
## 自适应限流
|
||||||
|
|
||||||
|
更多关于自适应限流的信息,请参考:[kratos 自适应限流](/doc/wiki-cn/ratelimit.md)
|
||||||
|
|
||||||
|
```go
|
||||||
|
func Example() {
|
||||||
|
myHandler := func(ctx *bm.Context) {
|
||||||
|
mid := metadata.Int64(ctx, metadata.Mid)
|
||||||
|
ctx.JSON(fmt.Sprintf("%d", mid), nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
e := bm.DefaultServer(nil)
|
||||||
|
|
||||||
|
// 挂载自适应限流中间件到 bm engine,使用默认配置
|
||||||
|
limiter := bm.NewRateLimiter(nil)
|
||||||
|
e.Use(limiter.Limit())
|
||||||
|
|
||||||
|
e.GET("/user", myHandler)
|
||||||
|
|
||||||
|
e.Start()
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
# 扩展阅读
|
# 扩展阅读
|
||||||
|
|
||||||
[bm快速开始](blademaster-quickstart.md) [bm模块说明](blademaster-mod.md) [bm基于pb生成](blademaster-pb.md)
|
[bm快速开始](blademaster-quickstart.md) [bm模块说明](blademaster-mod.md) [bm基于pb生成](blademaster-pb.md)
|
||||||
|
57
doc/wiki-cn/ratelimit.md
Normal file
57
doc/wiki-cn/ratelimit.md
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
# 自适应限流保护
|
||||||
|
|
||||||
|
kratos 借鉴了 Sentinel 项目的自适应限流系统,通过综合分析服务的 cpu 使用率、请求成功的 qps 和请求成功的 rt 来做自适应限流保护。
|
||||||
|
|
||||||
|
|
||||||
|
## 核心目标
|
||||||
|
|
||||||
|
* 自动嗅探负载和 qps,减少人工配置
|
||||||
|
* 削顶,保证超载时系统不被拖垮,并能以高水位 qps 继续运行
|
||||||
|
|
||||||
|
|
||||||
|
## 限流规则
|
||||||
|
|
||||||
|
1,指标介绍
|
||||||
|
|
||||||
|
|指标名称|指标含义|
|
||||||
|
|---|---|
|
||||||
|
|cpu|最近 1s 的 CPU 使用率均值,使用滑动平均计算,采样周期是 250ms|
|
||||||
|
|inflight|当前处理中正在处理的请求数量|
|
||||||
|
|pass|请求处理成功的量|
|
||||||
|
|rt|请求成功的响应耗时|
|
||||||
|
|
||||||
|
|
||||||
|
2,滑动窗口
|
||||||
|
|
||||||
|
在自适应限流保护中,采集到的指标的时效性非常强,系统只需要采集最近一小段时间内的 qps、rt 即可,对于较老的数据,会自动丢弃。为了实现这个效果,kratos 使用了滑动窗口来保存采样数据。
|
||||||
|
|
||||||
|
![ratelimit-rolling-window](/doc/img/ratelimit-rolling-window.png)
|
||||||
|
|
||||||
|
如上图,展示了一个具有两个桶(bucket)的滑动窗口(rolling window)。整个滑动窗口用来保存最近 1s 的采样数据,每个小的桶用来保存 500ms 的采样数据。
|
||||||
|
当时间流动之后,过期的桶会自动被新桶的数据覆盖掉,在图中,在 1000-1500ms 时,bucket 1 的数据因为过期而被丢弃,之后 bucket 3 的数据填到了窗口的头部。
|
||||||
|
|
||||||
|
|
||||||
|
3,限流公式
|
||||||
|
|
||||||
|
判断是否丢弃当前请求的算法如下:
|
||||||
|
|
||||||
|
`cpu > 800 AND (Now - PrevDrop) < 1s AND (MaxPass * MinRt * windows / 1000) < InFlight`
|
||||||
|
|
||||||
|
MaxPass 表示最近 5s 内,单个采样窗口中最大的请求数。
|
||||||
|
MinRt 表示最近 5s 内,单个采样窗口中最小的响应时间。
|
||||||
|
windows 表示一秒内采样窗口的数量,默认配置中是 5s 50 个采样,那么 windows 的值为 10。
|
||||||
|
|
||||||
|
## 压测报告
|
||||||
|
|
||||||
|
场景1,请求以每秒增加1个的速度不停上升,压测效果如下:
|
||||||
|
|
||||||
|
![ratelimit-benchmark-up-1](/doc/img/ratelimit-benchmark-up-1.png)
|
||||||
|
|
||||||
|
左测是没有限流的压测效果,右侧是带限流的压测效果。
|
||||||
|
可以看到,没有限流的场景里,系统在 700qps 时开始抖动,在 1k qps 时被拖垮,几乎没有新的请求能被放行,然而在使用限流之后,系统请求能够稳定在 600 qps 左右,rt 没有暴增,服务也没有被打垮,可见,限流有效的保护了服务。
|
||||||
|
|
||||||
|
|
||||||
|
参考资料:
|
||||||
|
|
||||||
|
[Sentinel 系统自适应限流](https://github.com/alibaba/Sentinel/wiki/%E7%B3%BB%E7%BB%9F%E8%87%AA%E9%80%82%E5%BA%94%E9%99%90%E6%B5%81)
|
||||||
|
|
@ -320,6 +320,52 @@ func serverLogging() grpc.UnaryServerInterceptor {
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
# 内置拦截器
|
||||||
|
|
||||||
|
## 自适应限流拦截器
|
||||||
|
|
||||||
|
更多关于自适应限流的信息,请参考:[kratos 自适应限流](/doc/wiki-cn/ratelimit.md)
|
||||||
|
|
||||||
|
```go
|
||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
pb "kratos-demo/api"
|
||||||
|
"kratos-demo/internal/service"
|
||||||
|
"github.com/bilibili/kratos/pkg/conf/paladin"
|
||||||
|
"github.com/bilibili/kratos/pkg/net/rpc/warden"
|
||||||
|
"github.com/bilibili/kratos/pkg/net/rpc/warden/ratelimiter"
|
||||||
|
)
|
||||||
|
|
||||||
|
// New new a grpc server.
|
||||||
|
func New(svc *service.Service) *warden.Server {
|
||||||
|
var rc struct {
|
||||||
|
Server *warden.ServerConfig
|
||||||
|
}
|
||||||
|
if err := paladin.Get("grpc.toml").UnmarshalTOML(&rc); err != nil {
|
||||||
|
if err != paladin.ErrNotExist {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ws := warden.NewServer(rc.Server)
|
||||||
|
|
||||||
|
// 挂载自适应限流拦截器到 warden server,使用默认配置
|
||||||
|
limiter := ratelimiter.New(nil)
|
||||||
|
ws.Use(limiter.Limit())
|
||||||
|
|
||||||
|
// 注意替换这里:
|
||||||
|
// RegisterDemoServer方法是在"api"目录下代码生成的
|
||||||
|
// 对应proto文件内自定义的service名字,请使用正确方法名替换
|
||||||
|
pb.RegisterDemoServer(ws.Server(), svc)
|
||||||
|
|
||||||
|
ws, err := ws.Start()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return ws
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
# 扩展阅读
|
# 扩展阅读
|
||||||
|
|
||||||
[warden快速开始](warden-quickstart.md) [warden基于pb生成](warden-pb.md) [warden负载均衡](warden-balancer.md) [warden服务发现](warden-resolver.md)
|
[warden快速开始](warden-quickstart.md) [warden基于pb生成](warden-pb.md) [warden负载均衡](warden-balancer.md) [warden服务发现](warden-resolver.md)
|
||||||
|
12
pkg/container/group/README.md
Normal file
12
pkg/container/group/README.md
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
#### group
|
||||||
|
|
||||||
|
##### 项目简介
|
||||||
|
|
||||||
|
懒加载对象容器
|
||||||
|
|
||||||
|
##### 编译环境
|
||||||
|
|
||||||
|
- **推荐 Golang v1.12.1 以上版本编译执行**
|
||||||
|
|
||||||
|
##### 依赖包
|
||||||
|
|
46
pkg/container/group/example_test.go
Normal file
46
pkg/container/group/example_test.go
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
package group
|
||||||
|
|
||||||
|
import "fmt"
|
||||||
|
|
||||||
|
type Counter struct {
|
||||||
|
Value int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Counter) Incr() {
|
||||||
|
c.Value++
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExampleGroup_Get() {
|
||||||
|
new := func() interface{} {
|
||||||
|
fmt.Println("Only Once")
|
||||||
|
return &Counter{}
|
||||||
|
}
|
||||||
|
group := NewGroup(new)
|
||||||
|
|
||||||
|
// Create a new Counter
|
||||||
|
group.Get("pass").(*Counter).Incr()
|
||||||
|
|
||||||
|
// Get the created Counter again.
|
||||||
|
group.Get("pass").(*Counter).Incr()
|
||||||
|
// Output:
|
||||||
|
// Only Once
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExampleGroup_Reset() {
|
||||||
|
new := func() interface{} {
|
||||||
|
return &Counter{}
|
||||||
|
}
|
||||||
|
group := NewGroup(new)
|
||||||
|
|
||||||
|
newV2 := func() interface{} {
|
||||||
|
fmt.Println("New V2")
|
||||||
|
return &Counter{}
|
||||||
|
}
|
||||||
|
// Reset the new function and clear all created objects.
|
||||||
|
group.Reset(newV2)
|
||||||
|
|
||||||
|
// Create a new Counter
|
||||||
|
group.Get("pass").(*Counter).Incr()
|
||||||
|
// Output:
|
||||||
|
// New V2
|
||||||
|
}
|
55
pkg/container/group/group.go
Normal file
55
pkg/container/group/group.go
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
// Package group provides a sample lazy load container.
|
||||||
|
// The group only creating a new object not until the object is needed by user.
|
||||||
|
// And it will cache all the objects to reduce the creation of object.
|
||||||
|
package group
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
// Group is a lazy load container.
|
||||||
|
type Group struct {
|
||||||
|
new func() interface{}
|
||||||
|
objs sync.Map
|
||||||
|
sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewGroup news a group container.
|
||||||
|
func NewGroup(new func() interface{}) *Group {
|
||||||
|
if new == nil {
|
||||||
|
panic("container.group: can't assign a nil to the new function")
|
||||||
|
}
|
||||||
|
return &Group{
|
||||||
|
new: new,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get gets the object by the given key.
|
||||||
|
func (g *Group) Get(key string) interface{} {
|
||||||
|
g.RLock()
|
||||||
|
new := g.new
|
||||||
|
g.RUnlock()
|
||||||
|
obj, ok := g.objs.Load(key)
|
||||||
|
if !ok {
|
||||||
|
obj = new()
|
||||||
|
g.objs.Store(key, obj)
|
||||||
|
}
|
||||||
|
return obj
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset resets the new function and deletes all existing objects.
|
||||||
|
func (g *Group) Reset(new func() interface{}) {
|
||||||
|
if new == nil {
|
||||||
|
panic("container.group: can't assign a nil to the new function")
|
||||||
|
}
|
||||||
|
g.Lock()
|
||||||
|
g.new = new
|
||||||
|
g.Unlock()
|
||||||
|
g.Clear()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear deletes all objects.
|
||||||
|
func (g *Group) Clear() {
|
||||||
|
g.objs.Range(func(key, value interface{}) bool {
|
||||||
|
g.objs.Delete(key)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
}
|
69
pkg/container/group/group_test.go
Normal file
69
pkg/container/group/group_test.go
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
package group
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGroupGet(t *testing.T) {
|
||||||
|
count := 0
|
||||||
|
g := NewGroup(func() interface{} {
|
||||||
|
count++
|
||||||
|
return count
|
||||||
|
})
|
||||||
|
v := g.Get("/x/internal/dummy/user")
|
||||||
|
assert.Equal(t, 1, v.(int))
|
||||||
|
|
||||||
|
v = g.Get("/x/internal/dummy/avatar")
|
||||||
|
assert.Equal(t, 2, v.(int))
|
||||||
|
|
||||||
|
v = g.Get("/x/internal/dummy/user")
|
||||||
|
assert.Equal(t, 1, v.(int))
|
||||||
|
assert.Equal(t, 2, count)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGroupReset(t *testing.T) {
|
||||||
|
g := NewGroup(func() interface{} {
|
||||||
|
return 1
|
||||||
|
})
|
||||||
|
g.Get("/x/internal/dummy/user")
|
||||||
|
call := false
|
||||||
|
g.Reset(func() interface{} {
|
||||||
|
call = true
|
||||||
|
return 1
|
||||||
|
})
|
||||||
|
|
||||||
|
length := 0
|
||||||
|
g.objs.Range(func(_, _ interface{}) bool {
|
||||||
|
length++
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
assert.Equal(t, 0, length)
|
||||||
|
|
||||||
|
g.Get("/x/internal/dummy/user")
|
||||||
|
assert.Equal(t, true, call)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGroupClear(t *testing.T) {
|
||||||
|
g := NewGroup(func() interface{} {
|
||||||
|
return 1
|
||||||
|
})
|
||||||
|
g.Get("/x/internal/dummy/user")
|
||||||
|
length := 0
|
||||||
|
g.objs.Range(func(_, _ interface{}) bool {
|
||||||
|
length++
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
assert.Equal(t, 1, length)
|
||||||
|
|
||||||
|
g.Clear()
|
||||||
|
length = 0
|
||||||
|
g.objs.Range(func(_, _ interface{}) bool {
|
||||||
|
length++
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
assert.Equal(t, 0, length)
|
||||||
|
|
||||||
|
}
|
62
pkg/net/http/blademaster/ratelimit.go
Normal file
62
pkg/net/http/blademaster/ratelimit.go
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
package blademaster
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/bilibili/kratos/pkg/log"
|
||||||
|
limit "github.com/bilibili/kratos/pkg/ratelimit"
|
||||||
|
"github.com/bilibili/kratos/pkg/ratelimit/bbr"
|
||||||
|
"github.com/bilibili/kratos/pkg/stat/prom"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
_statName = "go_http_bbr"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
bbrStats = prom.New().WithState("go_http_bbr", []string{"url"})
|
||||||
|
)
|
||||||
|
|
||||||
|
// RateLimiter bbr middleware.
|
||||||
|
type RateLimiter struct {
|
||||||
|
group *bbr.Group
|
||||||
|
logTime int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// New return a ratelimit middleware.
|
||||||
|
func NewRateLimiter(conf *bbr.Config) (s *RateLimiter) {
|
||||||
|
return &RateLimiter{
|
||||||
|
group: bbr.NewGroup(conf),
|
||||||
|
logTime: time.Now().UnixNano(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *RateLimiter) printStats(routePath string, limiter limit.Limiter) {
|
||||||
|
now := time.Now().UnixNano()
|
||||||
|
if now-atomic.LoadInt64(&b.logTime) > int64(time.Second*3) {
|
||||||
|
atomic.StoreInt64(&b.logTime, now)
|
||||||
|
log.Info("http.bbr path:%s stat:%+v", routePath, limiter.(*bbr.BBR).Stat())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Limit return a bm handler func.
|
||||||
|
func (b *RateLimiter) Limit() HandlerFunc {
|
||||||
|
return func(c *Context) {
|
||||||
|
uri := fmt.Sprintf("%s://%s%s", c.Request.URL.Scheme, c.Request.Host, c.Request.URL.Path)
|
||||||
|
limiter := b.group.Get(uri)
|
||||||
|
done, err := limiter.Allow(c)
|
||||||
|
if err != nil {
|
||||||
|
bbrStats.Incr(_statName, uri)
|
||||||
|
c.JSON(nil, err)
|
||||||
|
c.Abort()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
done(limit.DoneInfo{Op: limit.Success})
|
||||||
|
b.printStats(uri, limiter)
|
||||||
|
}()
|
||||||
|
c.Next()
|
||||||
|
}
|
||||||
|
}
|
63
pkg/net/rpc/warden/ratelimiter/ratelimiter.go
Normal file
63
pkg/net/rpc/warden/ratelimiter/ratelimiter.go
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
package ratelimiter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/bilibili/kratos/pkg/log"
|
||||||
|
limit "github.com/bilibili/kratos/pkg/ratelimit"
|
||||||
|
"github.com/bilibili/kratos/pkg/ratelimit/bbr"
|
||||||
|
"github.com/bilibili/kratos/pkg/stat/prom"
|
||||||
|
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
_statName = "go_grpc_bbr"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
stats = prom.New().WithState("go_grpc_bbr", []string{"url"})
|
||||||
|
)
|
||||||
|
|
||||||
|
// RateLimiter bbr middleware.
|
||||||
|
type RateLimiter struct {
|
||||||
|
group *bbr.Group
|
||||||
|
logTime int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// New return a ratelimit middleware.
|
||||||
|
func New(conf *bbr.Config) (s *RateLimiter) {
|
||||||
|
return &RateLimiter{
|
||||||
|
group: bbr.NewGroup(conf),
|
||||||
|
logTime: time.Now().UnixNano(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *RateLimiter) printStats(fullMethod string, limiter limit.Limiter) {
|
||||||
|
now := time.Now().UnixNano()
|
||||||
|
if now-atomic.LoadInt64(&b.logTime) > int64(time.Second*3) {
|
||||||
|
atomic.StoreInt64(&b.logTime, now)
|
||||||
|
log.Info("grpc.bbr path:%s stat:%+v", fullMethod, limiter.(*bbr.BBR).Stat())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Limit is a server interceptor that detects and rejects overloaded traffic.
|
||||||
|
func (b *RateLimiter) Limit() grpc.UnaryServerInterceptor {
|
||||||
|
return func(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||||
|
uri := args.FullMethod
|
||||||
|
limiter := b.group.Get(uri)
|
||||||
|
done, err := limiter.Allow(ctx)
|
||||||
|
if err != nil {
|
||||||
|
stats.Incr(_statName, uri)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
done(limit.DoneInfo{Op: limit.Success})
|
||||||
|
b.printStats(uri, limiter)
|
||||||
|
}()
|
||||||
|
resp, err = handler(ctx, req)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
@ -267,7 +267,7 @@ func testBreaker(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
c := pb.NewGreeterClient(conn)
|
c := pb.NewGreeterClient(conn)
|
||||||
for i := 0; i < 35; i++ {
|
for i := 0; i < 50; i++ {
|
||||||
_, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: "breaker_test"})
|
_, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: "breaker_test"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ecode.ServiceUnavailable.Equal(err) {
|
if ecode.ServiceUnavailable.Equal(err) {
|
||||||
|
14
pkg/ratelimit/README.md
Normal file
14
pkg/ratelimit/README.md
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
# rate
|
||||||
|
|
||||||
|
# 项目简介
|
||||||
|
BBR 限流
|
||||||
|
|
||||||
|
# 编译环境
|
||||||
|
|
||||||
|
|
||||||
|
# 依赖包
|
||||||
|
|
||||||
|
|
||||||
|
# 编译执行
|
||||||
|
|
||||||
|
|
222
pkg/ratelimit/bbr/bbr.go
Normal file
222
pkg/ratelimit/bbr/bbr.go
Normal file
@ -0,0 +1,222 @@
|
|||||||
|
package bbr
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"math"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/bilibili/kratos/pkg/container/group"
|
||||||
|
"github.com/bilibili/kratos/pkg/ecode"
|
||||||
|
"github.com/bilibili/kratos/pkg/log"
|
||||||
|
limit "github.com/bilibili/kratos/pkg/ratelimit"
|
||||||
|
"github.com/bilibili/kratos/pkg/stat/metric"
|
||||||
|
|
||||||
|
cpustat "github.com/bilibili/kratos/pkg/stat/sys/cpu"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
cpu int64
|
||||||
|
decay = 0.75
|
||||||
|
defaultConf = &Config{
|
||||||
|
Window: time.Second * 5,
|
||||||
|
WinBucket: 50,
|
||||||
|
CPUThreshold: 800,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
type cpuGetter func() int64
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
go cpuproc()
|
||||||
|
}
|
||||||
|
|
||||||
|
func cpuproc() {
|
||||||
|
defer func() {
|
||||||
|
if err := recover(); err != nil {
|
||||||
|
log.Error("rate.limit.cpuproc() err(%+v)", err)
|
||||||
|
go cpuproc()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
ticker := time.NewTicker(time.Millisecond * 250)
|
||||||
|
// EMA algorithm: https://blog.csdn.net/m0_38106113/article/details/81542863
|
||||||
|
for range ticker.C {
|
||||||
|
stat := &cpustat.Stat{}
|
||||||
|
cpustat.ReadStat(stat)
|
||||||
|
prevCpu := atomic.LoadInt64(&cpu)
|
||||||
|
curCpu := int64(float64(prevCpu)*decay + float64(stat.Usage)*(1.0-decay))
|
||||||
|
atomic.StoreInt64(&cpu, curCpu)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stats contains the metrics's snapshot of bbr.
|
||||||
|
type Stat struct {
|
||||||
|
Cpu int64
|
||||||
|
InFlight int64
|
||||||
|
MaxInFlight int64
|
||||||
|
MinRt int64
|
||||||
|
MaxPass int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// BBR implements bbr-like limiter.
|
||||||
|
// It is inspired by sentinel.
|
||||||
|
// https://github.com/alibaba/Sentinel/wiki/%E7%B3%BB%E7%BB%9F%E8%87%AA%E9%80%82%E5%BA%94%E9%99%90%E6%B5%81
|
||||||
|
type BBR struct {
|
||||||
|
cpu cpuGetter
|
||||||
|
passStat metric.RollingCounter
|
||||||
|
rtStat metric.RollingGauge
|
||||||
|
inFlight int64
|
||||||
|
winBucketPerSec int64
|
||||||
|
conf *Config
|
||||||
|
prevDrop time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// Config contains configs of bbr limiter.
|
||||||
|
type Config struct {
|
||||||
|
Enabled bool
|
||||||
|
Window time.Duration
|
||||||
|
WinBucket int
|
||||||
|
Rule string
|
||||||
|
Debug bool
|
||||||
|
CPUThreshold int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *BBR) maxPASS() int64 {
|
||||||
|
val := int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 {
|
||||||
|
var result = 1.0
|
||||||
|
for iterator.Next() {
|
||||||
|
bucket := iterator.Bucket()
|
||||||
|
count := 0.0
|
||||||
|
for _, p := range bucket.Points {
|
||||||
|
count += p
|
||||||
|
}
|
||||||
|
result = math.Max(result, count)
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}))
|
||||||
|
if val == 0 {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
return val
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *BBR) minRT() int64 {
|
||||||
|
val := l.rtStat.Reduce(func(iterator metric.Iterator) float64 {
|
||||||
|
var result = math.MaxFloat64
|
||||||
|
for iterator.Next() {
|
||||||
|
bucket := iterator.Bucket()
|
||||||
|
if len(bucket.Points) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
total := 0.0
|
||||||
|
for _, p := range bucket.Points {
|
||||||
|
total += p
|
||||||
|
}
|
||||||
|
avg := total / float64(bucket.Count)
|
||||||
|
result = math.Min(result, avg)
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
})
|
||||||
|
return int64(math.Ceil(val))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *BBR) maxFlight() int64 {
|
||||||
|
return int64(math.Floor(float64(l.maxPASS()*l.minRT()*l.winBucketPerSec)/1000.0 + 0.5))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *BBR) shouldDrop() bool {
|
||||||
|
inFlight := atomic.LoadInt64(&l.inFlight)
|
||||||
|
maxInflight := l.maxFlight()
|
||||||
|
if l.cpu() < l.conf.CPUThreshold {
|
||||||
|
if time.Now().Sub(l.prevDrop) <= 1000*time.Millisecond {
|
||||||
|
return inFlight > 1 && inFlight > maxInflight
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return inFlight > 1 && inFlight > maxInflight
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stat tasks a snapshot of the bbr limiter.
|
||||||
|
func (l *BBR) Stat() Stat {
|
||||||
|
return Stat{
|
||||||
|
Cpu: l.cpu(),
|
||||||
|
InFlight: atomic.LoadInt64(&l.inFlight),
|
||||||
|
MinRt: l.minRT(),
|
||||||
|
MaxPass: l.maxPASS(),
|
||||||
|
MaxInFlight: l.maxFlight(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Allow checks all inbound traffic.
|
||||||
|
// Once overload is detected, it raises ecode.LimitExceed error.
|
||||||
|
func (l *BBR) Allow(ctx context.Context, opts ...limit.AllowOption) (func(info limit.DoneInfo), error) {
|
||||||
|
allowOpts := limit.DefaultAllowOpts()
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt.Apply(&allowOpts)
|
||||||
|
}
|
||||||
|
if l.shouldDrop() {
|
||||||
|
l.prevDrop = time.Now()
|
||||||
|
return nil, ecode.LimitExceed
|
||||||
|
}
|
||||||
|
atomic.AddInt64(&l.inFlight, 1)
|
||||||
|
stime := time.Now()
|
||||||
|
return func(do limit.DoneInfo) {
|
||||||
|
rt := int64(time.Since(stime) / time.Millisecond)
|
||||||
|
l.rtStat.Add(rt)
|
||||||
|
atomic.AddInt64(&l.inFlight, -1)
|
||||||
|
switch do.Op {
|
||||||
|
case limit.Success:
|
||||||
|
l.passStat.Add(1)
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newLimiter(conf *Config) limit.Limiter {
|
||||||
|
if conf == nil {
|
||||||
|
conf = defaultConf
|
||||||
|
}
|
||||||
|
size := conf.WinBucket
|
||||||
|
bucketDuration := conf.Window / time.Duration(conf.WinBucket)
|
||||||
|
passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: size, BucketDuration: bucketDuration})
|
||||||
|
rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: size, BucketDuration: bucketDuration})
|
||||||
|
cpu := func() int64 {
|
||||||
|
return atomic.LoadInt64(&cpu)
|
||||||
|
}
|
||||||
|
limiter := &BBR{
|
||||||
|
cpu: cpu,
|
||||||
|
conf: conf,
|
||||||
|
passStat: passStat,
|
||||||
|
rtStat: rtStat,
|
||||||
|
winBucketPerSec: int64(time.Second) / (int64(conf.Window) / int64(conf.WinBucket)),
|
||||||
|
prevDrop: time.Unix(0, 0),
|
||||||
|
}
|
||||||
|
return limiter
|
||||||
|
}
|
||||||
|
|
||||||
|
// Group represents a class of BBRLimiter and forms a namespace in which
|
||||||
|
// units of BBRLimiter.
|
||||||
|
type Group struct {
|
||||||
|
group *group.Group
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewGroup new a limiter group container, if conf nil use default conf.
|
||||||
|
func NewGroup(conf *Config) *Group {
|
||||||
|
if conf == nil {
|
||||||
|
conf = defaultConf
|
||||||
|
}
|
||||||
|
group := group.NewGroup(func() interface{} {
|
||||||
|
return newLimiter(conf)
|
||||||
|
})
|
||||||
|
return &Group{
|
||||||
|
group: group,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get get a limiter by a specified key, if limiter not exists then make a new one.
|
||||||
|
func (g *Group) Get(key string) limit.Limiter {
|
||||||
|
limiter := g.group.Get(key)
|
||||||
|
return limiter.(limit.Limiter)
|
||||||
|
}
|
166
pkg/ratelimit/bbr/bbr_test.go
Normal file
166
pkg/ratelimit/bbr/bbr_test.go
Normal file
@ -0,0 +1,166 @@
|
|||||||
|
package bbr
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
"math/rand"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
limit "github.com/bilibili/kratos/pkg/ratelimit"
|
||||||
|
"github.com/bilibili/kratos/pkg/stat/metric"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBBR(t *testing.T) {
|
||||||
|
cfg := &Config{
|
||||||
|
Window: time.Second * 5,
|
||||||
|
WinBucket: 50,
|
||||||
|
CPUThreshold: 100,
|
||||||
|
}
|
||||||
|
limiter := newLimiter(cfg)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var drop int64
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for i := 0; i < 300; i++ {
|
||||||
|
f, err := limiter.Allow(context.TODO())
|
||||||
|
if err != nil {
|
||||||
|
atomic.AddInt64(&drop, 1)
|
||||||
|
} else {
|
||||||
|
count := rand.Intn(100)
|
||||||
|
time.Sleep(time.Millisecond * time.Duration(count))
|
||||||
|
f(limit.DoneInfo{Op: limit.Success})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
fmt.Println("drop: ", drop)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBBRMaxPass(t *testing.T) {
|
||||||
|
bucketDuration := time.Millisecond * 100
|
||||||
|
passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration})
|
||||||
|
for i := 1; i <= 10; i++ {
|
||||||
|
passStat.Add(int64(i * 100))
|
||||||
|
time.Sleep(bucketDuration)
|
||||||
|
}
|
||||||
|
bbr := &BBR{
|
||||||
|
passStat: passStat,
|
||||||
|
}
|
||||||
|
assert.Equal(t, int64(1000), bbr.maxPASS())
|
||||||
|
|
||||||
|
// default max pass is equal to 1.
|
||||||
|
passStat = metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration})
|
||||||
|
bbr = &BBR{
|
||||||
|
passStat: passStat,
|
||||||
|
}
|
||||||
|
assert.Equal(t, int64(1), bbr.maxPASS())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBBRMinRt(t *testing.T) {
|
||||||
|
bucketDuration := time.Millisecond * 100
|
||||||
|
rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration})
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
for j := i*10 + 1; j <= i*10+10; j++ {
|
||||||
|
rtStat.Add(int64(j))
|
||||||
|
}
|
||||||
|
if i != 9 {
|
||||||
|
time.Sleep(bucketDuration)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bbr := &BBR{
|
||||||
|
rtStat: rtStat,
|
||||||
|
}
|
||||||
|
assert.Equal(t, int64(6), bbr.minRT())
|
||||||
|
|
||||||
|
// default max min rt is equal to maxFloat64.
|
||||||
|
bucketDuration = time.Millisecond * 100
|
||||||
|
rtStat = metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration})
|
||||||
|
bbr = &BBR{
|
||||||
|
rtStat: rtStat,
|
||||||
|
}
|
||||||
|
assert.Equal(t, int64(math.Ceil(math.MaxFloat64)), bbr.minRT())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBBRMaxInflight(t *testing.T) {
|
||||||
|
bucketDuration := time.Millisecond * 100
|
||||||
|
passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration})
|
||||||
|
rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration})
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
passStat.Add(int64((i + 1) * 100))
|
||||||
|
for j := i*10 + 1; j <= i*10+10; j++ {
|
||||||
|
rtStat.Add(int64(j))
|
||||||
|
}
|
||||||
|
if i != 9 {
|
||||||
|
time.Sleep(bucketDuration)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bbr := &BBR{
|
||||||
|
passStat: passStat,
|
||||||
|
rtStat: rtStat,
|
||||||
|
winBucketPerSec: 10,
|
||||||
|
}
|
||||||
|
assert.Equal(t, int64(60), bbr.maxFlight())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBBRShouldDrop(t *testing.T) {
|
||||||
|
var cpu int64
|
||||||
|
cpuGetter := func() int64 {
|
||||||
|
return cpu
|
||||||
|
}
|
||||||
|
bucketDuration := time.Millisecond * 100
|
||||||
|
passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration})
|
||||||
|
rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration})
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
passStat.Add(int64((i + 1) * 100))
|
||||||
|
for j := i*10 + 1; j <= i*10+10; j++ {
|
||||||
|
rtStat.Add(int64(j))
|
||||||
|
}
|
||||||
|
if i != 9 {
|
||||||
|
time.Sleep(bucketDuration)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bbr := &BBR{
|
||||||
|
cpu: cpuGetter,
|
||||||
|
passStat: passStat,
|
||||||
|
rtStat: rtStat,
|
||||||
|
winBucketPerSec: 10,
|
||||||
|
prevDrop: time.Unix(0, 0),
|
||||||
|
conf: defaultConf,
|
||||||
|
}
|
||||||
|
// cpu >= 800, inflight < maxQps
|
||||||
|
cpu = 800
|
||||||
|
bbr.inFlight = 50
|
||||||
|
assert.Equal(t, false, bbr.shouldDrop())
|
||||||
|
|
||||||
|
// cpu >= 800, inflight > maxQps
|
||||||
|
cpu = 800
|
||||||
|
bbr.inFlight = 80
|
||||||
|
assert.Equal(t, true, bbr.shouldDrop())
|
||||||
|
|
||||||
|
// cpu < 800, inflight > maxQps
|
||||||
|
cpu = 700
|
||||||
|
bbr.inFlight = 80
|
||||||
|
assert.Equal(t, false, bbr.shouldDrop())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGroup(t *testing.T) {
|
||||||
|
cfg := &Config{
|
||||||
|
Window: time.Second * 5,
|
||||||
|
WinBucket: 50,
|
||||||
|
CPUThreshold: 100,
|
||||||
|
}
|
||||||
|
group := NewGroup(cfg)
|
||||||
|
t.Run("get", func(t *testing.T) {
|
||||||
|
limiter := group.Get("test")
|
||||||
|
assert.NotNil(t, limiter)
|
||||||
|
})
|
||||||
|
}
|
40
pkg/ratelimit/limiter.go
Normal file
40
pkg/ratelimit/limiter.go
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
package ratelimit
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Op operations type.
|
||||||
|
type Op int
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Success opertion type: success
|
||||||
|
Success Op = iota
|
||||||
|
// Ignore opertion type: ignore
|
||||||
|
Ignore
|
||||||
|
// Drop opertion type: drop
|
||||||
|
Drop
|
||||||
|
)
|
||||||
|
|
||||||
|
type allowOptions struct{}
|
||||||
|
|
||||||
|
// AllowOptions allow options.
|
||||||
|
type AllowOption interface {
|
||||||
|
Apply(*allowOptions)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DoneInfo done info.
|
||||||
|
type DoneInfo struct {
|
||||||
|
Err error
|
||||||
|
Op Op
|
||||||
|
}
|
||||||
|
|
||||||
|
// DefaultAllowOpts returns the default allow options.
|
||||||
|
func DefaultAllowOpts() allowOptions {
|
||||||
|
return allowOptions{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Limiter limit interface.
|
||||||
|
type Limiter interface {
|
||||||
|
Allow(ctx context.Context, opts ...AllowOption) (func(info DoneInfo), error)
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user