1
0
mirror of https://github.com/go-kratos/kratos.git synced 2025-01-28 03:57:02 +02:00

fix global selector bug (#1564)

This commit is contained in:
longxboy 2021-10-18 19:26:00 +08:00 committed by GitHub
parent 2d026f1f95
commit 86dec76aa3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 100 additions and 41 deletions

View File

@ -10,6 +10,11 @@ type Balancer interface {
Pick(ctx context.Context, nodes []WeightedNode) (selected WeightedNode, done DoneFunc, err error)
}
// BalancerBuilder build balancer
type BalancerBuilder interface {
Build() Balancer
}
// WeightedNode calculates scheduling weight in real time
type WeightedNode interface {
Node

View File

@ -52,3 +52,19 @@ func (d *Default) Apply(nodes []Node) {
d.weightedNodes = weightedNodes
d.lk.Unlock()
}
// DefaultBuilder is de
type DefaultBuilder struct {
Node WeightedNodeBuilder
Balancer BalancerBuilder
Filters []Filter
}
// Build create builder
func (db *DefaultBuilder) Build() Selector {
return &Default{
NodeBuilder: db.Node,
Balancer: db.Balancer.Build(),
Filters: db.Filters,
}
}

View File

@ -35,17 +35,7 @@ type options struct {
// New creates a p2c selector.
func New(opts ...Option) selector.Selector {
var option options
for _, opt := range opts {
opt(&option)
}
return &selector.Default{
NodeBuilder: &ewma.Builder{},
Balancer: &Balancer{
r: rand.New(rand.NewSource(time.Now().UnixNano())),
},
Filters: option.filters,
}
return NewBuilder(opts...).Build()
}
// Balancer is p2c selector.
@ -92,3 +82,24 @@ func (s *Balancer) Pick(ctx context.Context, nodes []selector.WeightedNode) (sel
done := pc.Pick()
return pc, done, nil
}
// NewBuilder returns a selector builder with p2c balancer
func NewBuilder(opts ...Option) selector.Builder {
var option options
for _, opt := range opts {
opt(&option)
}
return &selector.DefaultBuilder{
Filters: option.filters,
Balancer: &Builder{},
Node: &ewma.Builder{},
}
}
// Builder is p2c builder
type Builder struct{}
// Build creates Balancer
func (b *Builder) Build() selector.Balancer {
return &Balancer{r: rand.New(rand.NewSource(time.Now().UnixNano()))}
}

View File

@ -33,18 +33,9 @@ type options struct {
// Balancer is a random balancer.
type Balancer struct{}
// New random a selector.
// New an random selector.
func New(opts ...Option) selector.Selector {
var option options
for _, opt := range opts {
opt(&option)
}
return &selector.Default{
Balancer: &Balancer{},
NodeBuilder: &direct.Builder{},
Filters: option.filters,
}
return NewBuilder(opts...).Build()
}
// Pick pick a weighted node.
@ -57,3 +48,24 @@ func (p *Balancer) Pick(_ context.Context, nodes []selector.WeightedNode) (selec
d := selected.Pick()
return selected, d, nil
}
// NewBuilder returns a selector builder with random balancer
func NewBuilder(opts ...Option) selector.Builder {
var option options
for _, opt := range opts {
opt(&option)
}
return &selector.DefaultBuilder{
Filters: option.filters,
Balancer: &Builder{},
Node: &direct.Builder{},
}
}
// Builder is random builder
type Builder struct{}
// Build creates Balancer
func (b *Builder) Build() selector.Balancer {
return &Balancer{}
}

View File

@ -24,6 +24,11 @@ type Rebalancer interface {
Apply(nodes []Node)
}
// Builder build selector
type Builder interface {
Build() Selector
}
// Node is node interface.
type Node interface {
// Address is the unique address under the same service

View File

@ -38,18 +38,7 @@ type Balancer struct {
// New random a selector.
func New(opts ...Option) selector.Selector {
var option options
for _, opt := range opts {
opt(&option)
}
return &selector.Default{
Balancer: &Balancer{
currentWeight: make(map[string]float64),
},
NodeBuilder: &direct.Builder{},
Filters: option.filters,
}
return NewBuilder(opts...).Build()
}
// Pick pick a weighted node.
@ -80,3 +69,24 @@ func (p *Balancer) Pick(_ context.Context, nodes []selector.WeightedNode) (selec
d := selected.Pick()
return selected, d, nil
}
// NewBuilder returns a selector builder with wrr balancer
func NewBuilder(opts ...Option) selector.Builder {
var option options
for _, opt := range opts {
opt(&option)
}
return &selector.DefaultBuilder{
Filters: option.filters,
Balancer: &Builder{},
Node: &direct.Builder{},
}
}
// Builder is wrr builder
type Builder struct{}
// Build creates Balancer
func (b *Builder) Build() selector.Balancer {
return &Balancer{currentWeight: make(map[string]float64)}
}

View File

@ -25,19 +25,19 @@ var (
func init() {
// inject global grpc balancer
SetGlobalBalancer(random.Name, random.New())
SetGlobalBalancer(wrr.Name, wrr.New())
SetGlobalBalancer(p2c.Name, p2c.New())
SetGlobalBalancer(random.Name, random.NewBuilder())
SetGlobalBalancer(wrr.Name, wrr.NewBuilder())
SetGlobalBalancer(p2c.Name, p2c.NewBuilder())
}
// SetGlobalBalancer set grpc balancer with scheme.
func SetGlobalBalancer(scheme string, selector selector.Selector) {
func SetGlobalBalancer(scheme string, builder selector.Builder) {
mu.Lock()
defer mu.Unlock()
b := base.NewBalancerBuilder(
scheme,
&Builder{selector},
&Builder{builder: builder},
base.Config{HealthCheck: true},
)
gBalancer.Register(b)
@ -45,7 +45,7 @@ func SetGlobalBalancer(scheme string, selector selector.Selector) {
// Builder is grpc balancer builder.
type Builder struct {
selector selector.Selector
builder selector.Builder
}
// Build creates a grpc Picker.
@ -62,7 +62,7 @@ func (b *Builder) Build(info base.PickerBuildInfo) gBalancer.Picker {
nodes = append(nodes, node.New(info.Address.Addr, ins))
}
p := &Picker{
selector: b.selector,
selector: b.builder.Build(),
subConns: subConns,
}
p.selector.Apply(nodes)