mirror of
https://github.com/go-kratos/kratos.git
synced 2026-05-22 10:15:24 +02:00
@@ -19,6 +19,9 @@ type BalancerBuilder interface {
|
|||||||
type WeightedNode interface {
|
type WeightedNode interface {
|
||||||
Node
|
Node
|
||||||
|
|
||||||
|
// Raw returns the original node
|
||||||
|
Raw() Node
|
||||||
|
|
||||||
// Weight is the runtime calculated weight
|
// Weight is the runtime calculated weight
|
||||||
Weight() float64
|
Weight() float64
|
||||||
|
|
||||||
|
|||||||
@@ -38,7 +38,11 @@ func (d *Default) Select(ctx context.Context, opts ...SelectOption) (selected No
|
|||||||
if len(candidates) == 0 {
|
if len(candidates) == 0 {
|
||||||
return nil, nil, ErrNoAvailable
|
return nil, nil, ErrNoAvailable
|
||||||
}
|
}
|
||||||
return d.Balancer.Pick(ctx, candidates)
|
wn, done, err := d.Balancer.Pick(ctx, candidates)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
return wn.Raw(), done, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Apply update nodes info.
|
// Apply update nodes info.
|
||||||
|
|||||||
@@ -50,3 +50,7 @@ func (n *Node) Weight() float64 {
|
|||||||
func (n *Node) PickElapsed() time.Duration {
|
func (n *Node) PickElapsed() time.Duration {
|
||||||
return time.Duration(time.Now().UnixNano() - atomic.LoadInt64(&n.lastPick))
|
return time.Duration(time.Now().UnixNano() - atomic.LoadInt64(&n.lastPick))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *Node) Raw() selector.Node {
|
||||||
|
return n.Node
|
||||||
|
}
|
||||||
|
|||||||
@@ -48,6 +48,6 @@ func TestDirectDefaultWeight(t *testing.T) {
|
|||||||
time.Sleep(time.Millisecond * 10)
|
time.Sleep(time.Millisecond * 10)
|
||||||
done(context.Background(), selector.DoneInfo{})
|
done(context.Background(), selector.DoneInfo{})
|
||||||
assert.Equal(t, float64(100), wn.Weight())
|
assert.Equal(t, float64(100), wn.Weight())
|
||||||
assert.Greater(t, time.Millisecond*15, wn.PickElapsed())
|
assert.Greater(t, time.Millisecond*20, wn.PickElapsed())
|
||||||
assert.Less(t, time.Millisecond*5, wn.PickElapsed())
|
assert.Less(t, time.Millisecond*5, wn.PickElapsed())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -178,3 +178,7 @@ func (n *Node) Weight() (weight float64) {
|
|||||||
func (n *Node) PickElapsed() time.Duration {
|
func (n *Node) PickElapsed() time.Duration {
|
||||||
return time.Duration(time.Now().UnixNano() - atomic.LoadInt64(&n.lastPick))
|
return time.Duration(time.Now().UnixNano() - atomic.LoadInt64(&n.lastPick))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *Node) Raw() selector.Node {
|
||||||
|
return n.Node
|
||||||
|
}
|
||||||
|
|||||||
@@ -17,6 +17,11 @@ type mockWeightedNode struct {
|
|||||||
lastPick int64
|
lastPick int64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Raw returns the original node
|
||||||
|
func (n *mockWeightedNode) Raw() Node {
|
||||||
|
return n.Node
|
||||||
|
}
|
||||||
|
|
||||||
// Weight is the runtime calculated weight
|
// Weight is the runtime calculated weight
|
||||||
func (n *mockWeightedNode) Weight() float64 {
|
func (n *mockWeightedNode) Weight() float64 {
|
||||||
if n.InitialWeight() != nil {
|
if n.InitialWeight() != nil {
|
||||||
|
|||||||
+10
-11
@@ -50,19 +50,15 @@ type Builder struct {
|
|||||||
// Build creates a grpc Picker.
|
// Build creates a grpc Picker.
|
||||||
func (b *Builder) Build(info base.PickerBuildInfo) gBalancer.Picker {
|
func (b *Builder) Build(info base.PickerBuildInfo) gBalancer.Picker {
|
||||||
nodes := make([]selector.Node, 0)
|
nodes := make([]selector.Node, 0)
|
||||||
subConns := make(map[string]gBalancer.SubConn)
|
|
||||||
for conn, info := range info.ReadySCs {
|
for conn, info := range info.ReadySCs {
|
||||||
if _, ok := subConns[info.Address.Addr]; ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
subConns[info.Address.Addr] = conn
|
|
||||||
|
|
||||||
ins, _ := info.Address.Attributes.Value("rawServiceInstance").(*registry.ServiceInstance)
|
ins, _ := info.Address.Attributes.Value("rawServiceInstance").(*registry.ServiceInstance)
|
||||||
nodes = append(nodes, selector.NewNode(info.Address.Addr, ins))
|
nodes = append(nodes, &grpcNode{
|
||||||
|
Node: selector.NewNode(info.Address.Addr, ins),
|
||||||
|
subConn: conn,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
p := &Picker{
|
p := &Picker{
|
||||||
selector: b.builder.Build(),
|
selector: b.builder.Build(),
|
||||||
subConns: subConns,
|
|
||||||
}
|
}
|
||||||
p.selector.Apply(nodes)
|
p.selector.Apply(nodes)
|
||||||
return p
|
return p
|
||||||
@@ -70,7 +66,6 @@ func (b *Builder) Build(info base.PickerBuildInfo) gBalancer.Picker {
|
|||||||
|
|
||||||
// Picker is a grpc picker.
|
// Picker is a grpc picker.
|
||||||
type Picker struct {
|
type Picker struct {
|
||||||
subConns map[string]gBalancer.SubConn
|
|
||||||
selector selector.Selector
|
selector selector.Selector
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -87,10 +82,9 @@ func (p *Picker) Pick(info gBalancer.PickInfo) (gBalancer.PickResult, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return gBalancer.PickResult{}, err
|
return gBalancer.PickResult{}, err
|
||||||
}
|
}
|
||||||
sub := p.subConns[n.Address()]
|
|
||||||
|
|
||||||
return gBalancer.PickResult{
|
return gBalancer.PickResult{
|
||||||
SubConn: sub,
|
SubConn: n.(*grpcNode).subConn,
|
||||||
Done: func(di gBalancer.DoneInfo) {
|
Done: func(di gBalancer.DoneInfo) {
|
||||||
done(info.Ctx, selector.DoneInfo{
|
done(info.Ctx, selector.DoneInfo{
|
||||||
Err: di.Err,
|
Err: di.Err,
|
||||||
@@ -113,3 +107,8 @@ func (t Trailer) Get(k string) string {
|
|||||||
}
|
}
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type grpcNode struct {
|
||||||
|
selector.Node
|
||||||
|
subConn gBalancer.SubConn
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user