diff --git a/contrib/registry/discovery/README.md b/contrib/registry/discovery/README.md new file mode 100644 index 000000000..1fa340973 --- /dev/null +++ b/contrib/registry/discovery/README.md @@ -0,0 +1,2 @@ +## [discovery](https://github.com/bilibili/discovery) + diff --git a/contrib/registry/discovery/discovery.go b/contrib/registry/discovery/discovery.go new file mode 100644 index 000000000..d122989b4 --- /dev/null +++ b/contrib/registry/discovery/discovery.go @@ -0,0 +1,494 @@ +package discovery + +import ( + "context" + "fmt" + "math/rand" + "net/url" + "os" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/go-resty/resty/v2" + "github.com/pkg/errors" + + "github.com/go-kratos/kratos/v2/log" +) + +type Discovery struct { + config *Config + once sync.Once + ctx context.Context + cancelFunc context.CancelFunc + httpClient *resty.Client + + node atomic.Value + nodeIdx uint64 + + mutex sync.RWMutex + apps map[string]*appInfo + registry map[string]struct{} + lastHost string + cancelPolls context.CancelFunc + + logger log.Logger +} + +type appInfo struct { + resolver map[*Resolve]struct{} + zoneIns atomic.Value + lastTs int64 // latest timestamp +} + +// New construct a Discovery instance which implements registry.Registrar, +// registry.Discovery and registry.Watcher. +func New(c *Config, logger log.Logger) *Discovery { + if logger == nil { + logger = log.NewStdLogger(os.Stdout) + logger = log.With(logger, + "registry.pluginName", "Discovery", + "ts", log.DefaultTimestamp, + "caller", log.DefaultCaller, + ) + } + if c == nil { + c = new(Config) + } + if err := fixConfig(c); err != nil { + panic(err) + } + ctx, cancel := context.WithCancel(context.Background()) + d := &Discovery{ + config: c, + ctx: ctx, + cancelFunc: cancel, + apps: map[string]*appInfo{}, + registry: map[string]struct{}{}, + logger: logger, + } + + d.httpClient = resty.New(). + SetTimeout(40 * time.Second) + + // Discovery self found and watch + r := d.resolveBuild(_discoveryAppID) + event := r.Watch() + _, ok := <-event + if !ok { + panic("Discovery watch self failed") + } + discoveryIns, ok := r.Fetch(context.Background()) + if ok { + d.newSelf(discoveryIns.Instances) + } + go d.selfProc(r, event) + + return d +} + +// Close stop all running process including Discovery and register +func (d *Discovery) Close() error { + d.cancelFunc() + return nil +} + +func (d *Discovery) Logger() *log.Helper { + return log.NewHelper(d.logger) +} + +// selfProc start a goroutine to refresh Discovery self registration information. +func (d *Discovery) selfProc(resolver *Resolve, event <-chan struct{}) { + for { + _, ok := <-event + if !ok { + return + } + zones, ok := resolver.Fetch(context.Background()) + if ok { + d.newSelf(zones.Instances) + } + } +} + +// newSelf +func (d *Discovery) newSelf(zones map[string][]*discoveryInstance) { + ins, ok := zones[d.config.Zone] + if !ok { + return + } + var nodes []string + for _, in := range ins { + for _, addr := range in.Addrs { + u, err := url.Parse(addr) + if err == nil && u.Scheme == "http" { + nodes = append(nodes, u.Host) + } + } + } + // diff old nodes + var olds int + for _, n := range nodes { + if node, ok := d.node.Load().([]string); ok { + for _, o := range node { + if o == n { + olds++ + break + } + } + } + } + if len(nodes) == olds { + return + } + + rand.Shuffle(len(nodes), func(i, j int) { + nodes[i], nodes[j] = nodes[j], nodes[i] + }) + d.node.Store(nodes) +} + +// resolveBuild Discovery resolver builder. +func (d *Discovery) resolveBuild(appID string) *Resolve { + r := &Resolve{ + id: appID, + d: d, + event: make(chan struct{}, 1), + } + + d.mutex.Lock() + app, ok := d.apps[appID] + if !ok { + app = &appInfo{ + resolver: make(map[*Resolve]struct{}), + } + d.apps[appID] = app + cancel := d.cancelPolls + if cancel != nil { + cancel() + } + } + app.resolver[r] = struct{}{} + d.mutex.Unlock() + if ok { + select { + case r.event <- struct{}{}: + default: + } + } + + d.Logger().Debugf("Discovery: AddWatch(%s) already watch(%v)", appID, ok) + d.once.Do(func() { + go d.serverProc() + }) + return r +} + +func (d *Discovery) serverProc() { + defer d.Logger().Debug("Discovery serverProc quit") + + var ( + retry int + ctx context.Context + cancel context.CancelFunc + ) + + ticker := time.NewTicker(time.Minute * 30) + defer ticker.Stop() + + for { + if ctx == nil { + ctx, cancel = context.WithCancel(d.ctx) + d.mutex.Lock() + d.cancelPolls = cancel + d.mutex.Unlock() + } + select { + case <-d.ctx.Done(): + return + case <-ticker.C: + d.switchNode() + default: + } + + apps, err := d.polls(ctx) + if err != nil { + d.switchNode() + if ctx.Err() == context.Canceled { + ctx = nil + continue + } + time.Sleep(time.Second) + retry++ + continue + } + retry = 0 + d.broadcast(apps) + } +} + +func (d *Discovery) pickNode() string { + nodes, ok := d.node.Load().([]string) + if !ok || len(nodes) == 0 { + return d.config.Nodes[rand.Intn(len(d.config.Nodes))] + } + return nodes[atomic.LoadUint64(&d.nodeIdx)%uint64(len(nodes))] +} + +func (d *Discovery) switchNode() { + atomic.AddUint64(&d.nodeIdx, 1) +} + +// renew an instance with Discovery +func (d *Discovery) renew(ctx context.Context, ins *discoveryInstance) (err error) { + // d.Logger().Debug("Discovery:renew renew calling") + + d.mutex.RLock() + c := d.config + d.mutex.RUnlock() + + res := new(discoveryCommonResp) + uri := fmt.Sprintf(_renewURL, d.pickNode()) + + // construct parameters to renew + p := newParams(d.config) + p.Set(_paramKeyAppID, ins.AppID) + + // send request to Discovery server. + if _, err = d.httpClient.R(). + SetContext(ctx). + SetQueryParamsFromValues(p). + SetResult(&res). + Post(uri); err != nil { + d.switchNode() + d.Logger().Errorf("Discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)", + uri, c.Env, ins.AppID, c.Host, err) + return + } + + if res.Code != _codeOK { + err = fmt.Errorf("discovery.renew failed ErrorCode: %d", res.Code) + if res.Code == _codeNotFound { + if err = d.register(ctx, ins); err != nil { + err = errors.Wrap(err, "Discovery.renew instance, and failed to register ins") + } + return + } + + d.Logger().Errorf( + "Discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)", + uri, c.Env, ins.AppID, c.Host, res.Code, + ) + } + + return +} + +// cancel Remove the registered instance from Discovery +func (d *Discovery) cancel(ins *discoveryInstance) (err error) { + d.mutex.RLock() + config := d.config + d.mutex.RUnlock() + + res := new(discoveryCommonResp) + uri := fmt.Sprintf(_cancelURL, d.pickNode()) + p := newParams(d.config) + p.Set(_paramKeyAppID, ins.AppID) + + // request + // send request to Discovery server. + if _, err = d.httpClient.R(). + SetContext(context.TODO()). + SetQueryParamsFromValues(p). + SetResult(&res). + Post(uri); err != nil { + d.switchNode() + d.Logger().Errorf("Discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)", + uri, config.Env, ins.AppID, config.Host, err) + return + } + + // handle response error + if res.Code != _codeOK { + if res.Code == _codeNotFound { + return nil + } + + d.Logger().Warnf("Discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)", + uri, config.Env, ins.AppID, config.Host, res.Code) + err = fmt.Errorf("ErrorCode: %d", res.Code) + return + } + + return +} + +func (d *Discovery) broadcast(apps map[string]*disInstancesInfo) { + for appID, v := range apps { + var count int + // v maybe nil in old version(less than v1.1) Discovery,check incase of panic + if v == nil { + continue + } + for zone, ins := range v.Instances { + if len(ins) == 0 { + delete(v.Instances, zone) + } + count += len(ins) + } + if count == 0 { + continue + } + d.mutex.RLock() + app, ok := d.apps[appID] + d.mutex.RUnlock() + if ok { + app.lastTs = v.LastTs + app.zoneIns.Store(v) + d.mutex.RLock() + for rs := range app.resolver { + select { + case rs.event <- struct{}{}: + default: + } + } + d.mutex.RUnlock() + } + } +} + +func (d *Discovery) polls(ctx context.Context) (apps map[string]*disInstancesInfo, err error) { + var ( + lastTss = make([]int64, 0, 4) + appIDs = make([]string, 0, 16) + host = d.pickNode() + changed bool + ) + if host != d.lastHost { + d.lastHost = host + changed = true + } + + d.mutex.RLock() + config := d.config + for k, v := range d.apps { + if changed { + v.lastTs = 0 + } + appIDs = append(appIDs, k) + lastTss = append(lastTss, v.lastTs) + } + d.mutex.RUnlock() + + // if there is no app, polls just return. + if len(appIDs) == 0 { + return + } + + uri := fmt.Sprintf(_pollURL, host) + res := new(discoveryPollsResp) + + // params + p := newParams(nil) + p.Set(_paramKeyEnv, config.Env) + p.Set(_paramKeyHostname, config.Host) + for _, appID := range appIDs { + p.Add(_paramKeyAppID, appID) + } + for _, ts := range lastTss { + p.Add("latest_timestamp", strconv.FormatInt(ts, 10)) + } + + // request + reqURI := uri + "?" + p.Encode() + if _, err = d.httpClient.R(). + SetContext(ctx). + SetQueryParamsFromValues(p). + SetResult(res).Get(uri); err != nil { + d.switchNode() + d.Logger().Errorf("Discovery: client.Get(%s) error(%+v)", reqURI, err) + return nil, err + } + + if res.Code != _codeOK { + if res.Code != _codeNotModified { + d.Logger().Errorf("Discovery: client.Get(%s) get error code(%d)", reqURI, res.Code) + } + err = fmt.Errorf("discovery.polls failed ErrCode: %d", res.Code) + return + } + + for _, app := range res.Data { + if app.LastTs == 0 { + err = ErrServerError + d.Logger().Errorf("Discovery: client.Get(%s) latest_timestamp is 0, instances:(%+v)", reqURI, res.Data) + return + } + } + + d.Logger().Debugf("Discovery: successfully polls(%s) instances (%+v)", reqURI, res.Data) + apps = res.Data + return +} + +// Resolve Discovery resolver. +type Resolve struct { + id string + event chan struct{} + d *Discovery +} + +// Watch instance. +func (r *Resolve) Watch() <-chan struct{} { + return r.event +} + +// Fetch resolver instance. +func (r *Resolve) Fetch(ctx context.Context) (ins *disInstancesInfo, ok bool) { + r.d.mutex.RLock() + app, ok := r.d.apps[r.id] + r.d.mutex.RUnlock() + if ok { + var appIns *disInstancesInfo + appIns, ok = app.zoneIns.Load().(*disInstancesInfo) + if !ok { + return + } + ins = new(disInstancesInfo) + ins.LastTs = appIns.LastTs + ins.Scheduler = appIns.Scheduler + ins.Instances = make(map[string][]*discoveryInstance) + for zone, in := range appIns.Instances { + ins.Instances[zone] = in + } + //if r.opt.Filter != nil { + // ins.Instances = r.opt.Filter(appIns.Instances) + //} else { + // ins.Instances = make(map[string][]*discoveryInstance) + // for zone, in := range appIns.Instances { + // ins.Instances[zone] = in + // } + //} + //if r.opt.scheduler != nil { + // ins.Instances[r.opt.ClientZone] = r.opt.scheduler(ins) + //} + //if r.opt.Subset != nil && r.opt.SubsetSize != 0 { + // for zone, inss := range ins.Instances { + // ins.Instances[zone] = r.opt.Subset(inss, r.opt.SubsetSize) + // } + //} + } + return +} + +// Close resolver +func (r *Resolve) Close() error { + r.d.mutex.Lock() + if app, ok := r.d.apps[r.id]; ok && len(app.resolver) != 0 { + delete(app.resolver, r) + // TODO: delete app from builder + } + r.d.mutex.Unlock() + return nil +} diff --git a/contrib/registry/discovery/discovery_helper.go b/contrib/registry/discovery/discovery_helper.go new file mode 100644 index 000000000..9b13d0e9a --- /dev/null +++ b/contrib/registry/discovery/discovery_helper.go @@ -0,0 +1,185 @@ +package discovery + +import ( + "fmt" + "net/url" + "os" + "strconv" + "time" + + "github.com/pkg/errors" + + "github.com/go-kratos/kratos/v2/registry" +) + +var ( + ErrDuplication = errors.New("register failed: instance duplicated: ") + ErrServerError = errors.New("server error") +) + +const ( + // Discovery server resource uri + _registerURL = "http://%s/discovery/register" + //_setURL = "http://%s/discovery/set" + _cancelURL = "http://%s/discovery/cancel" + _renewURL = "http://%s/discovery/renew" + _pollURL = "http://%s/discovery/polls" + + // Discovery server error codes + _codeOK = 0 + _codeNotFound = -404 + _codeNotModified = -304 + //_SERVER_ERROR = -500 + + // _registerGap is the gap to renew instance registration. + _registerGap = 30 * time.Second + _statusUP = "1" + _discoveryAppID = "infra.discovery" +) + +// Config Discovery configures. +type Config struct { + Nodes []string + Region string + Zone string + Env string + Host string +} + +func fixConfig(c *Config) error { + if c.Host == "" { + c.Host, _ = os.Hostname() + } + if len(c.Nodes) == 0 || c.Region == "" || c.Zone == "" || c.Env == "" || c.Host == "" { + return fmt.Errorf( + "invalid Discovery config nodes:%+v region:%s zone:%s deployEnv:%s host:%s", + c.Nodes, + c.Region, + c.Zone, + c.Env, + c.Host, + ) + } + return nil +} + +// discoveryInstance represents a server the client connects to. +type discoveryInstance struct { + Region string `json:"region"` // Region is region. + Zone string `json:"zone"` // Zone is IDC. + Env string `json:"env"` // Env prod/pre/uat/fat1 + AppID string `json:"appid"` // AppID is mapping service-tree appId. + Hostname string `json:"hostname"` // Hostname is hostname from docker + Addrs []string `json:"addrs"` // Addrs is the address of app instance format: scheme://host + Version string `json:"version"` // Version is publishing version. + LastTs int64 `json:"latest_timestamp"` // LastTs is instance latest updated timestamp + // Metadata is the information associated with Addr, which may be used to make load balancing decision. + Metadata map[string]string `json:"metadata"` + Status int64 `json:"status"` // Status instance status, eg: 1UP 2Waiting +} + +const _reservedInstanceIDKey = "kratos.v2.serviceinstance.id" + +// fromServerInstance convert registry.ServiceInstance into discoveryInstance +func fromServerInstance(ins *registry.ServiceInstance, config *Config) *discoveryInstance { + if ins == nil { + return nil + } + + metadata := ins.Metadata + if ins.Metadata == nil { + metadata = make(map[string]string, 8) + } + metadata[_reservedInstanceIDKey] = ins.ID + + return &discoveryInstance{ + Region: config.Region, + Zone: config.Zone, + Env: config.Env, + AppID: ins.Name, + Hostname: config.Host, + Addrs: ins.Endpoints, + Version: ins.Version, + LastTs: time.Now().Unix(), + Metadata: metadata, + Status: 1, + } +} + +// toServiceInstance convert discoveryInstance into registry.ServiceInstance +func toServiceInstance(ins *discoveryInstance) *registry.ServiceInstance { + if ins == nil { + return nil + } + + return ®istry.ServiceInstance{ + ID: ins.Metadata[_reservedInstanceIDKey], + Name: ins.AppID, + Version: ins.Version, + Metadata: map[string]string{ + "region": ins.Region, + "zone": ins.Region, + "lastTs": strconv.Itoa(int(ins.LastTs)), + "env": ins.Env, + "hostname": ins.Hostname, + }, + Endpoints: ins.Addrs, + } +} + +// disInstancesInfo instance info. +type disInstancesInfo struct { + Instances map[string][]*discoveryInstance `json:"instances"` + LastTs int64 `json:"latest_timestamp"` + Scheduler *scheduler `json:"scheduler"` +} + +// scheduler scheduler. +type scheduler struct { + Clients map[string]*zoneStrategy `json:"clients"` +} + +// zoneStrategy is the scheduling strategy of all zones +type zoneStrategy struct { + Zones map[string]*strategy `json:"zones"` +} + +// strategy is zone scheduling strategy. +type strategy struct { + Weight int64 `json:"weight"` +} + +const ( + _paramKeyRegion = "region" + _paramKeyZone = "zone" + _paramKeyEnv = "env" + _paramKeyHostname = "hostname" + _paramKeyAppID = "appid" + _paramKeyAddrs = "addrs" + _paramKeyVersion = "version" + _paramKeyStatus = "status" + _paramKeyMetadata = "metadata" +) + +func newParams(c *Config) url.Values { + p := make(url.Values, 8) + if c == nil { + return p + } + + p.Set(_paramKeyRegion, c.Region) + p.Set(_paramKeyZone, c.Zone) + p.Set(_paramKeyEnv, c.Env) + p.Set(_paramKeyHostname, c.Host) + return p +} + +type discoveryCommonResp struct { + Code int `json:"code"` + Message string `json:"message"` +} + +type discoveryPollsResp struct { + Code int `json:"code"` + Data map[string]*disInstancesInfo `json:"data"` +} diff --git a/contrib/registry/discovery/go.mod b/contrib/registry/discovery/go.mod new file mode 100644 index 000000000..bd6909efd --- /dev/null +++ b/contrib/registry/discovery/go.mod @@ -0,0 +1,11 @@ +module github.com/go-kratos/kratos/contrib/registry/discovery/v2 + +go 1.16 + +require ( + github.com/go-kratos/kratos/v2 v2.0.5 + github.com/go-resty/resty/v2 v2.6.0 + github.com/pkg/errors v0.9.1 +) + +replace github.com/go-kratos/kratos/v2 v2.0.5 => ../../../ diff --git a/contrib/registry/discovery/go.sum b/contrib/registry/discovery/go.sum new file mode 100644 index 000000000..aa330259e --- /dev/null +++ b/contrib/registry/discovery/go.sum @@ -0,0 +1,158 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= +github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-kratos/aegis v0.1.1/go.mod h1:jYeSQ3Gesba478zEnujOiG5QdsyF3Xk/8owFUeKcHxw= +github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/form/v4 v4.2.0/go.mod h1:q1a2BY+AQUUzhl6xA/6hBetay6dEIhMHjgvJiGo6K7U= +github.com/go-resty/resty/v2 v2.6.0 h1:joIR5PNLM2EFqqESUjCMGXrWmXNHEU9CEiK813oKYS4= +github.com/go-resty/resty/v2 v2.6.0/go.mod h1:PwvJS6hvaPkjtjNg9ph+VrSD92bi5Zq73w/BIH7cC3Q= +github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= +github.com/shirou/gopsutil/v3 v3.21.8/go.mod h1:YWp/H8Qs5fVmf17v7JNZzA0mPJ+mS2e9JdiUF9LlKzQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= +github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcyE6boqnA8= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +go.opentelemetry.io/otel v1.0.0-RC3/go.mod h1:Ka5j3ua8tZs4Rkq4Ex3hwgBgOchyPVq5S6P2lz//nKQ= +go.opentelemetry.io/otel/sdk v1.0.0-RC3/go.mod h1:78H6hyg2fka0NYT9fqGuFLvly2yCxiBXDJAgLKo/2Us= +go.opentelemetry.io/otel/trace v1.0.0-RC3/go.mod h1:VUt2TUYd8S2/ZRX09ZDFZQwn2RqfMB5MzO17jBojGxo= +go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20210805201207-89edb61ffb67/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= +google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= +google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/contrib/registry/discovery/impl_discover.go b/contrib/registry/discovery/impl_discover.go new file mode 100644 index 000000000..f6623195b --- /dev/null +++ b/contrib/registry/discovery/impl_discover.go @@ -0,0 +1,81 @@ +package discovery + +import ( + "context" + "fmt" + "time" + + "github.com/pkg/errors" + + "github.com/go-kratos/kratos/v2/registry" +) + +func filterInstancesByZone(ins *disInstancesInfo, zone string) []*registry.ServiceInstance { + zoneInstance, ok := ins.Instances[zone] + if !ok || len(zoneInstance) == 0 { + return nil + } + + out := make([]*registry.ServiceInstance, 0, len(zoneInstance)) + for _, v := range zoneInstance { + if v == nil { + continue + } + out = append(out, toServiceInstance(v)) + } + + return out +} + +func (d *Discovery) GetService(ctx context.Context, serviceName string) ([]*registry.ServiceInstance, error) { + r := d.resolveBuild(serviceName) + ins, ok := r.Fetch(ctx) + if !ok { + return nil, errors.New("Discovery.GetService fetch failed") + } + + out := filterInstancesByZone(ins, d.config.Zone) + if len(out) == 0 { + return nil, fmt.Errorf("Discovery.GetService(%s) not found", serviceName) + } + + return out, nil +} + +func (d *Discovery) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) { + return &watcher{ + Resolve: d.resolveBuild(serviceName), + serviceName: serviceName, + }, nil +} + +type watcher struct { + *Resolve + + serviceName string +} + +func (w *watcher) Next() ([]*registry.ServiceInstance, error) { + event := w.Resolve.Watch() + // change event come + <-event + + ctx, cancel := context.WithTimeout(context.TODO(), 15*time.Second) + defer cancel() + + ins, ok := w.Resolve.Fetch(ctx) + if !ok { + return nil, errors.New("Discovery.GetService fetch failed") + } + + out := filterInstancesByZone(ins, w.Resolve.d.config.Zone) + if len(out) == 0 { + return nil, fmt.Errorf("Discovery.GetService(%s) not found", w.serviceName) + } + + return out, nil +} + +func (w *watcher) Stop() error { + return w.Resolve.Close() +} diff --git a/contrib/registry/discovery/impl_registrar.go b/contrib/registry/discovery/impl_registrar.go new file mode 100644 index 000000000..f044d8b6c --- /dev/null +++ b/contrib/registry/discovery/impl_registrar.go @@ -0,0 +1,127 @@ +package discovery + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "time" + + "github.com/pkg/errors" + + "github.com/go-kratos/kratos/v2/registry" +) + +func (d *Discovery) Register(ctx context.Context, service *registry.ServiceInstance) (err error) { + ins := fromServerInstance(service, d.config) + + d.mutex.Lock() + if _, ok := d.registry[ins.AppID]; ok { + err = errors.Wrap(ErrDuplication, ins.AppID) + } else { + d.registry[ins.AppID] = struct{}{} + } + d.mutex.Unlock() + if err != nil { + return + } + + ctx, cancel := context.WithCancel(d.ctx) + if err = d.register(ctx, ins); err != nil { + d.mutex.Lock() + delete(d.registry, ins.AppID) + d.mutex.Unlock() + cancel() + return + } + + ch := make(chan struct{}, 1) + d.cancelFunc = func() { + cancel() + <-ch + } + + // renew the current register_service + go func() { + defer d.Logger().Warn("Discovery:register_service goroutine quit") + ticker := time.NewTicker(_registerGap) + defer ticker.Stop() + for { + select { + case <-ticker.C: + _ = d.renew(ctx, ins) + case <-ctx.Done(): + _ = d.cancel(ins) + ch <- struct{}{} + return + } + } + }() + + return +} + +// register an instance with Discovery +func (d *Discovery) register(ctx context.Context, ins *discoveryInstance) (err error) { + d.mutex.RLock() + c := d.config + d.mutex.RUnlock() + + var metadata []byte + if ins.Metadata != nil { + if metadata, err = json.Marshal(ins.Metadata); err != nil { + d.Logger().Errorf( + "Discovery:register instance Marshal metadata(%v) failed!error(%v)", ins.Metadata, err, + ) + } + } + res := new(struct { + Code int `json:"code"` + Message string `json:"message"` + }) + uri := fmt.Sprintf(_registerURL, d.pickNode()) + + // params + p := newParams(d.config) + p.Set(_paramKeyAppID, ins.AppID) + for _, addr := range ins.Addrs { + p.Add(_paramKeyAddrs, addr) + } + p.Set(_paramKeyVersion, ins.Version) + if ins.Status == 0 { + p.Set(_paramKeyStatus, _statusUP) + } else { + p.Set(_paramKeyStatus, strconv.FormatInt(ins.Status, 10)) + } + p.Set(_paramKeyMetadata, string(metadata)) + + // send request to Discovery server. + if _, err = d.httpClient.R(). + SetContext(ctx). + SetQueryParamsFromValues(p). + SetResult(&res). + Post(uri); err != nil { + d.switchNode() + d.Logger().Errorf("Discovery: register client.Get(%s) zone(%s) env(%s) appid(%s) addrs(%v) error(%v)", + uri+"?"+p.Encode(), c.Zone, c.Env, ins.AppID, ins.Addrs, err) + return + } + + if res.Code != 0 { + err = fmt.Errorf("ErrorCode: %d", res.Code) + d.Logger().Errorf("Discovery: register client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)", + uri, c.Env, ins.AppID, ins.Addrs, res.Code) + } + + d.Logger().Infof( + "Discovery: register client.Get(%v) env(%s) appid(%s) addrs(%s) success\n", + uri, c.Env, ins.AppID, ins.Addrs, + ) + + return +} + +func (d *Discovery) Deregister(ctx context.Context, service *registry.ServiceInstance) error { + ins := fromServerInstance(service, d.config) + return d.cancel(ins) +} diff --git a/examples/go.mod b/examples/go.mod index fdabf685e..9894fc0d5 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -11,6 +11,7 @@ require ( github.com/go-kratos/kratos/contrib/config/apollo/v2 v2.0.0-20210901080230-515b71ec9061 github.com/go-kratos/kratos/contrib/metrics/prometheus/v2 v2.0.0-00010101000000-000000000000 github.com/go-kratos/kratos/contrib/registry/consul/v2 v2.0.0-00010101000000-000000000000 + github.com/go-kratos/kratos/contrib/registry/discovery/v2 v2.0.0-00010101000000-000000000000 github.com/go-kratos/kratos/contrib/registry/etcd/v2 v2.0.0-00010101000000-000000000000 github.com/go-kratos/kratos/contrib/registry/nacos/v2 v2.0.0-00010101000000-000000000000 github.com/go-kratos/kratos/contrib/registry/zookeeper/v2 v2.0.0-00010101000000-000000000000 @@ -49,6 +50,7 @@ replace ( github.com/go-kratos/kratos/contrib/config/apollo/v2 => ../contrib/config/apollo github.com/go-kratos/kratos/contrib/metrics/prometheus/v2 => ../contrib/metrics/prometheus github.com/go-kratos/kratos/contrib/registry/consul/v2 => ../contrib/registry/consul + github.com/go-kratos/kratos/contrib/registry/discovery/v2 => ../contrib/registry/discovery github.com/go-kratos/kratos/contrib/registry/etcd/v2 => ../contrib/registry/etcd github.com/go-kratos/kratos/contrib/registry/nacos/v2 => ../contrib/registry/nacos github.com/go-kratos/kratos/contrib/registry/zookeeper/v2 => ../contrib/registry/zookeeper diff --git a/examples/go.sum b/examples/go.sum index d80652064..742d10b3e 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -194,6 +194,8 @@ github.com/go-redis/redis/v8 v8.3.2/go.mod h1:jszGxBCez8QA1HWSmQxJO9Y82kNibbUmeY github.com/go-redis/redis/v8 v8.5.0/go.mod h1:YmEcgBDttjnkbMzDAhDtQxY9yVA7jMN6PCR5HeMvqFE= github.com/go-redis/redis/v8 v8.11.2 h1:WqlSpAwz8mxDSMCvbyz1Mkiqe0LE5OY4j3lgkvu1Ts0= github.com/go-redis/redis/v8 v8.11.2/go.mod h1:DLomh7y2e3ggQXQLd1YgmvIfecPJoFl7WU5SOQ/r06M= +github.com/go-resty/resty/v2 v2.6.0 h1:joIR5PNLM2EFqqESUjCMGXrWmXNHEU9CEiK813oKYS4= +github.com/go-resty/resty/v2 v2.6.0/go.mod h1:PwvJS6hvaPkjtjNg9ph+VrSD92bi5Zq73w/BIH7cC3Q= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.1-0.20200311113236-681ffa848bae/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= diff --git a/examples/registry/discovery/client/main.go b/examples/registry/discovery/client/main.go new file mode 100644 index 000000000..fbfab54b8 --- /dev/null +++ b/examples/registry/discovery/client/main.go @@ -0,0 +1,58 @@ +package main + +import ( + "context" + "log" + "time" + + srcgrpc "google.golang.org/grpc" + + "github.com/go-kratos/kratos/contrib/registry/discovery/v2" + "github.com/go-kratos/kratos/examples/helloworld/helloworld" + "github.com/go-kratos/kratos/v2/transport/grpc" +) + +func main() { + r := discovery.New(&discovery.Config{ + Nodes: []string{"0.0.0.0:7171"}, + Env: "dev", + Region: "sh1", + Zone: "zone1", + Host: "localhost", + }, nil) + + connGRPC, err := grpc.DialInsecure( + context.Background(), + grpc.WithEndpoint("discovery:///helloworld"), + grpc.WithDiscovery(r), + ) + if err != nil { + log.Fatal(err) + } + defer connGRPC.Close() + + //connHTTP, err := http.NewClient( + // context.Background(), + // http.WithEndpoint("discovery:///helloworld"), + // http.WithDiscovery(r), + // http.WithBlock(), + //) + //if err != nil { + // log.Fatal(err) + //} + //defer connHTTP.Close() + + for { + callGRPC(connGRPC) + time.Sleep(time.Second) + } +} + +func callGRPC(conn *srcgrpc.ClientConn) { + client := helloworld.NewGreeterClient(conn) + reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"}) + if err != nil { + log.Fatal(err) + } + log.Printf("[grpc] SayHello %+v\n", reply) +} diff --git a/examples/registry/discovery/server/main.go b/examples/registry/discovery/server/main.go new file mode 100644 index 000000000..f54d47795 --- /dev/null +++ b/examples/registry/discovery/server/main.go @@ -0,0 +1,70 @@ +package main + +import ( + "context" + "fmt" + "os" + + "github.com/go-kratos/kratos/contrib/registry/discovery/v2" + "github.com/go-kratos/kratos/examples/helloworld/helloworld" + "github.com/go-kratos/kratos/v2" + "github.com/go-kratos/kratos/v2/log" + "github.com/go-kratos/kratos/v2/middleware/logging" + "github.com/go-kratos/kratos/v2/middleware/recovery" + "github.com/go-kratos/kratos/v2/transport/grpc" + "github.com/go-kratos/kratos/v2/transport/http" +) + +// server is used to implement helloworld.GreeterServer. +type server struct { + helloworld.UnimplementedGreeterServer +} + +// SayHello implements helloworld.GreeterServer +func (s *server) SayHello(ctx context.Context, in *helloworld.HelloRequest) (*helloworld.HelloReply, error) { + return &helloworld.HelloReply{Message: fmt.Sprintf("Welcome %+v!", in.Name)}, nil +} + +func main() { + logger := log.NewStdLogger(os.Stdout) + logger = log.With(logger, "service", "example.registry.discovery") + + r := discovery.New(&discovery.Config{ + Nodes: []string{"0.0.0.0:7171"}, + Env: "dev", + Region: "sh1", + Zone: "zone1", + Host: "localhost", + }, logger) + + httpSrv := http.NewServer( + http.Address(":8000"), + http.Middleware( + recovery.Recovery(), + ), + ) + grpcSrv := grpc.NewServer( + grpc.Address(":9000"), + grpc.Middleware( + recovery.Recovery(), + logging.Server(logger), + ), + ) + + s := &server{} + helloworld.RegisterGreeterServer(grpcSrv, s) + helloworld.RegisterGreeterHTTPServer(httpSrv, s) + + app := kratos.New( + kratos.Name("helloworld"), + kratos.Server( + httpSrv, + grpcSrv, + ), + kratos.Metadata(map[string]string{"color": "gray"}), + kratos.Registrar(r), + ) + if err := app.Run(); err != nil { + log.NewHelper(logger).Fatal(err) + } +}