mirror of
https://github.com/go-kratos/kratos.git
synced 2025-01-28 03:57:02 +02:00
Discovery For Tencent Polaris (#1839)
* feat (registry/polaris): discovery for polaris 1.add heartbeat report option 2.implements GetService function 3.implements Watch function * test (registry/polaris): test for discovery 1.add GetService test 2.add Watch test * test (registry/polaris): fix lint * fix (registry/polaris): fix missing ServiceInstances
This commit is contained in:
parent
2ee4e5f37a
commit
f3313476ac
@ -2,6 +2,7 @@ package polaris
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"strconv"
|
||||
@ -11,11 +12,16 @@ import (
|
||||
"github.com/go-kratos/kratos/v2/log"
|
||||
|
||||
"github.com/go-kratos/kratos/v2/registry"
|
||||
|
||||
"github.com/polarismesh/polaris-go/api"
|
||||
"github.com/polarismesh/polaris-go/pkg/config"
|
||||
"github.com/polarismesh/polaris-go/pkg/model"
|
||||
)
|
||||
|
||||
var _ registry.Registrar = (*Registry)(nil)
|
||||
var (
|
||||
_ registry.Registrar = (*Registry)(nil)
|
||||
_ registry.Discovery = (*Registry)(nil)
|
||||
)
|
||||
|
||||
// _instanceIDSeparator . Instance id Separator.
|
||||
const _instanceIDSeparator = "-"
|
||||
@ -40,6 +46,9 @@ type options struct {
|
||||
// To show service is healthy or not. Default value is True .
|
||||
Healthy bool
|
||||
|
||||
// Heartbeat enable .Not in polaris . Default value is True.
|
||||
Heartbeat bool
|
||||
|
||||
// To show service is isolate or not. Default value is False .
|
||||
Isolate bool
|
||||
|
||||
@ -61,6 +70,7 @@ type Option func(o *options)
|
||||
type Registry struct {
|
||||
opt options
|
||||
provider api.ProviderAPI
|
||||
consumer api.ConsumerAPI
|
||||
}
|
||||
|
||||
// WithNamespace with Namespace option.
|
||||
@ -108,7 +118,12 @@ func WithRetryCount(retryCount int) Option {
|
||||
return func(o *options) { o.RetryCount = retryCount }
|
||||
}
|
||||
|
||||
func NewRegistry(provider api.ProviderAPI, opts ...Option) (r *Registry) {
|
||||
// WithHeartbeat . with Heartbeat option.
|
||||
func WithHeartbeat(heartbeat bool) Option {
|
||||
return func(o *options) { o.Heartbeat = heartbeat }
|
||||
}
|
||||
|
||||
func NewRegistry(provider api.ProviderAPI, consumer api.ConsumerAPI, opts ...Option) (r *Registry) {
|
||||
op := options{
|
||||
Namespace: "default",
|
||||
ServiceToken: "",
|
||||
@ -116,6 +131,7 @@ func NewRegistry(provider api.ProviderAPI, opts ...Option) (r *Registry) {
|
||||
Weight: 0,
|
||||
Priority: 0,
|
||||
Healthy: true,
|
||||
Heartbeat: true,
|
||||
Isolate: false,
|
||||
TTL: 0,
|
||||
Timeout: 0,
|
||||
@ -127,9 +143,22 @@ func NewRegistry(provider api.ProviderAPI, opts ...Option) (r *Registry) {
|
||||
return &Registry{
|
||||
opt: op,
|
||||
provider: provider,
|
||||
consumer: consumer,
|
||||
}
|
||||
}
|
||||
|
||||
func NewRegistryWithConfig(conf config.Configuration, opts ...Option) (r *Registry) {
|
||||
provider, err := api.NewProviderAPIByConfig(conf)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
consumer, err := api.NewConsumerAPIByConfig(conf)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return NewRegistry(provider, consumer, opts...)
|
||||
}
|
||||
|
||||
// Register the registration.
|
||||
func (r *Registry) Register(_ context.Context, serviceInstance *registry.ServiceInstance) error {
|
||||
ids := make([]string, 0, len(serviceInstance.Endpoints))
|
||||
@ -193,32 +222,34 @@ func (r *Registry) Register(_ context.Context, serviceInstance *registry.Service
|
||||
}
|
||||
instanceID := service.InstanceID
|
||||
|
||||
// start heartbeat report
|
||||
go func() {
|
||||
ticker := time.NewTicker(time.Second * time.Duration(r.opt.TTL))
|
||||
defer ticker.Stop()
|
||||
if r.opt.Heartbeat {
|
||||
// start heartbeat report
|
||||
go func() {
|
||||
ticker := time.NewTicker(time.Second * time.Duration(r.opt.TTL))
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
<-ticker.C
|
||||
for {
|
||||
<-ticker.C
|
||||
|
||||
err = r.provider.Heartbeat(&api.InstanceHeartbeatRequest{
|
||||
InstanceHeartbeatRequest: model.InstanceHeartbeatRequest{
|
||||
Service: serviceInstance.Name + u.Scheme,
|
||||
Namespace: r.opt.Namespace,
|
||||
Host: host,
|
||||
Port: portNum,
|
||||
ServiceToken: r.opt.ServiceToken,
|
||||
InstanceID: instanceID,
|
||||
Timeout: &r.opt.Timeout,
|
||||
RetryCount: &r.opt.RetryCount,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
continue
|
||||
err = r.provider.Heartbeat(&api.InstanceHeartbeatRequest{
|
||||
InstanceHeartbeatRequest: model.InstanceHeartbeatRequest{
|
||||
Service: serviceInstance.Name + u.Scheme,
|
||||
Namespace: r.opt.Namespace,
|
||||
Host: host,
|
||||
Port: portNum,
|
||||
ServiceToken: r.opt.ServiceToken,
|
||||
InstanceID: instanceID,
|
||||
Timeout: &r.opt.Timeout,
|
||||
RetryCount: &r.opt.RetryCount,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}()
|
||||
}
|
||||
|
||||
ids = append(ids, instanceID)
|
||||
}
|
||||
@ -228,7 +259,7 @@ func (r *Registry) Register(_ context.Context, serviceInstance *registry.Service
|
||||
}
|
||||
|
||||
// Deregister the registration.
|
||||
func (r *Registry) Deregister(ctx context.Context, serviceInstance *registry.ServiceInstance) error {
|
||||
func (r *Registry) Deregister(_ context.Context, serviceInstance *registry.ServiceInstance) error {
|
||||
split := strings.Split(serviceInstance.ID, _instanceIDSeparator)
|
||||
for i, endpoint := range serviceInstance.Endpoints {
|
||||
// get url
|
||||
@ -269,3 +300,138 @@ func (r *Registry) Deregister(ctx context.Context, serviceInstance *registry.Ser
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetService return the service instances in memory according to the service name.
|
||||
func (r *Registry) GetService(_ context.Context, serviceName string) ([]*registry.ServiceInstance, error) {
|
||||
// get all instances
|
||||
instancesResponse, err := r.consumer.GetAllInstances(&api.GetAllInstancesRequest{
|
||||
GetAllInstancesRequest: model.GetAllInstancesRequest{
|
||||
Service: serviceName,
|
||||
Namespace: r.opt.Namespace,
|
||||
Timeout: &r.opt.Timeout,
|
||||
RetryCount: &r.opt.RetryCount,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
serviceInstances := instancesToServiceInstances(instancesResponse.GetInstances())
|
||||
|
||||
return serviceInstances, nil
|
||||
}
|
||||
|
||||
// Watch creates a watcher according to the service name.
|
||||
func (r *Registry) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) {
|
||||
return newWatcher(ctx, r.opt.Namespace, serviceName, r.consumer)
|
||||
}
|
||||
|
||||
type Watcher struct {
|
||||
ServiceName string
|
||||
Namespace string
|
||||
Ctx context.Context
|
||||
Cancel context.CancelFunc
|
||||
Channel <-chan model.SubScribeEvent
|
||||
ServiceInstances []*registry.ServiceInstance
|
||||
}
|
||||
|
||||
func newWatcher(ctx context.Context, namespace string, serviceName string, consumer api.ConsumerAPI) (*Watcher, error) {
|
||||
watchServiceResponse, err := consumer.WatchService(&api.WatchServiceRequest{
|
||||
WatchServiceRequest: model.WatchServiceRequest{
|
||||
Key: model.ServiceKey{
|
||||
Namespace: namespace,
|
||||
Service: serviceName,
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
w := &Watcher{
|
||||
Namespace: namespace,
|
||||
ServiceName: serviceName,
|
||||
Channel: watchServiceResponse.EventChannel,
|
||||
ServiceInstances: instancesToServiceInstances(watchServiceResponse.GetAllInstancesResp.GetInstances()),
|
||||
}
|
||||
w.Ctx, w.Cancel = context.WithCancel(ctx)
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// Next returns services in the following two cases:
|
||||
// 1.the first time to watch and the service instance list is not empty.
|
||||
// 2.any service instance changes found.
|
||||
// if the above two conditions are not met, it will block until context deadline exceeded or canceled
|
||||
func (w *Watcher) Next() ([]*registry.ServiceInstance, error) {
|
||||
select {
|
||||
case <-w.Ctx.Done():
|
||||
return nil, w.Ctx.Err()
|
||||
case event := <-w.Channel:
|
||||
if event.GetSubScribeEventType() == model.EventInstance {
|
||||
// this always true, but we need to check it to make sure EventType not change
|
||||
if instanceEvent, ok := event.(*model.InstanceEvent); ok {
|
||||
// handle DeleteEvent
|
||||
if instanceEvent.DeleteEvent != nil {
|
||||
for _, instance := range instanceEvent.DeleteEvent.Instances {
|
||||
for i, serviceInstance := range w.ServiceInstances {
|
||||
if serviceInstance.ID == instance.GetId() {
|
||||
// remove equal
|
||||
if len(w.ServiceInstances) <= 1 {
|
||||
w.ServiceInstances = w.ServiceInstances[0:0]
|
||||
continue
|
||||
}
|
||||
w.ServiceInstances = append(w.ServiceInstances[:i], w.ServiceInstances[i+1:]...)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// handle UpdateEvent
|
||||
if instanceEvent.UpdateEvent != nil {
|
||||
for i, serviceInstance := range w.ServiceInstances {
|
||||
for _, update := range instanceEvent.UpdateEvent.UpdateList {
|
||||
if serviceInstance.ID == update.Before.GetId() {
|
||||
w.ServiceInstances[i] = instanceToServiceInstance(update.After)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// handle AddEvent
|
||||
if instanceEvent.AddEvent != nil {
|
||||
w.ServiceInstances = append(w.ServiceInstances, instancesToServiceInstances(instanceEvent.AddEvent.Instances)...)
|
||||
}
|
||||
}
|
||||
return w.ServiceInstances, nil
|
||||
}
|
||||
}
|
||||
return w.ServiceInstances, nil
|
||||
}
|
||||
|
||||
// Stop close the watcher.
|
||||
func (w *Watcher) Stop() error {
|
||||
w.Cancel()
|
||||
return nil
|
||||
}
|
||||
|
||||
func instancesToServiceInstances(instances []model.Instance) []*registry.ServiceInstance {
|
||||
serviceInstances := make([]*registry.ServiceInstance, 0, len(instances))
|
||||
for _, instance := range instances {
|
||||
serviceInstances = append(serviceInstances, instanceToServiceInstance(instance))
|
||||
}
|
||||
return serviceInstances
|
||||
}
|
||||
|
||||
func instanceToServiceInstance(instance model.Instance) *registry.ServiceInstance {
|
||||
metadata := instance.GetMetadata()
|
||||
// Usually, it won't fail in kratos if register correctly
|
||||
kind := ""
|
||||
if k, ok := metadata["kind"]; ok {
|
||||
kind = k
|
||||
}
|
||||
return ®istry.ServiceInstance{
|
||||
ID: instance.GetId(),
|
||||
Name: instance.GetService(),
|
||||
Version: metadata["version"],
|
||||
Metadata: metadata,
|
||||
Endpoints: []string{fmt.Sprintf("%s://%s:%d", kind, instance.GetHost(), instance.GetPort())},
|
||||
}
|
||||
}
|
||||
|
@ -5,25 +5,21 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-kratos/kratos/v2/log"
|
||||
|
||||
"github.com/polarismesh/polaris-go/pkg/config"
|
||||
|
||||
"github.com/go-kratos/kratos/v2/registry"
|
||||
"github.com/polarismesh/polaris-go/api"
|
||||
)
|
||||
|
||||
// TestRegistry . TestRegistryManyService
|
||||
func TestRegistry(t *testing.T) {
|
||||
conf := config.NewDefaultConfiguration([]string{"127.0.0.1:8091"})
|
||||
provider, err := api.NewProviderAPIByConfig(conf)
|
||||
defer provider.Destroy()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r := NewRegistry(
|
||||
provider,
|
||||
WithTimeout(1*time.Second),
|
||||
WithTTL(5),
|
||||
r := NewRegistryWithConfig(
|
||||
conf,
|
||||
WithTimeout(time.Second*10),
|
||||
WithTTL(100),
|
||||
)
|
||||
|
||||
ctx := context.Background()
|
||||
@ -35,7 +31,7 @@ func TestRegistry(t *testing.T) {
|
||||
Endpoints: []string{"tcp://127.0.0.1:9000?isSecure=false"},
|
||||
}
|
||||
|
||||
err = r.Register(ctx, svc)
|
||||
err := r.Register(ctx, svc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -49,38 +45,33 @@ func TestRegistry(t *testing.T) {
|
||||
// TestRegistryMany . TestRegistryManyService
|
||||
func TestRegistryMany(t *testing.T) {
|
||||
conf := config.NewDefaultConfiguration([]string{"127.0.0.1:8091"})
|
||||
provider, err := api.NewProviderAPIByConfig(conf)
|
||||
defer provider.Destroy()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r := NewRegistry(
|
||||
provider,
|
||||
WithTimeout(1*time.Second),
|
||||
WithTTL(10),
|
||||
r := NewRegistryWithConfig(
|
||||
conf,
|
||||
WithTimeout(time.Second*10),
|
||||
WithTTL(100),
|
||||
)
|
||||
|
||||
svc := ®istry.ServiceInstance{
|
||||
Name: "kratos-provider-0-",
|
||||
Name: "kratos-provider-1-",
|
||||
Version: "test",
|
||||
Metadata: map[string]string{"app": "kratos"},
|
||||
Endpoints: []string{"tcp://127.0.0.1:9000?isSecure=false"},
|
||||
}
|
||||
svc1 := ®istry.ServiceInstance{
|
||||
Name: "kratos-provider-1-",
|
||||
Name: "kratos-provider-2-",
|
||||
Version: "test",
|
||||
Metadata: map[string]string{"app": "kratos"},
|
||||
Endpoints: []string{"tcp://127.0.0.1:9001?isSecure=false"},
|
||||
}
|
||||
svc2 := ®istry.ServiceInstance{
|
||||
Name: "kratos-provider-2-",
|
||||
Name: "kratos-provider-3-",
|
||||
Version: "test",
|
||||
Metadata: map[string]string{"app": "kratos"},
|
||||
Endpoints: []string{"tcp://127.0.0.1:9002?isSecure=false"},
|
||||
}
|
||||
|
||||
err = r.Register(context.Background(), svc)
|
||||
err := r.Register(context.Background(), svc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -110,3 +101,106 @@ func TestRegistryMany(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetService . TestGetService
|
||||
func TestGetService(t *testing.T) {
|
||||
conf := config.NewDefaultConfiguration([]string{"127.0.0.1:8091"})
|
||||
|
||||
r := NewRegistryWithConfig(
|
||||
conf,
|
||||
WithTimeout(time.Second*10),
|
||||
WithTTL(100),
|
||||
)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
svc := ®istry.ServiceInstance{
|
||||
Name: "kratos-provider-4-",
|
||||
Version: "test",
|
||||
Metadata: map[string]string{"app": "kratos"},
|
||||
Endpoints: []string{"tcp://127.0.0.1:9000?isSecure=false"},
|
||||
}
|
||||
|
||||
err := r.Register(ctx, svc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(time.Second * 1)
|
||||
serviceInstances, err := r.GetService(ctx, "kratos-provider-4-tcp")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, instance := range serviceInstances {
|
||||
log.Info(instance)
|
||||
}
|
||||
|
||||
err = r.Deregister(ctx, svc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestWatch . TestWatch
|
||||
func TestWatch(t *testing.T) {
|
||||
conf := config.NewDefaultConfiguration([]string{"127.0.0.1:8091"})
|
||||
|
||||
r := NewRegistryWithConfig(
|
||||
conf,
|
||||
WithTimeout(time.Second*10),
|
||||
WithTTL(100),
|
||||
)
|
||||
|
||||
svc := ®istry.ServiceInstance{
|
||||
Name: "kratos-provider-4-",
|
||||
Version: "test",
|
||||
Metadata: map[string]string{"app": "kratos"},
|
||||
Endpoints: []string{"tcp://127.0.0.1:9000?isSecure=false"},
|
||||
}
|
||||
|
||||
watch, err := r.Watch(context.Background(), "kratos-provider-4-tcp")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = r.Register(context.Background(), svc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// watch svc
|
||||
time.Sleep(time.Second * 1)
|
||||
|
||||
// svc register, AddEvent
|
||||
next, err := watch.Next()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, instance := range next {
|
||||
// it will output one instance
|
||||
log.Info(instance)
|
||||
}
|
||||
|
||||
err = r.Deregister(context.Background(), svc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// svc deregister, DeleteEvent
|
||||
next, err = watch.Next()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, instance := range next {
|
||||
// it will output nothing
|
||||
log.Info(instance)
|
||||
}
|
||||
|
||||
err = watch.Stop()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = watch.Next()
|
||||
if err == nil {
|
||||
// if nil, stop failed
|
||||
t.Fatal()
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user