1
0
mirror of https://github.com/go-kratos/kratos.git synced 2026-05-22 10:15:24 +02:00

feat(selector): add node scheme (#1932)

* add node scheme
This commit is contained in:
Tony Chen
2022-04-14 13:48:20 +08:00
committed by GitHub
parent 3990d91b9b
commit d0b704b8f3
11 changed files with 29 additions and 4 deletions
+9 -2
View File
@@ -8,6 +8,7 @@ import (
// DefaultNode is selector node // DefaultNode is selector node
type DefaultNode struct { type DefaultNode struct {
scheme string
addr string addr string
weight *int64 weight *int64
version string version string
@@ -15,6 +16,11 @@ type DefaultNode struct {
metadata map[string]string metadata map[string]string
} }
// Scheme is node scheme
func (n *DefaultNode) Scheme() string {
return n.scheme
}
// Address is node address // Address is node address
func (n *DefaultNode) Address() string { func (n *DefaultNode) Address() string {
return n.addr return n.addr
@@ -41,9 +47,10 @@ func (n *DefaultNode) Metadata() map[string]string {
} }
// NewNode new node // NewNode new node
func NewNode(addr string, ins *registry.ServiceInstance) Node { func NewNode(scheme, addr string, ins *registry.ServiceInstance) Node {
n := &DefaultNode{ n := &DefaultNode{
addr: addr, scheme: scheme,
addr: addr,
} }
if ins != nil { if ins != nil {
n.name = ins.Name n.name = ins.Name
+2
View File
@@ -13,6 +13,7 @@ func TestVersion(t *testing.T) {
f := Version("v2.0.0") f := Version("v2.0.0")
var nodes []selector.Node var nodes []selector.Node
nodes = append(nodes, selector.NewNode( nodes = append(nodes, selector.NewNode(
"http",
"127.0.0.1:9090", "127.0.0.1:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:9090", ID: "127.0.0.1:9090",
@@ -22,6 +23,7 @@ func TestVersion(t *testing.T) {
})) }))
nodes = append(nodes, selector.NewNode( nodes = append(nodes, selector.NewNode(
"http",
"127.0.0.2:9090", "127.0.0.2:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.2:9090", ID: "127.0.0.2:9090",
+2
View File
@@ -13,6 +13,7 @@ import (
func TestDirect(t *testing.T) { func TestDirect(t *testing.T) {
b := &Builder{} b := &Builder{}
wn := b.Build(selector.NewNode( wn := b.Build(selector.NewNode(
"http",
"127.0.0.1:9090", "127.0.0.1:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:9090", ID: "127.0.0.1:9090",
@@ -42,6 +43,7 @@ func TestDirect(t *testing.T) {
func TestDirectDefaultWeight(t *testing.T) { func TestDirectDefaultWeight(t *testing.T) {
b := &Builder{} b := &Builder{}
wn := b.Build(selector.NewNode( wn := b.Build(selector.NewNode(
"http",
"127.0.0.1:9090", "127.0.0.1:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:9090", ID: "127.0.0.1:9090",
+3
View File
@@ -13,6 +13,7 @@ import (
func TestDirect(t *testing.T) { func TestDirect(t *testing.T) {
b := &Builder{} b := &Builder{}
wn := b.Build(selector.NewNode( wn := b.Build(selector.NewNode(
"http",
"127.0.0.1:9090", "127.0.0.1:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:9090", ID: "127.0.0.1:9090",
@@ -53,6 +54,7 @@ func TestDirect(t *testing.T) {
func TestDirectError(t *testing.T) { func TestDirectError(t *testing.T) {
b := &Builder{} b := &Builder{}
wn := b.Build(selector.NewNode( wn := b.Build(selector.NewNode(
"http",
"127.0.0.1:9090", "127.0.0.1:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:9090", ID: "127.0.0.1:9090",
@@ -89,6 +91,7 @@ func TestDirectErrorHandler(t *testing.T) {
}, },
} }
wn := b.Build(selector.NewNode( wn := b.Build(selector.NewNode(
"http",
"127.0.0.1:9090", "127.0.0.1:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:9090", ID: "127.0.0.1:9090",
+2
View File
@@ -21,6 +21,7 @@ func TestWrr3(t *testing.T) {
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
addr := fmt.Sprintf("127.0.0.%d:8080", i) addr := fmt.Sprintf("127.0.0.%d:8080", i)
nodes = append(nodes, selector.NewNode( nodes = append(nodes, selector.NewNode(
"http",
addr, addr,
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: addr, ID: addr,
@@ -96,6 +97,7 @@ func TestOne(t *testing.T) {
for i := 0; i < 1; i++ { for i := 0; i < 1; i++ {
addr := fmt.Sprintf("127.0.0.%d:8080", i) addr := fmt.Sprintf("127.0.0.%d:8080", i)
nodes = append(nodes, selector.NewNode( nodes = append(nodes, selector.NewNode(
"http",
addr, addr,
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: addr, ID: addr,
+2
View File
@@ -13,6 +13,7 @@ func TestWrr(t *testing.T) {
random := New(WithFilter(filter.Version("v2.0.0"))) random := New(WithFilter(filter.Version("v2.0.0")))
var nodes []selector.Node var nodes []selector.Node
nodes = append(nodes, selector.NewNode( nodes = append(nodes, selector.NewNode(
"http",
"127.0.0.1:8080", "127.0.0.1:8080",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:8080", ID: "127.0.0.1:8080",
@@ -20,6 +21,7 @@ func TestWrr(t *testing.T) {
Metadata: map[string]string{"weight": "10"}, Metadata: map[string]string{"weight": "10"},
})) }))
nodes = append(nodes, selector.NewNode( nodes = append(nodes, selector.NewNode(
"http",
"127.0.0.1:9090", "127.0.0.1:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:9090", ID: "127.0.0.1:9090",
+3
View File
@@ -31,6 +31,9 @@ type Builder interface {
// Node is node interface. // Node is node interface.
type Node interface { type Node interface {
// Scheme is service node scheme
Scheme() string
// Address is the unique address under the same service // Address is the unique address under the same service
Address() string Address() string
+2
View File
@@ -89,6 +89,7 @@ func TestDefault(t *testing.T) {
selector := builder.Build() selector := builder.Build()
var nodes []Node var nodes []Node
nodes = append(nodes, NewNode( nodes = append(nodes, NewNode(
"http",
"127.0.0.1:8080", "127.0.0.1:8080",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:8080", ID: "127.0.0.1:8080",
@@ -98,6 +99,7 @@ func TestDefault(t *testing.T) {
Metadata: map[string]string{"weight": "10"}, Metadata: map[string]string{"weight": "10"},
})) }))
nodes = append(nodes, NewNode( nodes = append(nodes, NewNode(
"http",
"127.0.0.1:9090", "127.0.0.1:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:9090", ID: "127.0.0.1:9090",
+2
View File
@@ -14,6 +14,7 @@ func TestWrr(t *testing.T) {
wrr := New(WithFilter(filter.Version("v2.0.0"))) wrr := New(WithFilter(filter.Version("v2.0.0")))
var nodes []selector.Node var nodes []selector.Node
nodes = append(nodes, selector.NewNode( nodes = append(nodes, selector.NewNode(
"http",
"127.0.0.1:8080", "127.0.0.1:8080",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:8080", ID: "127.0.0.1:8080",
@@ -21,6 +22,7 @@ func TestWrr(t *testing.T) {
Metadata: map[string]string{"weight": "10"}, Metadata: map[string]string{"weight": "10"},
})) }))
nodes = append(nodes, selector.NewNode( nodes = append(nodes, selector.NewNode(
"http",
"127.0.0.1:9090", "127.0.0.1:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:9090", ID: "127.0.0.1:9090",
+1 -1
View File
@@ -58,7 +58,7 @@ func (b *Builder) Build(info base.PickerBuildInfo) gBalancer.Picker {
for conn, info := range info.ReadySCs { for conn, info := range info.ReadySCs {
ins, _ := info.Address.Attributes.Value("rawServiceInstance").(*registry.ServiceInstance) ins, _ := info.Address.Attributes.Value("rawServiceInstance").(*registry.ServiceInstance)
nodes = append(nodes, &grpcNode{ nodes = append(nodes, &grpcNode{
Node: selector.NewNode(info.Address.Addr, ins), Node: selector.NewNode("grpc", info.Address.Addr, ins),
subConn: conn, subConn: conn,
}) })
} }
+1 -1
View File
@@ -122,7 +122,7 @@ func (r *resolver) update(services []*registry.ServiceInstance) bool {
if ept == "" { if ept == "" {
continue continue
} }
nodes = append(nodes, selector.NewNode(ept, ins)) nodes = append(nodes, selector.NewNode("http", ept, ins))
} }
if len(nodes) == 0 { if len(nodes) == 0 {
r.logger.Warnf("[http resolver]Zero endpoint found,refused to write,set: %s ins: %v", r.target.Endpoint, nodes) r.logger.Warnf("[http resolver]Zero endpoint found,refused to write,set: %s ins: %v", r.target.Endpoint, nodes)