mirror of
				https://github.com/go-kratos/kratos.git
				synced 2025-10-30 23:47:59 +02:00 
			
		
		
		
	feat(registry/nacos): support service discovery weight (#3561)
* feat(registry/nacos): support service discovery weight - Add support for reading weight information from metadata during service registration and discovery - Update test cases to use non-local test server addresses * chore(registry/nacos): fmt fix --------- Co-authored-by: Cluas <Cluas@live.cn>
This commit is contained in:
		| @@ -4,6 +4,7 @@ import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"math" | ||||
| 	"net" | ||||
| 	"net/url" | ||||
| 	"strconv" | ||||
| @@ -100,6 +101,7 @@ func (r *Registry) Register(_ context.Context, si *registry.ServiceInstance) err | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		weight := r.opts.weight | ||||
| 		var rmd map[string]string | ||||
| 		if si.Metadata == nil { | ||||
| 			rmd = map[string]string{ | ||||
| @@ -113,12 +115,18 @@ func (r *Registry) Register(_ context.Context, si *registry.ServiceInstance) err | ||||
| 			} | ||||
| 			rmd["kind"] = u.Scheme | ||||
| 			rmd["version"] = si.Version | ||||
| 			if w, ok := si.Metadata["weight"]; ok { | ||||
| 				weight, err = strconv.ParseFloat(w, 64) | ||||
| 				if err != nil { | ||||
| 					weight = r.opts.weight | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		_, e := r.cli.RegisterInstance(vo.RegisterInstanceParam{ | ||||
| 			Ip:          host, | ||||
| 			Port:        uint64(p), | ||||
| 			ServiceName: si.Name + "." + u.Scheme, | ||||
| 			Weight:      r.opts.weight, | ||||
| 			Weight:      weight, | ||||
| 			Enable:      true, | ||||
| 			Healthy:     true, | ||||
| 			Ephemeral:   true, | ||||
| @@ -180,16 +188,23 @@ func (r *Registry) GetService(_ context.Context, serviceName string) ([]*registr | ||||
| 	items := make([]*registry.ServiceInstance, 0, len(res)) | ||||
| 	for _, in := range res { | ||||
| 		kind := r.opts.kind | ||||
| 		weight := r.opts.weight | ||||
| 		if k, ok := in.Metadata["kind"]; ok { | ||||
| 			kind = k | ||||
| 		} | ||||
| 		items = append(items, ®istry.ServiceInstance{ | ||||
| 		if in.Weight > 0 { | ||||
| 			weight = in.Weight | ||||
| 		} | ||||
|  | ||||
| 		r := ®istry.ServiceInstance{ | ||||
| 			ID:        in.InstanceId, | ||||
| 			Name:      in.ServiceName, | ||||
| 			Version:   in.Metadata["version"], | ||||
| 			Metadata:  in.Metadata, | ||||
| 			Endpoints: []string{fmt.Sprintf("%s://%s:%d", kind, in.Ip, in.Port)}, | ||||
| 		}) | ||||
| 		} | ||||
| 		r.Metadata["weight"] = strconv.Itoa(int(math.Ceil(weight))) | ||||
| 		items = append(items, r) | ||||
| 	} | ||||
| 	return items, nil | ||||
| } | ||||
|   | ||||
| @@ -13,10 +13,12 @@ import ( | ||||
| 	"github.com/go-kratos/kratos/v2/registry" | ||||
| ) | ||||
|  | ||||
| var testServerConfig = []constant.ServerConfig{ | ||||
| 	*constant.NewServerConfig("127.0.0.1", 8848), | ||||
| } | ||||
|  | ||||
| func TestRegistry_Register(t *testing.T) { | ||||
| 	sc := []constant.ServerConfig{ | ||||
| 		*constant.NewServerConfig("127.0.0.1", 8848), | ||||
| 	} | ||||
| 	sc := testServerConfig | ||||
|  | ||||
| 	cc := constant.ClientConfig{ | ||||
| 		NamespaceId:         "public", // namespace id | ||||
| @@ -231,9 +233,7 @@ func TestRegistry_Deregister(t *testing.T) { | ||||
| 			}, | ||||
| 			wantErr: false, | ||||
| 			preFunc: func(t *testing.T) { | ||||
| 				sc := []constant.ServerConfig{ | ||||
| 					*constant.NewServerConfig("127.0.0.1", 8848), | ||||
| 				} | ||||
| 				sc := testServerConfig | ||||
|  | ||||
| 				cc := constant.ClientConfig{ | ||||
| 					NamespaceId:         "public", // namespace id | ||||
| @@ -292,9 +292,7 @@ func TestRegistry_Deregister(t *testing.T) { | ||||
| 	} | ||||
| 	for _, tt := range tests { | ||||
| 		t.Run(tt.name, func(t *testing.T) { | ||||
| 			sc := []constant.ServerConfig{ | ||||
| 				*constant.NewServerConfig("127.0.0.1", 8848), | ||||
| 			} | ||||
| 			sc := testServerConfig | ||||
|  | ||||
| 			cc := constant.ClientConfig{ | ||||
| 				NamespaceId:         "public", // namespace id | ||||
| @@ -329,9 +327,7 @@ func TestRegistry_Deregister(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func TestRegistry_GetService(t *testing.T) { | ||||
| 	sc := []constant.ServerConfig{ | ||||
| 		*constant.NewServerConfig("127.0.0.1", 8848), | ||||
| 	} | ||||
| 	sc := testServerConfig | ||||
|  | ||||
| 	cc := constant.ClientConfig{ | ||||
| 		NamespaceId:         "public", // namespace id | ||||
| @@ -385,7 +381,7 @@ func TestRegistry_GetService(t *testing.T) { | ||||
| 				if err != nil { | ||||
| 					t.Error(err) | ||||
| 				} | ||||
| 				time.Sleep(time.Second) | ||||
| 				time.Sleep(time.Second * 3) | ||||
| 			}, | ||||
| 			deferFunc: func(t *testing.T) { | ||||
| 				err = r.Deregister(context.Background(), testServer) | ||||
| @@ -404,7 +400,7 @@ func TestRegistry_GetService(t *testing.T) { | ||||
| 				ID:        "127.0.0.1#8080#DEFAULT#DEFAULT_GROUP@@test3.grpc", | ||||
| 				Name:      "DEFAULT_GROUP@@test3.grpc", | ||||
| 				Version:   "v1.0.0", | ||||
| 				Metadata:  map[string]string{"version": "v1.0.0", "kind": "grpc"}, | ||||
| 				Metadata:  map[string]string{"version": "v1.0.0", "kind": "grpc", "weight": "100"}, | ||||
| 				Endpoints: []string{"grpc://127.0.0.1:8080"}, | ||||
| 			}}, | ||||
| 			wantErr: false, | ||||
| @@ -445,9 +441,7 @@ func TestRegistry_GetService(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func TestRegistry_Watch(t *testing.T) { | ||||
| 	sc := []constant.ServerConfig{ | ||||
| 		*constant.NewServerConfig("127.0.0.1", 8848), | ||||
| 	} | ||||
| 	sc := testServerConfig | ||||
|  | ||||
| 	cc := constant.ClientConfig{ | ||||
| 		NamespaceId:         "public", // namespace id | ||||
|   | ||||
		Reference in New Issue
	
	Block a user