mirror of
https://github.com/go-micro/go-micro.git
synced 2025-08-10 21:52:01 +02:00
Add a selection of plugins to the core repo (#2755)
* WIP * fix: default memory registry, add registrations for mdns, nats * fix: same for broker * fix: add more * fix: http port * rename redis * chore: linting
This commit is contained in:
460
registry/consul/consul.go
Normal file
460
registry/consul/consul.go
Normal file
@@ -0,0 +1,460 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
consul "github.com/hashicorp/consul/api"
|
||||
hash "github.com/mitchellh/hashstructure"
|
||||
"go-micro.dev/v5/registry"
|
||||
mnet "go-micro.dev/v5/util/net"
|
||||
)
|
||||
|
||||
type consulRegistry struct {
|
||||
Address []string
|
||||
opts registry.Options
|
||||
|
||||
client *consul.Client
|
||||
config *consul.Config
|
||||
|
||||
// connect enabled
|
||||
connect bool
|
||||
|
||||
queryOptions *consul.QueryOptions
|
||||
|
||||
sync.Mutex
|
||||
register map[string]uint64
|
||||
// lastChecked tracks when a node was last checked as existing in Consul
|
||||
lastChecked map[string]time.Time
|
||||
}
|
||||
|
||||
func getDeregisterTTL(t time.Duration) time.Duration {
|
||||
// splay slightly for the watcher?
|
||||
splay := time.Second * 5
|
||||
deregTTL := t + splay
|
||||
|
||||
// consul has a minimum timeout on deregistration of 1 minute.
|
||||
if t < time.Minute {
|
||||
deregTTL = time.Minute + splay
|
||||
}
|
||||
|
||||
return deregTTL
|
||||
}
|
||||
|
||||
func newTransport(config *tls.Config) *http.Transport {
|
||||
if config == nil {
|
||||
config = &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
}
|
||||
}
|
||||
|
||||
t := &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
Dial: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).Dial,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
TLSClientConfig: config,
|
||||
}
|
||||
runtime.SetFinalizer(&t, func(tr **http.Transport) {
|
||||
(*tr).CloseIdleConnections()
|
||||
})
|
||||
return t
|
||||
}
|
||||
|
||||
func configure(c *consulRegistry, opts ...registry.Option) {
|
||||
// set opts
|
||||
for _, o := range opts {
|
||||
o(&c.opts)
|
||||
}
|
||||
|
||||
// use default non pooled config
|
||||
config := consul.DefaultNonPooledConfig()
|
||||
|
||||
if c.opts.Context != nil {
|
||||
// Use the consul config passed in the options, if available
|
||||
if co, ok := c.opts.Context.Value("consul_config").(*consul.Config); ok {
|
||||
config = co
|
||||
}
|
||||
if cn, ok := c.opts.Context.Value("consul_connect").(bool); ok {
|
||||
c.connect = cn
|
||||
}
|
||||
|
||||
// Use the consul query options passed in the options, if available
|
||||
if qo, ok := c.opts.Context.Value("consul_query_options").(*consul.QueryOptions); ok && qo != nil {
|
||||
c.queryOptions = qo
|
||||
}
|
||||
if as, ok := c.opts.Context.Value("consul_allow_stale").(bool); ok {
|
||||
c.queryOptions.AllowStale = as
|
||||
}
|
||||
}
|
||||
|
||||
// check if there are any addrs
|
||||
var addrs []string
|
||||
|
||||
// iterate the options addresses
|
||||
for _, address := range c.opts.Addrs {
|
||||
// check we have a port
|
||||
addr, port, err := net.SplitHostPort(address)
|
||||
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
|
||||
port = "8500"
|
||||
addr = address
|
||||
addrs = append(addrs, net.JoinHostPort(addr, port))
|
||||
} else if err == nil {
|
||||
addrs = append(addrs, net.JoinHostPort(addr, port))
|
||||
}
|
||||
}
|
||||
|
||||
// set the addrs
|
||||
if len(addrs) > 0 {
|
||||
c.Address = addrs
|
||||
config.Address = c.Address[0]
|
||||
}
|
||||
|
||||
if config.HttpClient == nil {
|
||||
config.HttpClient = new(http.Client)
|
||||
}
|
||||
|
||||
// requires secure connection?
|
||||
if c.opts.Secure || c.opts.TLSConfig != nil {
|
||||
config.Scheme = "https"
|
||||
// We're going to support InsecureSkipVerify
|
||||
config.HttpClient.Transport = newTransport(c.opts.TLSConfig)
|
||||
}
|
||||
|
||||
// set timeout
|
||||
if c.opts.Timeout > 0 {
|
||||
config.HttpClient.Timeout = c.opts.Timeout
|
||||
}
|
||||
|
||||
// set the config
|
||||
c.config = config
|
||||
|
||||
// remove client
|
||||
c.client = nil
|
||||
|
||||
// setup the client
|
||||
c.Client()
|
||||
}
|
||||
|
||||
func (c *consulRegistry) Init(opts ...registry.Option) error {
|
||||
configure(c, opts...)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *consulRegistry) Deregister(s *registry.Service, opts ...registry.DeregisterOption) error {
|
||||
if len(s.Nodes) == 0 {
|
||||
return errors.New("require at least one node")
|
||||
}
|
||||
|
||||
// delete our hash and time check of the service
|
||||
c.Lock()
|
||||
delete(c.register, s.Name)
|
||||
delete(c.lastChecked, s.Name)
|
||||
c.Unlock()
|
||||
|
||||
node := s.Nodes[0]
|
||||
return c.Client().Agent().ServiceDeregister(node.Id)
|
||||
}
|
||||
|
||||
func (c *consulRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
|
||||
if len(s.Nodes) == 0 {
|
||||
return errors.New("require at least one node")
|
||||
}
|
||||
|
||||
var regTCPCheck bool
|
||||
var regInterval time.Duration
|
||||
var regHTTPCheck bool
|
||||
var httpCheckConfig consul.AgentServiceCheck
|
||||
|
||||
var options registry.RegisterOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
if c.opts.Context != nil {
|
||||
if tcpCheckInterval, ok := c.opts.Context.Value("consul_tcp_check").(time.Duration); ok {
|
||||
regTCPCheck = true
|
||||
regInterval = tcpCheckInterval
|
||||
}
|
||||
var ok bool
|
||||
if httpCheckConfig, ok = c.opts.Context.Value("consul_http_check_config").(consul.AgentServiceCheck); ok {
|
||||
regHTTPCheck = true
|
||||
}
|
||||
}
|
||||
|
||||
// create hash of service; uint64
|
||||
h, err := hash.Hash(s, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// use first node
|
||||
node := s.Nodes[0]
|
||||
|
||||
// get existing hash and last checked time
|
||||
c.Lock()
|
||||
v, ok := c.register[s.Name]
|
||||
lastChecked := c.lastChecked[s.Name]
|
||||
c.Unlock()
|
||||
|
||||
// if it's already registered and matches then just pass the check
|
||||
if ok && v == h {
|
||||
if options.TTL == time.Duration(0) {
|
||||
// ensure that our service hasn't been deregistered by Consul
|
||||
if time.Since(lastChecked) <= getDeregisterTTL(regInterval) {
|
||||
return nil
|
||||
}
|
||||
services, _, err := c.Client().Health().Checks(s.Name, c.queryOptions)
|
||||
if err == nil {
|
||||
for _, v := range services {
|
||||
if v.ServiceID == node.Id {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// if the err is nil we're all good, bail out
|
||||
// if not, we don't know what the state is, so full re-register
|
||||
if err := c.Client().Agent().PassTTL("service:"+node.Id, ""); err == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// encode the tags
|
||||
tags := encodeMetadata(node.Metadata)
|
||||
tags = append(tags, encodeEndpoints(s.Endpoints)...)
|
||||
tags = append(tags, encodeVersion(s.Version)...)
|
||||
|
||||
var check *consul.AgentServiceCheck
|
||||
|
||||
if regTCPCheck {
|
||||
deregTTL := getDeregisterTTL(regInterval)
|
||||
|
||||
check = &consul.AgentServiceCheck{
|
||||
TCP: node.Address,
|
||||
Interval: fmt.Sprintf("%v", regInterval),
|
||||
DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
|
||||
}
|
||||
|
||||
} else if regHTTPCheck {
|
||||
interval, _ := time.ParseDuration(httpCheckConfig.Interval)
|
||||
deregTTL := getDeregisterTTL(interval)
|
||||
|
||||
host, _, _ := net.SplitHostPort(node.Address)
|
||||
healthCheckURI := strings.Replace(httpCheckConfig.HTTP, "{host}", host, 1)
|
||||
|
||||
check = &consul.AgentServiceCheck{
|
||||
HTTP: healthCheckURI,
|
||||
Interval: httpCheckConfig.Interval,
|
||||
Timeout: httpCheckConfig.Timeout,
|
||||
DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
|
||||
}
|
||||
|
||||
// if the TTL is greater than 0 create an associated check
|
||||
} else if options.TTL > time.Duration(0) {
|
||||
deregTTL := getDeregisterTTL(options.TTL)
|
||||
|
||||
check = &consul.AgentServiceCheck{
|
||||
TTL: fmt.Sprintf("%v", options.TTL),
|
||||
DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
|
||||
}
|
||||
}
|
||||
|
||||
host, pt, _ := net.SplitHostPort(node.Address)
|
||||
if host == "" {
|
||||
host = node.Address
|
||||
}
|
||||
port, _ := strconv.Atoi(pt)
|
||||
|
||||
// register the service
|
||||
asr := &consul.AgentServiceRegistration{
|
||||
ID: node.Id,
|
||||
Name: s.Name,
|
||||
Tags: tags,
|
||||
Port: port,
|
||||
Address: host,
|
||||
Meta: node.Metadata,
|
||||
Check: check,
|
||||
}
|
||||
|
||||
// Specify consul connect
|
||||
if c.connect {
|
||||
asr.Connect = &consul.AgentServiceConnect{
|
||||
Native: true,
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.Client().Agent().ServiceRegister(asr); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// save our hash and time check of the service
|
||||
c.Lock()
|
||||
c.register[s.Name] = h
|
||||
c.lastChecked[s.Name] = time.Now()
|
||||
c.Unlock()
|
||||
|
||||
// if the TTL is 0 we don't mess with the checks
|
||||
if options.TTL == time.Duration(0) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// pass the healthcheck
|
||||
return c.Client().Agent().PassTTL("service:"+node.Id, "")
|
||||
}
|
||||
|
||||
func (c *consulRegistry) GetService(name string, opts ...registry.GetOption) ([]*registry.Service, error) {
|
||||
var rsp []*consul.ServiceEntry
|
||||
var err error
|
||||
|
||||
// if we're connect enabled only get connect services
|
||||
if c.connect {
|
||||
rsp, _, err = c.Client().Health().Connect(name, "", false, c.queryOptions)
|
||||
} else {
|
||||
rsp, _, err = c.Client().Health().Service(name, "", false, c.queryOptions)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
serviceMap := map[string]*registry.Service{}
|
||||
|
||||
for _, s := range rsp {
|
||||
if s.Service.Service != name {
|
||||
continue
|
||||
}
|
||||
|
||||
// version is now a tag
|
||||
version, _ := decodeVersion(s.Service.Tags)
|
||||
// service ID is now the node id
|
||||
id := s.Service.ID
|
||||
// key is always the version
|
||||
key := version
|
||||
|
||||
// address is service address
|
||||
address := s.Service.Address
|
||||
|
||||
// use node address
|
||||
if len(address) == 0 {
|
||||
address = s.Node.Address
|
||||
}
|
||||
|
||||
svc, ok := serviceMap[key]
|
||||
if !ok {
|
||||
svc = ®istry.Service{
|
||||
Endpoints: decodeEndpoints(s.Service.Tags),
|
||||
Name: s.Service.Service,
|
||||
Version: version,
|
||||
}
|
||||
serviceMap[key] = svc
|
||||
}
|
||||
|
||||
var del bool
|
||||
|
||||
for _, check := range s.Checks {
|
||||
// delete the node if the status is critical
|
||||
if check.Status == "critical" {
|
||||
del = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// if delete then skip the node
|
||||
if del {
|
||||
continue
|
||||
}
|
||||
|
||||
svc.Nodes = append(svc.Nodes, ®istry.Node{
|
||||
Id: id,
|
||||
Address: mnet.HostPort(address, s.Service.Port),
|
||||
Metadata: decodeMetadata(s.Service.Tags),
|
||||
})
|
||||
}
|
||||
|
||||
var services []*registry.Service
|
||||
for _, service := range serviceMap {
|
||||
services = append(services, service)
|
||||
}
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (c *consulRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Service, error) {
|
||||
rsp, _, err := c.Client().Catalog().Services(c.queryOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var services []*registry.Service
|
||||
|
||||
for service := range rsp {
|
||||
services = append(services, ®istry.Service{Name: service})
|
||||
}
|
||||
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (c *consulRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
|
||||
return newConsulWatcher(c, opts...)
|
||||
}
|
||||
|
||||
func (c *consulRegistry) String() string {
|
||||
return "consul"
|
||||
}
|
||||
|
||||
func (c *consulRegistry) Options() registry.Options {
|
||||
return c.opts
|
||||
}
|
||||
|
||||
func (c *consulRegistry) Client() *consul.Client {
|
||||
if c.client != nil {
|
||||
return c.client
|
||||
}
|
||||
|
||||
for _, addr := range c.Address {
|
||||
// set the address
|
||||
c.config.Address = addr
|
||||
|
||||
// create a new client
|
||||
tmpClient, _ := consul.NewClient(c.config)
|
||||
|
||||
// test the client
|
||||
_, err := tmpClient.Agent().Host()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// set the client
|
||||
c.client = tmpClient
|
||||
return c.client
|
||||
}
|
||||
|
||||
// set the default
|
||||
c.client, _ = consul.NewClient(c.config)
|
||||
|
||||
// return the client
|
||||
return c.client
|
||||
}
|
||||
|
||||
func NewConsulRegistry(opts ...registry.Option) registry.Registry {
|
||||
cr := &consulRegistry{
|
||||
opts: registry.Options{},
|
||||
register: make(map[string]uint64),
|
||||
lastChecked: make(map[string]time.Time),
|
||||
queryOptions: &consul.QueryOptions{
|
||||
AllowStale: true,
|
||||
},
|
||||
}
|
||||
configure(cr, opts...)
|
||||
return cr
|
||||
}
|
171
registry/consul/encoding.go
Normal file
171
registry/consul/encoding.go
Normal file
@@ -0,0 +1,171 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/zlib"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"io"
|
||||
|
||||
"go-micro.dev/v5/registry"
|
||||
)
|
||||
|
||||
func encode(buf []byte) string {
|
||||
var b bytes.Buffer
|
||||
defer b.Reset()
|
||||
|
||||
w := zlib.NewWriter(&b)
|
||||
if _, err := w.Write(buf); err != nil {
|
||||
return ""
|
||||
}
|
||||
w.Close()
|
||||
|
||||
return hex.EncodeToString(b.Bytes())
|
||||
}
|
||||
|
||||
func decode(d string) []byte {
|
||||
hr, err := hex.DecodeString(d)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
br := bytes.NewReader(hr)
|
||||
zr, err := zlib.NewReader(br)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
rbuf, err := io.ReadAll(zr)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
zr.Close()
|
||||
|
||||
return rbuf
|
||||
}
|
||||
|
||||
func encodeEndpoints(en []*registry.Endpoint) []string {
|
||||
var tags []string
|
||||
for _, e := range en {
|
||||
if b, err := json.Marshal(e); err == nil {
|
||||
tags = append(tags, "e-"+encode(b))
|
||||
}
|
||||
}
|
||||
return tags
|
||||
}
|
||||
|
||||
func decodeEndpoints(tags []string) []*registry.Endpoint {
|
||||
var en []*registry.Endpoint
|
||||
|
||||
// use the first format you find
|
||||
var ver byte
|
||||
|
||||
for _, tag := range tags {
|
||||
if len(tag) == 0 || tag[0] != 'e' {
|
||||
continue
|
||||
}
|
||||
|
||||
// check version
|
||||
if ver > 0 && tag[1] != ver {
|
||||
continue
|
||||
}
|
||||
|
||||
var e *registry.Endpoint
|
||||
var buf []byte
|
||||
|
||||
// Old encoding was plain
|
||||
if tag[1] == '=' {
|
||||
buf = []byte(tag[2:])
|
||||
}
|
||||
|
||||
// New encoding is hex
|
||||
if tag[1] == '-' {
|
||||
buf = decode(tag[2:])
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(buf, &e); err == nil {
|
||||
en = append(en, e)
|
||||
}
|
||||
|
||||
// set version
|
||||
ver = tag[1]
|
||||
}
|
||||
return en
|
||||
}
|
||||
|
||||
func encodeMetadata(md map[string]string) []string {
|
||||
var tags []string
|
||||
for k, v := range md {
|
||||
if b, err := json.Marshal(map[string]string{
|
||||
k: v,
|
||||
}); err == nil {
|
||||
// new encoding
|
||||
tags = append(tags, "t-"+encode(b))
|
||||
}
|
||||
}
|
||||
return tags
|
||||
}
|
||||
|
||||
func decodeMetadata(tags []string) map[string]string {
|
||||
md := make(map[string]string)
|
||||
|
||||
var ver byte
|
||||
|
||||
for _, tag := range tags {
|
||||
if len(tag) == 0 || tag[0] != 't' {
|
||||
continue
|
||||
}
|
||||
|
||||
// check version
|
||||
if ver > 0 && tag[1] != ver {
|
||||
continue
|
||||
}
|
||||
|
||||
var kv map[string]string
|
||||
var buf []byte
|
||||
|
||||
// Old encoding was plain
|
||||
if tag[1] == '=' {
|
||||
buf = []byte(tag[2:])
|
||||
}
|
||||
|
||||
// New encoding is hex
|
||||
if tag[1] == '-' {
|
||||
buf = decode(tag[2:])
|
||||
}
|
||||
|
||||
// Now unmarshal
|
||||
if err := json.Unmarshal(buf, &kv); err == nil {
|
||||
for k, v := range kv {
|
||||
md[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
// set version
|
||||
ver = tag[1]
|
||||
}
|
||||
return md
|
||||
}
|
||||
|
||||
func encodeVersion(v string) []string {
|
||||
return []string{"v-" + encode([]byte(v))}
|
||||
}
|
||||
|
||||
func decodeVersion(tags []string) (string, bool) {
|
||||
for _, tag := range tags {
|
||||
if len(tag) < 2 || tag[0] != 'v' {
|
||||
continue
|
||||
}
|
||||
|
||||
// Old encoding was plain
|
||||
if tag[1] == '=' {
|
||||
return tag[2:], true
|
||||
}
|
||||
|
||||
// New encoding is hex
|
||||
if tag[1] == '-' {
|
||||
return string(decode(tag[2:])), true
|
||||
}
|
||||
}
|
||||
return "", false
|
||||
}
|
147
registry/consul/encoding_test.go
Normal file
147
registry/consul/encoding_test.go
Normal file
@@ -0,0 +1,147 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
"go-micro.dev/v5/registry"
|
||||
)
|
||||
|
||||
func TestEncodingEndpoints(t *testing.T) {
|
||||
eps := []*registry.Endpoint{
|
||||
{
|
||||
Name: "endpoint1",
|
||||
Request: ®istry.Value{
|
||||
Name: "request",
|
||||
Type: "request",
|
||||
},
|
||||
Response: ®istry.Value{
|
||||
Name: "response",
|
||||
Type: "response",
|
||||
},
|
||||
Metadata: map[string]string{
|
||||
"foo1": "bar1",
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "endpoint2",
|
||||
Request: ®istry.Value{
|
||||
Name: "request",
|
||||
Type: "request",
|
||||
},
|
||||
Response: ®istry.Value{
|
||||
Name: "response",
|
||||
Type: "response",
|
||||
},
|
||||
Metadata: map[string]string{
|
||||
"foo2": "bar2",
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "endpoint3",
|
||||
Request: ®istry.Value{
|
||||
Name: "request",
|
||||
Type: "request",
|
||||
},
|
||||
Response: ®istry.Value{
|
||||
Name: "response",
|
||||
Type: "response",
|
||||
},
|
||||
Metadata: map[string]string{
|
||||
"foo3": "bar3",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
testEp := func(ep *registry.Endpoint, enc string) {
|
||||
// encode endpoint
|
||||
e := encodeEndpoints([]*registry.Endpoint{ep})
|
||||
|
||||
// check there are two tags; old and new
|
||||
if len(e) != 1 {
|
||||
t.Fatalf("Expected 1 encoded tags, got %v", e)
|
||||
}
|
||||
|
||||
// check old encoding
|
||||
var seen bool
|
||||
|
||||
for _, en := range e {
|
||||
if en == enc {
|
||||
seen = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !seen {
|
||||
t.Fatalf("Expected %s but not found", enc)
|
||||
}
|
||||
|
||||
// decode
|
||||
d := decodeEndpoints([]string{enc})
|
||||
if len(d) == 0 {
|
||||
t.Fatalf("Expected %v got %v", ep, d)
|
||||
}
|
||||
|
||||
// check name
|
||||
if d[0].Name != ep.Name {
|
||||
t.Fatalf("Expected ep %s got %s", ep.Name, d[0].Name)
|
||||
}
|
||||
|
||||
// check all the metadata exists
|
||||
for k, v := range ep.Metadata {
|
||||
if gv := d[0].Metadata[k]; gv != v {
|
||||
t.Fatalf("Expected key %s val %s got val %s", k, v, gv)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, ep := range eps {
|
||||
// JSON encoded
|
||||
jencoded, err := json.Marshal(ep)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// HEX encoded
|
||||
hencoded := encode(jencoded)
|
||||
// endpoint tag
|
||||
hepTag := "e-" + hencoded
|
||||
testEp(ep, hepTag)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncodingVersion(t *testing.T) {
|
||||
testData := []struct {
|
||||
decoded string
|
||||
encoded string
|
||||
}{
|
||||
{"1.0.0", "v-789c32d433d03300040000ffff02ce00ee"},
|
||||
{"latest", "v-789cca492c492d2e01040000ffff08cc028e"},
|
||||
}
|
||||
|
||||
for _, data := range testData {
|
||||
e := encodeVersion(data.decoded)
|
||||
|
||||
if e[0] != data.encoded {
|
||||
t.Fatalf("Expected %s got %s", data.encoded, e)
|
||||
}
|
||||
|
||||
d, ok := decodeVersion(e)
|
||||
if !ok {
|
||||
t.Fatalf("Unexpected %t for %s", ok, data.encoded)
|
||||
}
|
||||
|
||||
if d != data.decoded {
|
||||
t.Fatalf("Expected %s got %s", data.decoded, d)
|
||||
}
|
||||
|
||||
d, ok = decodeVersion([]string{data.encoded})
|
||||
if !ok {
|
||||
t.Fatalf("Unexpected %t for %s", ok, data.encoded)
|
||||
}
|
||||
|
||||
if d != data.decoded {
|
||||
t.Fatalf("Expected %s got %s", data.decoded, d)
|
||||
}
|
||||
}
|
||||
}
|
111
registry/consul/options.go
Normal file
111
registry/consul/options.go
Normal file
@@ -0,0 +1,111 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
consul "github.com/hashicorp/consul/api"
|
||||
"go-micro.dev/v5/registry"
|
||||
)
|
||||
|
||||
// Define a custom type for context keys to avoid collisions.
|
||||
type contextKey string
|
||||
|
||||
const consulConnectKey contextKey = "consul_connect"
|
||||
const consulConfigKey contextKey = "consul_config"
|
||||
const consulAllowStaleKey contextKey = "consul_allow_stale"
|
||||
const consulQueryOptionsKey contextKey = "consul_query_options"
|
||||
const consulTCPCheckKey contextKey = "consul_tcp_check"
|
||||
const consulHTTPCheckConfigKey contextKey = "consul_http_check_config"
|
||||
|
||||
// Connect specifies services should be registered as Consul Connect services.
|
||||
func Connect() registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, consulConnectKey, true)
|
||||
}
|
||||
}
|
||||
|
||||
func Config(c *consul.Config) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, consulConfigKey, c)
|
||||
}
|
||||
}
|
||||
|
||||
// AllowStale sets whether any Consul server (non-leader) can service
|
||||
// a read. This allows for lower latency and higher throughput
|
||||
// at the cost of potentially stale data.
|
||||
// Works similar to Consul DNS Config option [1].
|
||||
// Defaults to true.
|
||||
//
|
||||
// [1] https://www.consul.io/docs/agent/options.html#allow_stale
|
||||
func AllowStale(v bool) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, consulAllowStaleKey, v)
|
||||
}
|
||||
}
|
||||
|
||||
// QueryOptions specifies the QueryOptions to be used when calling
|
||||
// Consul. See `Consul API` for more information [1].
|
||||
//
|
||||
// [1] https://godoc.org/github.com/hashicorp/consul/api#QueryOptions
|
||||
func QueryOptions(q *consul.QueryOptions) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if q == nil {
|
||||
return
|
||||
}
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, consulQueryOptionsKey, q)
|
||||
}
|
||||
}
|
||||
|
||||
// TCPCheck will tell the service provider to check the service address
|
||||
// and port every `t` interval. It will enabled only if `t` is greater than 0.
|
||||
// See `TCP + Interval` for more information [1].
|
||||
//
|
||||
// [1] https://www.consul.io/docs/agent/checks.html
|
||||
func TCPCheck(t time.Duration) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if t <= time.Duration(0) {
|
||||
return
|
||||
}
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, consulTCPCheckKey, t)
|
||||
}
|
||||
}
|
||||
|
||||
// HTTPCheck will tell the service provider to invoke the health check endpoint
|
||||
// with an interval and timeout. It will be enabled only if interval and
|
||||
// timeout are greater than 0.
|
||||
// See `HTTP + Interval` for more information [1].
|
||||
//
|
||||
// [1] https://www.consul.io/docs/agent/checks.html
|
||||
func HTTPCheck(protocol, port, httpEndpoint string, interval, timeout time.Duration) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if interval <= time.Duration(0) || timeout <= time.Duration(0) {
|
||||
return
|
||||
}
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
check := consul.AgentServiceCheck{
|
||||
HTTP: fmt.Sprintf("%s://{host}:%s%s", protocol, port, httpEndpoint),
|
||||
Interval: fmt.Sprintf("%v", interval),
|
||||
Timeout: fmt.Sprintf("%v", timeout),
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, consulHTTPCheckConfigKey, check)
|
||||
}
|
||||
}
|
208
registry/consul/registry_test.go
Normal file
208
registry/consul/registry_test.go
Normal file
@@ -0,0 +1,208 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
consul "github.com/hashicorp/consul/api"
|
||||
"go-micro.dev/v5/registry"
|
||||
)
|
||||
|
||||
type mockRegistry struct {
|
||||
body []byte
|
||||
status int
|
||||
err error
|
||||
url string
|
||||
}
|
||||
|
||||
func encodeData(obj interface{}) ([]byte, error) {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
enc := json.NewEncoder(buf)
|
||||
if err := enc.Encode(obj); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
func newMockServer(rg *mockRegistry, l net.Listener) error {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc(rg.url, func(w http.ResponseWriter, r *http.Request) {
|
||||
if rg.err != nil {
|
||||
http.Error(w, rg.err.Error(), 500)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(rg.status)
|
||||
w.Write(rg.body)
|
||||
})
|
||||
return http.Serve(l, mux)
|
||||
}
|
||||
|
||||
func newConsulTestRegistry(r *mockRegistry) (*consulRegistry, func()) {
|
||||
l, err := net.Listen("tcp", "localhost:0")
|
||||
if err != nil {
|
||||
// blurgh?!!
|
||||
panic(err.Error())
|
||||
}
|
||||
cfg := consul.DefaultConfig()
|
||||
cfg.Address = l.Addr().String()
|
||||
|
||||
go newMockServer(r, l)
|
||||
|
||||
var cr = &consulRegistry{
|
||||
config: cfg,
|
||||
Address: []string{cfg.Address},
|
||||
opts: registry.Options{},
|
||||
register: make(map[string]uint64),
|
||||
lastChecked: make(map[string]time.Time),
|
||||
queryOptions: &consul.QueryOptions{
|
||||
AllowStale: true,
|
||||
},
|
||||
}
|
||||
cr.Client()
|
||||
|
||||
return cr, func() {
|
||||
l.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func newServiceList(svc []*consul.ServiceEntry) []byte {
|
||||
bts, _ := encodeData(svc)
|
||||
return bts
|
||||
}
|
||||
|
||||
func TestConsul_GetService_WithError(t *testing.T) {
|
||||
cr, cl := newConsulTestRegistry(&mockRegistry{
|
||||
err: errors.New("client-error"),
|
||||
url: "/v1/health/service/service-name",
|
||||
})
|
||||
defer cl()
|
||||
|
||||
if _, err := cr.GetService("test-service"); err == nil {
|
||||
t.Fatalf("Expected error not to be `nil`")
|
||||
}
|
||||
}
|
||||
|
||||
func TestConsul_GetService_WithHealthyServiceNodes(t *testing.T) {
|
||||
// warning is still seen as healthy, critical is not
|
||||
svcs := []*consul.ServiceEntry{
|
||||
newServiceEntry(
|
||||
"node-name-1", "node-address-1", "service-name", "v1.0.0",
|
||||
[]*consul.HealthCheck{
|
||||
newHealthCheck("node-name-1", "service-name", "passing"),
|
||||
newHealthCheck("node-name-1", "service-name", "warning"),
|
||||
},
|
||||
),
|
||||
newServiceEntry(
|
||||
"node-name-2", "node-address-2", "service-name", "v1.0.0",
|
||||
[]*consul.HealthCheck{
|
||||
newHealthCheck("node-name-2", "service-name", "passing"),
|
||||
newHealthCheck("node-name-2", "service-name", "warning"),
|
||||
},
|
||||
),
|
||||
}
|
||||
|
||||
cr, cl := newConsulTestRegistry(&mockRegistry{
|
||||
status: 200,
|
||||
body: newServiceList(svcs),
|
||||
url: "/v1/health/service/service-name",
|
||||
})
|
||||
defer cl()
|
||||
|
||||
svc, err := cr.GetService("service-name")
|
||||
if err != nil {
|
||||
t.Fatal("Unexpected error", err)
|
||||
}
|
||||
|
||||
if exp, act := 1, len(svc); exp != act {
|
||||
t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act)
|
||||
}
|
||||
|
||||
if exp, act := 2, len(svc[0].Nodes); exp != act {
|
||||
t.Fatalf("Expected len of nodes to be `%d`, got `%d`.", exp, act)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConsul_GetService_WithUnhealthyServiceNode(t *testing.T) {
|
||||
// warning is still seen as healthy, critical is not
|
||||
svcs := []*consul.ServiceEntry{
|
||||
newServiceEntry(
|
||||
"node-name-1", "node-address-1", "service-name", "v1.0.0",
|
||||
[]*consul.HealthCheck{
|
||||
newHealthCheck("node-name-1", "service-name", "passing"),
|
||||
newHealthCheck("node-name-1", "service-name", "warning"),
|
||||
},
|
||||
),
|
||||
newServiceEntry(
|
||||
"node-name-2", "node-address-2", "service-name", "v1.0.0",
|
||||
[]*consul.HealthCheck{
|
||||
newHealthCheck("node-name-2", "service-name", "passing"),
|
||||
newHealthCheck("node-name-2", "service-name", "critical"),
|
||||
},
|
||||
),
|
||||
}
|
||||
|
||||
cr, cl := newConsulTestRegistry(&mockRegistry{
|
||||
status: 200,
|
||||
body: newServiceList(svcs),
|
||||
url: "/v1/health/service/service-name",
|
||||
})
|
||||
defer cl()
|
||||
|
||||
svc, err := cr.GetService("service-name")
|
||||
if err != nil {
|
||||
t.Fatal("Unexpected error", err)
|
||||
}
|
||||
|
||||
if exp, act := 1, len(svc); exp != act {
|
||||
t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act)
|
||||
}
|
||||
|
||||
if exp, act := 1, len(svc[0].Nodes); exp != act {
|
||||
t.Fatalf("Expected len of nodes to be `%d`, got `%d`.", exp, act)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConsul_GetService_WithUnhealthyServiceNodes(t *testing.T) {
|
||||
// warning is still seen as healthy, critical is not
|
||||
svcs := []*consul.ServiceEntry{
|
||||
newServiceEntry(
|
||||
"node-name-1", "node-address-1", "service-name", "v1.0.0",
|
||||
[]*consul.HealthCheck{
|
||||
newHealthCheck("node-name-1", "service-name", "passing"),
|
||||
newHealthCheck("node-name-1", "service-name", "critical"),
|
||||
},
|
||||
),
|
||||
newServiceEntry(
|
||||
"node-name-2", "node-address-2", "service-name", "v1.0.0",
|
||||
[]*consul.HealthCheck{
|
||||
newHealthCheck("node-name-2", "service-name", "passing"),
|
||||
newHealthCheck("node-name-2", "service-name", "critical"),
|
||||
},
|
||||
),
|
||||
}
|
||||
|
||||
cr, cl := newConsulTestRegistry(&mockRegistry{
|
||||
status: 200,
|
||||
body: newServiceList(svcs),
|
||||
url: "/v1/health/service/service-name",
|
||||
})
|
||||
defer cl()
|
||||
|
||||
svc, err := cr.GetService("service-name")
|
||||
if err != nil {
|
||||
t.Fatal("Unexpected error", err)
|
||||
}
|
||||
|
||||
if exp, act := 1, len(svc); exp != act {
|
||||
t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act)
|
||||
}
|
||||
|
||||
if exp, act := 0, len(svc[0].Nodes); exp != act {
|
||||
t.Fatalf("Expected len of nodes to be `%d`, got `%d`.", exp, act)
|
||||
}
|
||||
}
|
299
registry/consul/watcher.go
Normal file
299
registry/consul/watcher.go
Normal file
@@ -0,0 +1,299 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/api/watch"
|
||||
"go-micro.dev/v5/registry"
|
||||
regutil "go-micro.dev/v5/util/registry"
|
||||
)
|
||||
|
||||
type consulWatcher struct {
|
||||
r *consulRegistry
|
||||
wo registry.WatchOptions
|
||||
wp *watch.Plan
|
||||
watchers map[string]*watch.Plan
|
||||
|
||||
next chan *registry.Result
|
||||
exit chan bool
|
||||
|
||||
sync.RWMutex
|
||||
services map[string][]*registry.Service
|
||||
}
|
||||
|
||||
func newConsulWatcher(cr *consulRegistry, opts ...registry.WatchOption) (registry.Watcher, error) {
|
||||
var wo registry.WatchOptions
|
||||
for _, o := range opts {
|
||||
o(&wo)
|
||||
}
|
||||
|
||||
cw := &consulWatcher{
|
||||
r: cr,
|
||||
wo: wo,
|
||||
exit: make(chan bool),
|
||||
next: make(chan *registry.Result, 10),
|
||||
watchers: make(map[string]*watch.Plan),
|
||||
services: make(map[string][]*registry.Service),
|
||||
}
|
||||
|
||||
wp, err := watch.Parse(map[string]interface{}{
|
||||
"service": wo.Service,
|
||||
"type": "service",
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
wp.Handler = cw.serviceHandler
|
||||
go wp.RunWithClientAndHclog(cr.Client(), wp.Logger)
|
||||
cw.wp = wp
|
||||
|
||||
return cw, nil
|
||||
}
|
||||
|
||||
func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) {
|
||||
entries, ok := data.([]*api.ServiceEntry)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
serviceMap := map[string]*registry.Service{}
|
||||
serviceName := ""
|
||||
|
||||
for _, e := range entries {
|
||||
serviceName = e.Service.Service
|
||||
// version is now a tag
|
||||
version, _ := decodeVersion(e.Service.Tags)
|
||||
// service ID is now the node id
|
||||
id := e.Service.ID
|
||||
// key is always the version
|
||||
key := version
|
||||
// address is service address
|
||||
address := e.Service.Address
|
||||
|
||||
// use node address
|
||||
if len(address) == 0 {
|
||||
address = e.Node.Address
|
||||
}
|
||||
|
||||
svc, ok := serviceMap[key]
|
||||
if !ok {
|
||||
svc = ®istry.Service{
|
||||
Endpoints: decodeEndpoints(e.Service.Tags),
|
||||
Name: e.Service.Service,
|
||||
Version: version,
|
||||
}
|
||||
serviceMap[key] = svc
|
||||
}
|
||||
|
||||
var del bool
|
||||
|
||||
for _, check := range e.Checks {
|
||||
// delete the node if the status is critical
|
||||
if check.Status == "critical" {
|
||||
del = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// if delete then skip the node
|
||||
if del {
|
||||
continue
|
||||
}
|
||||
|
||||
svc.Nodes = append(svc.Nodes, ®istry.Node{
|
||||
Id: id,
|
||||
Address: net.JoinHostPort(address, fmt.Sprint(e.Service.Port)),
|
||||
Metadata: decodeMetadata(e.Service.Tags),
|
||||
})
|
||||
}
|
||||
|
||||
cw.RLock()
|
||||
// make a copy
|
||||
rservices := make(map[string][]*registry.Service)
|
||||
for k, v := range cw.services {
|
||||
rservices[k] = v
|
||||
}
|
||||
cw.RUnlock()
|
||||
|
||||
var newServices []*registry.Service
|
||||
|
||||
// serviceMap is the new set of services keyed by name+version
|
||||
for _, newService := range serviceMap {
|
||||
// append to the new set of cached services
|
||||
newServices = append(newServices, newService)
|
||||
|
||||
// check if the service exists in the existing cache
|
||||
oldServices, ok := rservices[serviceName]
|
||||
if !ok {
|
||||
// does not exist? then we're creating brand new entries
|
||||
cw.next <- ®istry.Result{Action: "create", Service: newService}
|
||||
continue
|
||||
}
|
||||
|
||||
// service exists. ok let's figure out what to update and delete version wise
|
||||
action := "create"
|
||||
|
||||
for _, oldService := range oldServices {
|
||||
// does this version exist?
|
||||
// no? then default to create
|
||||
if oldService.Version != newService.Version {
|
||||
continue
|
||||
}
|
||||
|
||||
// yes? then it's an update
|
||||
action = "update"
|
||||
|
||||
var nodes []*registry.Node
|
||||
// check the old nodes to see if they've been deleted
|
||||
for _, oldNode := range oldService.Nodes {
|
||||
var seen bool
|
||||
for _, newNode := range newService.Nodes {
|
||||
if newNode.Id == oldNode.Id {
|
||||
seen = true
|
||||
break
|
||||
}
|
||||
}
|
||||
// does the old node exist in the new set of nodes
|
||||
// no? then delete that shit
|
||||
if !seen {
|
||||
nodes = append(nodes, oldNode)
|
||||
}
|
||||
}
|
||||
|
||||
// it's an update rather than creation
|
||||
if len(nodes) > 0 {
|
||||
delService := regutil.CopyService(oldService)
|
||||
delService.Nodes = nodes
|
||||
cw.next <- ®istry.Result{Action: "delete", Service: delService}
|
||||
}
|
||||
}
|
||||
|
||||
cw.next <- ®istry.Result{Action: action, Service: newService}
|
||||
}
|
||||
|
||||
// Now check old versions that may not be in new services map
|
||||
for _, old := range rservices[serviceName] {
|
||||
// old version does not exist in new version map
|
||||
// kill it with fire!
|
||||
if _, ok := serviceMap[old.Version]; !ok {
|
||||
cw.next <- ®istry.Result{Action: "delete", Service: old}
|
||||
}
|
||||
}
|
||||
|
||||
// there are no services in the service, empty all services
|
||||
if len(rservices) != 0 && serviceName == "" {
|
||||
for _, services := range rservices {
|
||||
for _, service := range services {
|
||||
cw.next <- ®istry.Result{Action: "delete", Service: service}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cw.Lock()
|
||||
cw.services[serviceName] = newServices
|
||||
cw.Unlock()
|
||||
}
|
||||
|
||||
func (cw *consulWatcher) handle(idx uint64, data interface{}) {
|
||||
services, ok := data.(map[string][]string)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// add new watchers
|
||||
for service := range services {
|
||||
// Filter on watch options
|
||||
// wo.Service: Only watch services we care about
|
||||
if len(cw.wo.Service) > 0 && service != cw.wo.Service {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, ok := cw.watchers[service]; ok {
|
||||
continue
|
||||
}
|
||||
wp, err := watch.Parse(map[string]interface{}{
|
||||
"type": "service",
|
||||
"service": service,
|
||||
})
|
||||
if err == nil {
|
||||
wp.Handler = cw.serviceHandler
|
||||
go wp.RunWithClientAndHclog(cw.r.Client(), wp.Logger)
|
||||
cw.watchers[service] = wp
|
||||
cw.next <- ®istry.Result{Action: "create", Service: ®istry.Service{Name: service}}
|
||||
}
|
||||
}
|
||||
|
||||
cw.RLock()
|
||||
// make a copy
|
||||
rservices := make(map[string][]*registry.Service)
|
||||
for k, v := range cw.services {
|
||||
rservices[k] = v
|
||||
}
|
||||
cw.RUnlock()
|
||||
|
||||
// remove unknown services from registry
|
||||
// save the things we want to delete
|
||||
deleted := make(map[string][]*registry.Service)
|
||||
|
||||
for service := range rservices {
|
||||
if _, ok := services[service]; !ok {
|
||||
cw.Lock()
|
||||
// save this before deleting
|
||||
deleted[service] = cw.services[service]
|
||||
delete(cw.services, service)
|
||||
cw.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// remove unknown services from watchers
|
||||
for service, w := range cw.watchers {
|
||||
if _, ok := services[service]; !ok {
|
||||
w.Stop()
|
||||
delete(cw.watchers, service)
|
||||
for _, oldService := range deleted[service] {
|
||||
// send a delete for the service nodes that we're removing
|
||||
cw.next <- ®istry.Result{Action: "delete", Service: oldService}
|
||||
}
|
||||
// sent the empty list as the last resort to indicate to delete the entire service
|
||||
cw.next <- ®istry.Result{Action: "delete", Service: ®istry.Service{Name: service}}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (cw *consulWatcher) Next() (*registry.Result, error) {
|
||||
select {
|
||||
case <-cw.exit:
|
||||
return nil, registry.ErrWatcherStopped
|
||||
case r, ok := <-cw.next:
|
||||
if !ok {
|
||||
return nil, registry.ErrWatcherStopped
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (cw *consulWatcher) Stop() {
|
||||
select {
|
||||
case <-cw.exit:
|
||||
return
|
||||
default:
|
||||
close(cw.exit)
|
||||
if cw.wp == nil {
|
||||
return
|
||||
}
|
||||
cw.wp.Stop()
|
||||
|
||||
// drain results
|
||||
for {
|
||||
select {
|
||||
case <-cw.next:
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
86
registry/consul/watcher_test.go
Normal file
86
registry/consul/watcher_test.go
Normal file
@@ -0,0 +1,86 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"go-micro.dev/v5/registry"
|
||||
)
|
||||
|
||||
func TestHealthyServiceHandler(t *testing.T) {
|
||||
watcher := newWatcher()
|
||||
serviceEntry := newServiceEntry(
|
||||
"node-name", "node-address", "service-name", "v1.0.0",
|
||||
[]*api.HealthCheck{
|
||||
newHealthCheck("node-name", "service-name", "passing"),
|
||||
},
|
||||
)
|
||||
|
||||
watcher.serviceHandler(1234, []*api.ServiceEntry{serviceEntry})
|
||||
|
||||
if len(watcher.services["service-name"][0].Nodes) != 1 {
|
||||
t.Errorf("Expected length of the service nodes to be 1")
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnhealthyServiceHandler(t *testing.T) {
|
||||
watcher := newWatcher()
|
||||
serviceEntry := newServiceEntry(
|
||||
"node-name", "node-address", "service-name", "v1.0.0",
|
||||
[]*api.HealthCheck{
|
||||
newHealthCheck("node-name", "service-name", "critical"),
|
||||
},
|
||||
)
|
||||
|
||||
watcher.serviceHandler(1234, []*api.ServiceEntry{serviceEntry})
|
||||
|
||||
if len(watcher.services["service-name"][0].Nodes) != 0 {
|
||||
t.Errorf("Expected length of the service nodes to be 0")
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnhealthyNodeServiceHandler(t *testing.T) {
|
||||
watcher := newWatcher()
|
||||
serviceEntry := newServiceEntry(
|
||||
"node-name", "node-address", "service-name", "v1.0.0",
|
||||
[]*api.HealthCheck{
|
||||
newHealthCheck("node-name", "service-name", "passing"),
|
||||
newHealthCheck("node-name", "serfHealth", "critical"),
|
||||
},
|
||||
)
|
||||
|
||||
watcher.serviceHandler(1234, []*api.ServiceEntry{serviceEntry})
|
||||
|
||||
if len(watcher.services["service-name"][0].Nodes) != 0 {
|
||||
t.Errorf("Expected length of the service nodes to be 0")
|
||||
}
|
||||
}
|
||||
|
||||
func newWatcher() *consulWatcher {
|
||||
return &consulWatcher{
|
||||
exit: make(chan bool),
|
||||
next: make(chan *registry.Result, 10),
|
||||
services: make(map[string][]*registry.Service),
|
||||
}
|
||||
}
|
||||
|
||||
func newHealthCheck(node, name, status string) *api.HealthCheck {
|
||||
return &api.HealthCheck{
|
||||
Node: node,
|
||||
Name: name,
|
||||
Status: status,
|
||||
ServiceName: name,
|
||||
}
|
||||
}
|
||||
|
||||
func newServiceEntry(node, address, name, version string, checks []*api.HealthCheck) *api.ServiceEntry {
|
||||
return &api.ServiceEntry{
|
||||
Node: &api.Node{Node: node, Address: name},
|
||||
Service: &api.AgentService{
|
||||
Service: name,
|
||||
Address: address,
|
||||
Tags: encodeVersion(version),
|
||||
},
|
||||
Checks: checks,
|
||||
}
|
||||
}
|
418
registry/etcd/etcd.go
Normal file
418
registry/etcd/etcd.go
Normal file
@@ -0,0 +1,418 @@
|
||||
// Package etcd provides an etcd service registry
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net"
|
||||
"os"
|
||||
"path"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
hash "github.com/mitchellh/hashstructure"
|
||||
"go-micro.dev/v5/logger"
|
||||
"go-micro.dev/v5/registry"
|
||||
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var (
|
||||
prefix = "/micro/registry/"
|
||||
)
|
||||
|
||||
type etcdRegistry struct {
|
||||
client *clientv3.Client
|
||||
options registry.Options
|
||||
|
||||
sync.RWMutex
|
||||
register map[string]uint64
|
||||
leases map[string]clientv3.LeaseID
|
||||
}
|
||||
|
||||
func NewEtcdRegistry(opts ...registry.Option) registry.Registry {
|
||||
e := &etcdRegistry{
|
||||
options: registry.Options{},
|
||||
register: make(map[string]uint64),
|
||||
leases: make(map[string]clientv3.LeaseID),
|
||||
}
|
||||
username, password := os.Getenv("ETCD_USERNAME"), os.Getenv("ETCD_PASSWORD")
|
||||
if len(username) > 0 && len(password) > 0 {
|
||||
opts = append(opts, Auth(username, password))
|
||||
}
|
||||
address := os.Getenv("MICRO_REGISTRY_ADDRESS")
|
||||
if len(address) > 0 {
|
||||
opts = append(opts, registry.Addrs(address))
|
||||
}
|
||||
configure(e, opts...)
|
||||
return e
|
||||
}
|
||||
|
||||
func configure(e *etcdRegistry, opts ...registry.Option) error {
|
||||
config := clientv3.Config{
|
||||
Endpoints: []string{"127.0.0.1:2379"},
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&e.options)
|
||||
}
|
||||
|
||||
if e.options.Timeout == 0 {
|
||||
e.options.Timeout = 5 * time.Second
|
||||
}
|
||||
|
||||
if e.options.Logger == nil {
|
||||
e.options.Logger = logger.DefaultLogger
|
||||
}
|
||||
|
||||
config.DialTimeout = e.options.Timeout
|
||||
|
||||
if e.options.Secure || e.options.TLSConfig != nil {
|
||||
tlsConfig := e.options.TLSConfig
|
||||
if tlsConfig == nil {
|
||||
tlsConfig = &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
}
|
||||
}
|
||||
|
||||
config.TLS = tlsConfig
|
||||
}
|
||||
|
||||
if e.options.Context != nil {
|
||||
u, ok := e.options.Context.Value(authKey{}).(*authCreds)
|
||||
if ok {
|
||||
config.Username = u.Username
|
||||
config.Password = u.Password
|
||||
}
|
||||
cfg, ok := e.options.Context.Value(logConfigKey{}).(*zap.Config)
|
||||
if ok && cfg != nil {
|
||||
config.LogConfig = cfg
|
||||
}
|
||||
}
|
||||
|
||||
var cAddrs []string
|
||||
|
||||
for _, address := range e.options.Addrs {
|
||||
if len(address) == 0 {
|
||||
continue
|
||||
}
|
||||
addr, port, err := net.SplitHostPort(address)
|
||||
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
|
||||
port = "2379"
|
||||
addr = address
|
||||
cAddrs = append(cAddrs, net.JoinHostPort(addr, port))
|
||||
} else if err == nil {
|
||||
cAddrs = append(cAddrs, net.JoinHostPort(addr, port))
|
||||
}
|
||||
}
|
||||
|
||||
// if we got addrs then we'll update
|
||||
if len(cAddrs) > 0 {
|
||||
config.Endpoints = cAddrs
|
||||
}
|
||||
|
||||
cli, err := clientv3.New(config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.client = cli
|
||||
return nil
|
||||
}
|
||||
|
||||
func encode(s *registry.Service) string {
|
||||
b, _ := json.Marshal(s)
|
||||
return string(b)
|
||||
}
|
||||
|
||||
func decode(ds []byte) *registry.Service {
|
||||
var s *registry.Service
|
||||
json.Unmarshal(ds, &s)
|
||||
return s
|
||||
}
|
||||
|
||||
func nodePath(s, id string) string {
|
||||
service := strings.Replace(s, "/", "-", -1)
|
||||
node := strings.Replace(id, "/", "-", -1)
|
||||
return path.Join(prefix, service, node)
|
||||
}
|
||||
|
||||
func servicePath(s string) string {
|
||||
return path.Join(prefix, strings.Replace(s, "/", "-", -1))
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) Init(opts ...registry.Option) error {
|
||||
return configure(e, opts...)
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) Options() registry.Options {
|
||||
return e.options
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, opts ...registry.RegisterOption) error {
|
||||
if len(s.Nodes) == 0 {
|
||||
return errors.New("Require at least one node")
|
||||
}
|
||||
|
||||
// check existing lease cache
|
||||
e.RLock()
|
||||
leaseID, ok := e.leases[s.Name+node.Id]
|
||||
e.RUnlock()
|
||||
|
||||
log := e.options.Logger
|
||||
|
||||
if !ok {
|
||||
// missing lease, check if the key exists
|
||||
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
|
||||
defer cancel()
|
||||
|
||||
// look for the existing key
|
||||
rsp, err := e.client.Get(ctx, nodePath(s.Name, node.Id), clientv3.WithSerializable())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// get the existing lease
|
||||
for _, kv := range rsp.Kvs {
|
||||
if kv.Lease > 0 {
|
||||
leaseID = clientv3.LeaseID(kv.Lease)
|
||||
|
||||
// decode the existing node
|
||||
srv := decode(kv.Value)
|
||||
if srv == nil || len(srv.Nodes) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// create hash of service; uint64
|
||||
h, err := hash.Hash(srv.Nodes[0], nil)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// save the info
|
||||
e.Lock()
|
||||
e.leases[s.Name+node.Id] = leaseID
|
||||
e.register[s.Name+node.Id] = h
|
||||
e.Unlock()
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var leaseNotFound bool
|
||||
|
||||
// renew the lease if it exists
|
||||
if leaseID > 0 {
|
||||
log.Logf(logger.TraceLevel, "Renewing existing lease for %s %d", s.Name, leaseID)
|
||||
if _, err := e.client.KeepAliveOnce(context.TODO(), leaseID); err != nil {
|
||||
if err != rpctypes.ErrLeaseNotFound {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Logf(logger.TraceLevel, "Lease not found for %s %d", s.Name, leaseID)
|
||||
// lease not found do register
|
||||
leaseNotFound = true
|
||||
}
|
||||
}
|
||||
|
||||
// create hash of service; uint64
|
||||
h, err := hash.Hash(node, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// get existing hash for the service node
|
||||
e.Lock()
|
||||
v, ok := e.register[s.Name+node.Id]
|
||||
e.Unlock()
|
||||
|
||||
// the service is unchanged, skip registering
|
||||
if ok && v == h && !leaseNotFound {
|
||||
log.Logf(logger.TraceLevel, "Service %s node %s unchanged skipping registration", s.Name, node.Id)
|
||||
return nil
|
||||
}
|
||||
|
||||
service := ®istry.Service{
|
||||
Name: s.Name,
|
||||
Version: s.Version,
|
||||
Metadata: s.Metadata,
|
||||
Endpoints: s.Endpoints,
|
||||
Nodes: []*registry.Node{node},
|
||||
}
|
||||
|
||||
var options registry.RegisterOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
|
||||
defer cancel()
|
||||
|
||||
var lgr *clientv3.LeaseGrantResponse
|
||||
if options.TTL.Seconds() > 0 {
|
||||
// get a lease used to expire keys since we have a ttl
|
||||
lgr, err = e.client.Grant(ctx, int64(options.TTL.Seconds()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
log.Logf(logger.TraceLevel, "Registering %s id %s with lease %v and leaseID %v and ttl %v", service.Name, node.Id, lgr, lgr.ID, options.TTL)
|
||||
// create an entry for the node
|
||||
if lgr != nil {
|
||||
_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service), clientv3.WithLease(lgr.ID))
|
||||
} else {
|
||||
_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service))
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e.Lock()
|
||||
// save our hash of the service
|
||||
e.register[s.Name+node.Id] = h
|
||||
// save our leaseID of the service
|
||||
if lgr != nil {
|
||||
e.leases[s.Name+node.Id] = lgr.ID
|
||||
}
|
||||
e.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) Deregister(s *registry.Service, opts ...registry.DeregisterOption) error {
|
||||
if len(s.Nodes) == 0 {
|
||||
return errors.New("Require at least one node")
|
||||
}
|
||||
|
||||
for _, node := range s.Nodes {
|
||||
e.Lock()
|
||||
// delete our hash of the service
|
||||
delete(e.register, s.Name+node.Id)
|
||||
// delete our lease of the service
|
||||
delete(e.leases, s.Name+node.Id)
|
||||
e.Unlock()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
|
||||
defer cancel()
|
||||
|
||||
e.options.Logger.Logf(logger.TraceLevel, "Deregistering %s id %s", s.Name, node.Id)
|
||||
_, err := e.client.Delete(ctx, nodePath(s.Name, node.Id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
|
||||
if len(s.Nodes) == 0 {
|
||||
return errors.New("Require at least one node")
|
||||
}
|
||||
|
||||
var gerr error
|
||||
|
||||
// register each node individually
|
||||
for _, node := range s.Nodes {
|
||||
err := e.registerNode(s, node, opts...)
|
||||
if err != nil {
|
||||
gerr = err
|
||||
}
|
||||
}
|
||||
|
||||
return gerr
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) GetService(name string, opts ...registry.GetOption) ([]*registry.Service, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
|
||||
defer cancel()
|
||||
|
||||
rsp, err := e.client.Get(ctx, servicePath(name)+"/", clientv3.WithPrefix(), clientv3.WithSerializable())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(rsp.Kvs) == 0 {
|
||||
return nil, registry.ErrNotFound
|
||||
}
|
||||
|
||||
serviceMap := map[string]*registry.Service{}
|
||||
|
||||
for _, n := range rsp.Kvs {
|
||||
if sn := decode(n.Value); sn != nil {
|
||||
s, ok := serviceMap[sn.Version]
|
||||
if !ok {
|
||||
s = ®istry.Service{
|
||||
Name: sn.Name,
|
||||
Version: sn.Version,
|
||||
Metadata: sn.Metadata,
|
||||
Endpoints: sn.Endpoints,
|
||||
}
|
||||
serviceMap[s.Version] = s
|
||||
}
|
||||
|
||||
s.Nodes = append(s.Nodes, sn.Nodes...)
|
||||
}
|
||||
}
|
||||
|
||||
services := make([]*registry.Service, 0, len(serviceMap))
|
||||
for _, service := range serviceMap {
|
||||
services = append(services, service)
|
||||
}
|
||||
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Service, error) {
|
||||
versions := make(map[string]*registry.Service)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
|
||||
defer cancel()
|
||||
|
||||
rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(rsp.Kvs) == 0 {
|
||||
return []*registry.Service{}, nil
|
||||
}
|
||||
|
||||
for _, n := range rsp.Kvs {
|
||||
sn := decode(n.Value)
|
||||
if sn == nil {
|
||||
continue
|
||||
}
|
||||
v, ok := versions[sn.Name+sn.Version]
|
||||
if !ok {
|
||||
versions[sn.Name+sn.Version] = sn
|
||||
continue
|
||||
}
|
||||
// append to service:version nodes
|
||||
v.Nodes = append(v.Nodes, sn.Nodes...)
|
||||
}
|
||||
|
||||
services := make([]*registry.Service, 0, len(versions))
|
||||
for _, service := range versions {
|
||||
services = append(services, service)
|
||||
}
|
||||
|
||||
// sort the services
|
||||
sort.Slice(services, func(i, j int) bool { return services[i].Name < services[j].Name })
|
||||
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
|
||||
return newEtcdWatcher(e, e.options.Timeout, opts...)
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) String() string {
|
||||
return "etcd"
|
||||
}
|
37
registry/etcd/options.go
Normal file
37
registry/etcd/options.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go-micro.dev/v5/registry"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type authKey struct{}
|
||||
|
||||
type logConfigKey struct{}
|
||||
|
||||
type authCreds struct {
|
||||
Username string
|
||||
Password string
|
||||
}
|
||||
|
||||
// Auth allows you to specify username/password.
|
||||
func Auth(username, password string) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, authKey{}, &authCreds{Username: username, Password: password})
|
||||
}
|
||||
}
|
||||
|
||||
// LogConfig allows you to set etcd log config.
|
||||
func LogConfig(config *zap.Config) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, logConfigKey{}, config)
|
||||
}
|
||||
}
|
91
registry/etcd/watcher.go
Normal file
91
registry/etcd/watcher.go
Normal file
@@ -0,0 +1,91 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"go-micro.dev/v5/registry"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
||||
type etcdWatcher struct {
|
||||
stop chan bool
|
||||
w clientv3.WatchChan
|
||||
client *clientv3.Client
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
func newEtcdWatcher(r *etcdRegistry, timeout time.Duration, opts ...registry.WatchOption) (registry.Watcher, error) {
|
||||
var wo registry.WatchOptions
|
||||
for _, o := range opts {
|
||||
o(&wo)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
stop := make(chan bool, 1)
|
||||
|
||||
go func() {
|
||||
<-stop
|
||||
cancel()
|
||||
}()
|
||||
|
||||
watchPath := prefix
|
||||
if len(wo.Service) > 0 {
|
||||
watchPath = servicePath(wo.Service) + "/"
|
||||
}
|
||||
|
||||
return &etcdWatcher{
|
||||
stop: stop,
|
||||
w: r.client.Watch(ctx, watchPath, clientv3.WithPrefix(), clientv3.WithPrevKV()),
|
||||
client: r.client,
|
||||
timeout: timeout,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (ew *etcdWatcher) Next() (*registry.Result, error) {
|
||||
for wresp := range ew.w {
|
||||
if wresp.Err() != nil {
|
||||
return nil, wresp.Err()
|
||||
}
|
||||
if wresp.Canceled {
|
||||
return nil, errors.New("could not get next")
|
||||
}
|
||||
for _, ev := range wresp.Events {
|
||||
service := decode(ev.Kv.Value)
|
||||
var action string
|
||||
|
||||
switch ev.Type {
|
||||
case clientv3.EventTypePut:
|
||||
if ev.IsCreate() {
|
||||
action = "create"
|
||||
} else if ev.IsModify() {
|
||||
action = "update"
|
||||
}
|
||||
case clientv3.EventTypeDelete:
|
||||
action = "delete"
|
||||
|
||||
// get service from prevKv
|
||||
service = decode(ev.PrevKv.Value)
|
||||
}
|
||||
|
||||
if service == nil {
|
||||
continue
|
||||
}
|
||||
return ®istry.Result{
|
||||
Action: action,
|
||||
Service: service,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
return nil, errors.New("could not get next")
|
||||
}
|
||||
|
||||
func (ew *etcdWatcher) Stop() {
|
||||
select {
|
||||
case <-ew.stop:
|
||||
return
|
||||
default:
|
||||
close(ew.stop)
|
||||
}
|
||||
}
|
5
registry/mdns/mdns.go
Normal file
5
registry/mdns/mdns.go
Normal file
@@ -0,0 +1,5 @@
|
||||
package mdns
|
||||
|
||||
var (
|
||||
DefaultRegistry = NewMDNSRegistry()
|
||||
)
|
@@ -1,5 +1,5 @@
|
||||
// Package mdns is a multicast dns registry
|
||||
package registry
|
||||
package mdns
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
|
||||
"github.com/google/uuid"
|
||||
log "go-micro.dev/v5/logger"
|
||||
"go-micro.dev/v5/registry"
|
||||
"go-micro.dev/v5/util/mdns"
|
||||
)
|
||||
|
||||
@@ -29,7 +30,7 @@ type mdnsTxt struct {
|
||||
Metadata map[string]string
|
||||
Service string
|
||||
Version string
|
||||
Endpoints []*Endpoint
|
||||
Endpoints []*registry.Endpoint
|
||||
}
|
||||
|
||||
type mdnsEntry struct {
|
||||
@@ -38,7 +39,7 @@ type mdnsEntry struct {
|
||||
}
|
||||
|
||||
type mdnsRegistry struct {
|
||||
opts *Options
|
||||
opts *registry.Options
|
||||
services map[string][]*mdnsEntry
|
||||
|
||||
// watchers
|
||||
@@ -55,7 +56,7 @@ type mdnsRegistry struct {
|
||||
}
|
||||
|
||||
type mdnsWatcher struct {
|
||||
wo WatchOptions
|
||||
wo registry.WatchOptions
|
||||
ch chan *mdns.ServiceEntry
|
||||
exit chan struct{}
|
||||
// the registry
|
||||
@@ -127,9 +128,9 @@ func decode(record []string) (*mdnsTxt, error) {
|
||||
|
||||
return txt, nil
|
||||
}
|
||||
func newRegistry(opts ...Option) Registry {
|
||||
mergedOpts := append([]Option{Timeout(time.Millisecond * 100)}, opts...)
|
||||
options := NewOptions(mergedOpts...)
|
||||
func newRegistry(opts ...registry.Option) registry.Registry {
|
||||
mergedOpts := append([]registry.Option{registry.Timeout(time.Millisecond * 100)}, opts...)
|
||||
options := registry.NewOptions(mergedOpts...)
|
||||
|
||||
// set the domain
|
||||
domain := mdnsDomain
|
||||
@@ -147,18 +148,18 @@ func newRegistry(opts ...Option) Registry {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Init(opts ...Option) error {
|
||||
func (m *mdnsRegistry) Init(opts ...registry.Option) error {
|
||||
for _, o := range opts {
|
||||
o(m.opts)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Options() Options {
|
||||
func (m *mdnsRegistry) Options() registry.Options {
|
||||
return *m.opts
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Register(service *Service, opts ...RegisterOption) error {
|
||||
func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.RegisterOption) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
@@ -263,7 +264,7 @@ func (m *mdnsRegistry) Register(service *Service, opts ...RegisterOption) error
|
||||
return gerr
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Deregister(service *Service, opts ...DeregisterOption) error {
|
||||
func (m *mdnsRegistry) Deregister(service *registry.Service, opts ...registry.DeregisterOption) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
@@ -298,9 +299,9 @@ func (m *mdnsRegistry) Deregister(service *Service, opts ...DeregisterOption) er
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) GetService(service string, opts ...GetOption) ([]*Service, error) {
|
||||
func (m *mdnsRegistry) GetService(service string, opts ...registry.GetOption) ([]*registry.Service, error) {
|
||||
logger := m.opts.Logger
|
||||
serviceMap := make(map[string]*Service)
|
||||
serviceMap := make(map[string]*registry.Service)
|
||||
entries := make(chan *mdns.ServiceEntry, 10)
|
||||
done := make(chan bool)
|
||||
|
||||
@@ -340,7 +341,7 @@ func (m *mdnsRegistry) GetService(service string, opts ...GetOption) ([]*Service
|
||||
|
||||
s, ok := serviceMap[txt.Version]
|
||||
if !ok {
|
||||
s = &Service{
|
||||
s = ®istry.Service{
|
||||
Name: txt.Service,
|
||||
Version: txt.Version,
|
||||
Endpoints: txt.Endpoints,
|
||||
@@ -357,7 +358,7 @@ func (m *mdnsRegistry) GetService(service string, opts ...GetOption) ([]*Service
|
||||
logger.Logf(log.InfoLevel, "[mdns]: invalid endpoint received: %v", e)
|
||||
continue
|
||||
}
|
||||
s.Nodes = append(s.Nodes, &Node{
|
||||
s.Nodes = append(s.Nodes, ®istry.Node{
|
||||
Id: strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+"."),
|
||||
Address: addr,
|
||||
Metadata: txt.Metadata,
|
||||
@@ -380,7 +381,7 @@ func (m *mdnsRegistry) GetService(service string, opts ...GetOption) ([]*Service
|
||||
<-done
|
||||
|
||||
// create list and return
|
||||
services := make([]*Service, 0, len(serviceMap))
|
||||
services := make([]*registry.Service, 0, len(serviceMap))
|
||||
|
||||
for _, service := range serviceMap {
|
||||
services = append(services, service)
|
||||
@@ -389,7 +390,7 @@ func (m *mdnsRegistry) GetService(service string, opts ...GetOption) ([]*Service
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) ListServices(opts ...ListOption) ([]*Service, error) {
|
||||
func (m *mdnsRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Service, error) {
|
||||
serviceMap := make(map[string]bool)
|
||||
entries := make(chan *mdns.ServiceEntry, 10)
|
||||
done := make(chan bool)
|
||||
@@ -404,7 +405,7 @@ func (m *mdnsRegistry) ListServices(opts ...ListOption) ([]*Service, error) {
|
||||
// set domain
|
||||
p.Domain = m.domain
|
||||
|
||||
var services []*Service
|
||||
var services []*registry.Service
|
||||
|
||||
go func() {
|
||||
for {
|
||||
@@ -419,7 +420,7 @@ func (m *mdnsRegistry) ListServices(opts ...ListOption) ([]*Service, error) {
|
||||
name := strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+".")
|
||||
if !serviceMap[name] {
|
||||
serviceMap[name] = true
|
||||
services = append(services, &Service{Name: name})
|
||||
services = append(services, ®istry.Service{Name: name})
|
||||
}
|
||||
case <-p.Context.Done():
|
||||
close(done)
|
||||
@@ -439,8 +440,8 @@ func (m *mdnsRegistry) ListServices(opts ...ListOption) ([]*Service, error) {
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Watch(opts ...WatchOption) (Watcher, error) {
|
||||
var wo WatchOptions
|
||||
func (m *mdnsRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
|
||||
var wo registry.WatchOptions
|
||||
for _, o := range opts {
|
||||
o(&wo)
|
||||
}
|
||||
@@ -537,7 +538,7 @@ func (m *mdnsRegistry) String() string {
|
||||
return "mdns"
|
||||
}
|
||||
|
||||
func (m *mdnsWatcher) Next() (*Result, error) {
|
||||
func (m *mdnsWatcher) Next() (*registry.Result, error) {
|
||||
for {
|
||||
select {
|
||||
case e := <-m.ch:
|
||||
@@ -562,7 +563,7 @@ func (m *mdnsWatcher) Next() (*Result, error) {
|
||||
action = "create"
|
||||
}
|
||||
|
||||
service := &Service{
|
||||
service := ®istry.Service{
|
||||
Name: txt.Service,
|
||||
Version: txt.Version,
|
||||
Endpoints: txt.Endpoints,
|
||||
@@ -583,18 +584,18 @@ func (m *mdnsWatcher) Next() (*Result, error) {
|
||||
addr = e.Addr.String()
|
||||
}
|
||||
|
||||
service.Nodes = append(service.Nodes, &Node{
|
||||
service.Nodes = append(service.Nodes, ®istry.Node{
|
||||
Id: strings.TrimSuffix(e.Name, suffix),
|
||||
Address: addr,
|
||||
Metadata: txt.Metadata,
|
||||
})
|
||||
|
||||
return &Result{
|
||||
return ®istry.Result{
|
||||
Action: action,
|
||||
Service: service,
|
||||
}, nil
|
||||
case <-m.exit:
|
||||
return nil, ErrWatcherStopped
|
||||
return nil, registry.ErrWatcherStopped
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -613,6 +614,6 @@ func (m *mdnsWatcher) Stop() {
|
||||
}
|
||||
|
||||
// NewRegistry returns a new default registry which is mdns.
|
||||
func NewRegistry(opts ...Option) Registry {
|
||||
func NewMDNSRegistry(opts ...registry.Option) registry.Registry {
|
||||
return newRegistry(opts...)
|
||||
}
|
@@ -1,9 +1,11 @@
|
||||
package registry
|
||||
package mdns
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go-micro.dev/v5/registry"
|
||||
)
|
||||
|
||||
func TestMDNS(t *testing.T) {
|
||||
@@ -12,11 +14,11 @@ func TestMDNS(t *testing.T) {
|
||||
t.Skip()
|
||||
}
|
||||
|
||||
testData := []*Service{
|
||||
testData := []*registry.Service{
|
||||
{
|
||||
Name: "test1",
|
||||
Version: "1.0.1",
|
||||
Nodes: []*Node{
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "test1-1",
|
||||
Address: "10.0.0.1:10001",
|
||||
@@ -29,7 +31,7 @@ func TestMDNS(t *testing.T) {
|
||||
{
|
||||
Name: "test2",
|
||||
Version: "1.0.2",
|
||||
Nodes: []*Node{
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "test2-1",
|
||||
Address: "10.0.0.2:10002",
|
||||
@@ -42,7 +44,7 @@ func TestMDNS(t *testing.T) {
|
||||
{
|
||||
Name: "test3",
|
||||
Version: "1.0.3",
|
||||
Nodes: []*Node{
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "test3-1",
|
||||
Address: "10.0.0.3:10003",
|
||||
@@ -55,7 +57,7 @@ func TestMDNS(t *testing.T) {
|
||||
{
|
||||
Name: "test4",
|
||||
Version: "1.0.4",
|
||||
Nodes: []*Node{
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "test4-1",
|
||||
Address: "[::]:10004",
|
||||
@@ -69,14 +71,14 @@ func TestMDNS(t *testing.T) {
|
||||
|
||||
travis := os.Getenv("TRAVIS")
|
||||
|
||||
var opts []Option
|
||||
var opts []registry.Option
|
||||
|
||||
if travis == "true" {
|
||||
opts = append(opts, Timeout(time.Millisecond*100))
|
||||
opts = append(opts, registry.Timeout(time.Millisecond*100))
|
||||
}
|
||||
|
||||
// new registry
|
||||
r := NewRegistry(opts...)
|
||||
r := NewMDNSRegistry(opts...)
|
||||
|
||||
for _, service := range testData {
|
||||
// register service
|
||||
@@ -156,14 +158,14 @@ func TestEncoding(t *testing.T) {
|
||||
Metadata: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
Endpoints: []*Endpoint{
|
||||
Endpoints: []*registry.Endpoint{
|
||||
{
|
||||
Name: "endpoint1",
|
||||
Request: &Value{
|
||||
Request: ®istry.Value{
|
||||
Name: "request",
|
||||
Type: "request",
|
||||
},
|
||||
Response: &Value{
|
||||
Response: ®istry.Value{
|
||||
Name: "response",
|
||||
Type: "response",
|
||||
},
|
||||
@@ -213,11 +215,11 @@ func TestWatcher(t *testing.T) {
|
||||
t.Skip()
|
||||
}
|
||||
|
||||
testData := []*Service{
|
||||
testData := []*registry.Service{
|
||||
{
|
||||
Name: "test1",
|
||||
Version: "1.0.1",
|
||||
Nodes: []*Node{
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "test1-1",
|
||||
Address: "10.0.0.1:10001",
|
||||
@@ -230,7 +232,7 @@ func TestWatcher(t *testing.T) {
|
||||
{
|
||||
Name: "test2",
|
||||
Version: "1.0.2",
|
||||
Nodes: []*Node{
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "test2-1",
|
||||
Address: "10.0.0.2:10002",
|
||||
@@ -243,7 +245,7 @@ func TestWatcher(t *testing.T) {
|
||||
{
|
||||
Name: "test3",
|
||||
Version: "1.0.3",
|
||||
Nodes: []*Node{
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "test3-1",
|
||||
Address: "10.0.0.3:10003",
|
||||
@@ -256,7 +258,7 @@ func TestWatcher(t *testing.T) {
|
||||
{
|
||||
Name: "test4",
|
||||
Version: "1.0.4",
|
||||
Nodes: []*Node{
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "test4-1",
|
||||
Address: "[::]:10004",
|
||||
@@ -268,7 +270,7 @@ func TestWatcher(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
testFn := func(service, s *Service) {
|
||||
testFn := func(service, s *registry.Service) {
|
||||
if s == nil {
|
||||
t.Fatalf("Expected one result for %s got nil", service.Name)
|
||||
}
|
||||
@@ -298,14 +300,14 @@ func TestWatcher(t *testing.T) {
|
||||
|
||||
travis := os.Getenv("TRAVIS")
|
||||
|
||||
var opts []Option
|
||||
var opts []registry.Option
|
||||
|
||||
if travis == "true" {
|
||||
opts = append(opts, Timeout(time.Millisecond*100))
|
||||
opts = append(opts, registry.Timeout(time.Millisecond*100))
|
||||
}
|
||||
|
||||
// new registry
|
||||
r := NewRegistry(opts...)
|
||||
r := NewMDNSRegistry(opts...)
|
||||
|
||||
w, err := r.Watch()
|
||||
if err != nil {
|
417
registry/nats/nats.go
Normal file
417
registry/nats/nats.go
Normal file
@@ -0,0 +1,417 @@
|
||||
// Package nats provides a NATS registry using broadcast queries
|
||||
package nats
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
"go-micro.dev/v5/registry"
|
||||
)
|
||||
|
||||
type natsRegistry struct {
|
||||
addrs []string
|
||||
opts registry.Options
|
||||
nopts nats.Options
|
||||
queryTopic string
|
||||
watchTopic string
|
||||
registerAction string
|
||||
|
||||
sync.RWMutex
|
||||
conn *nats.Conn
|
||||
services map[string][]*registry.Service
|
||||
listeners map[string]chan bool
|
||||
}
|
||||
|
||||
var (
|
||||
defaultQueryTopic = "micro.nats.query"
|
||||
defaultWatchTopic = "micro.nats.watch"
|
||||
defaultRegisterAction = "create"
|
||||
)
|
||||
|
||||
func configure(n *natsRegistry, opts ...registry.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&n.opts)
|
||||
}
|
||||
|
||||
natsOptions := nats.GetDefaultOptions()
|
||||
if n, ok := n.opts.Context.Value(optionsKey{}).(nats.Options); ok {
|
||||
natsOptions = n
|
||||
}
|
||||
|
||||
queryTopic := defaultQueryTopic
|
||||
if qt, ok := n.opts.Context.Value(queryTopicKey{}).(string); ok {
|
||||
queryTopic = qt
|
||||
}
|
||||
|
||||
watchTopic := defaultWatchTopic
|
||||
if wt, ok := n.opts.Context.Value(watchTopicKey{}).(string); ok {
|
||||
watchTopic = wt
|
||||
}
|
||||
|
||||
registerAction := defaultRegisterAction
|
||||
if ra, ok := n.opts.Context.Value(registerActionKey{}).(string); ok {
|
||||
registerAction = ra
|
||||
}
|
||||
|
||||
// Options have higher priority than nats.Options
|
||||
// only if Addrs, Secure or TLSConfig were not set through a Option
|
||||
// we read them from nats.Option
|
||||
if len(n.opts.Addrs) == 0 {
|
||||
n.opts.Addrs = natsOptions.Servers
|
||||
}
|
||||
|
||||
if !n.opts.Secure {
|
||||
n.opts.Secure = natsOptions.Secure
|
||||
}
|
||||
|
||||
if n.opts.TLSConfig == nil {
|
||||
n.opts.TLSConfig = natsOptions.TLSConfig
|
||||
}
|
||||
|
||||
// check & add nats:// prefix (this makes also sure that the addresses
|
||||
// stored in natsaddrs and n.opts.Addrs are identical)
|
||||
n.opts.Addrs = setAddrs(n.opts.Addrs)
|
||||
|
||||
n.addrs = n.opts.Addrs
|
||||
n.nopts = natsOptions
|
||||
n.queryTopic = queryTopic
|
||||
n.watchTopic = watchTopic
|
||||
n.registerAction = registerAction
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func setAddrs(addrs []string) []string {
|
||||
var cAddrs []string
|
||||
for _, addr := range addrs {
|
||||
if len(addr) == 0 {
|
||||
continue
|
||||
}
|
||||
if !strings.HasPrefix(addr, "nats://") {
|
||||
addr = "nats://" + addr
|
||||
}
|
||||
cAddrs = append(cAddrs, addr)
|
||||
}
|
||||
if len(cAddrs) == 0 {
|
||||
cAddrs = []string{nats.DefaultURL}
|
||||
}
|
||||
return cAddrs
|
||||
}
|
||||
|
||||
func (n *natsRegistry) newConn() (*nats.Conn, error) {
|
||||
opts := n.nopts
|
||||
opts.Servers = n.addrs
|
||||
opts.Secure = n.opts.Secure
|
||||
opts.TLSConfig = n.opts.TLSConfig
|
||||
|
||||
// secure might not be set
|
||||
if opts.TLSConfig != nil {
|
||||
opts.Secure = true
|
||||
}
|
||||
|
||||
return opts.Connect()
|
||||
}
|
||||
|
||||
func (n *natsRegistry) getConn() (*nats.Conn, error) {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
if n.conn != nil {
|
||||
return n.conn, nil
|
||||
}
|
||||
|
||||
c, err := n.newConn()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
n.conn = c
|
||||
|
||||
return n.conn, nil
|
||||
}
|
||||
|
||||
func (n *natsRegistry) register(s *registry.Service) error {
|
||||
conn, err := n.getConn()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
// cache service
|
||||
n.services[s.Name] = addServices(n.services[s.Name], cp([]*registry.Service{s}))
|
||||
|
||||
// create query listener
|
||||
if n.listeners[s.Name] == nil {
|
||||
listener := make(chan bool)
|
||||
|
||||
// create a subscriber that responds to queries
|
||||
sub, err := conn.Subscribe(n.queryTopic, func(m *nats.Msg) {
|
||||
var result *registry.Result
|
||||
|
||||
if err := json.Unmarshal(m.Data, &result); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var services []*registry.Service
|
||||
|
||||
switch result.Action {
|
||||
// is this a get query and we own the service?
|
||||
case "get":
|
||||
if result.Service.Name != s.Name {
|
||||
return
|
||||
}
|
||||
n.RLock()
|
||||
services = cp(n.services[s.Name])
|
||||
n.RUnlock()
|
||||
// it's a list request, but we're still only a
|
||||
// subscriber for this service... so just get this service
|
||||
// totally suboptimal
|
||||
case "list":
|
||||
n.RLock()
|
||||
services = cp(n.services[s.Name])
|
||||
n.RUnlock()
|
||||
default:
|
||||
// does not match
|
||||
return
|
||||
}
|
||||
|
||||
// respond to query
|
||||
for _, service := range services {
|
||||
b, err := json.Marshal(service)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
conn.Publish(m.Reply, b)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Unsubscribe if we're told to do so
|
||||
go func() {
|
||||
<-listener
|
||||
sub.Unsubscribe()
|
||||
}()
|
||||
|
||||
n.listeners[s.Name] = listener
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *natsRegistry) deregister(s *registry.Service) error {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
services := delServices(n.services[s.Name], cp([]*registry.Service{s}))
|
||||
if len(services) > 0 {
|
||||
n.services[s.Name] = services
|
||||
return nil
|
||||
}
|
||||
|
||||
// delete cached service
|
||||
delete(n.services, s.Name)
|
||||
|
||||
// delete query listener
|
||||
if listener, lexists := n.listeners[s.Name]; lexists {
|
||||
close(listener)
|
||||
delete(n.listeners, s.Name)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *natsRegistry) query(s string, quorum int) ([]*registry.Service, error) {
|
||||
conn, err := n.getConn()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var action string
|
||||
var service *registry.Service
|
||||
|
||||
if len(s) > 0 {
|
||||
action = "get"
|
||||
service = ®istry.Service{Name: s}
|
||||
} else {
|
||||
action = "list"
|
||||
}
|
||||
|
||||
inbox := nats.NewInbox()
|
||||
|
||||
response := make(chan *registry.Service, 10)
|
||||
|
||||
sub, err := conn.Subscribe(inbox, func(m *nats.Msg) {
|
||||
var service *registry.Service
|
||||
if err := json.Unmarshal(m.Data, &service); err != nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case response <- service:
|
||||
case <-time.After(n.opts.Timeout):
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
b, err := json.Marshal(®istry.Result{Action: action, Service: service})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := conn.PublishMsg(&nats.Msg{
|
||||
Subject: n.queryTopic,
|
||||
Reply: inbox,
|
||||
Data: b,
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
timeoutChan := time.After(n.opts.Timeout)
|
||||
|
||||
serviceMap := make(map[string]*registry.Service)
|
||||
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case service := <-response:
|
||||
key := service.Name + "-" + service.Version
|
||||
srv, ok := serviceMap[key]
|
||||
if ok {
|
||||
srv.Nodes = append(srv.Nodes, service.Nodes...)
|
||||
serviceMap[key] = srv
|
||||
} else {
|
||||
serviceMap[key] = service
|
||||
}
|
||||
|
||||
if quorum > 0 && len(serviceMap[key].Nodes) >= quorum {
|
||||
break loop
|
||||
}
|
||||
case <-timeoutChan:
|
||||
break loop
|
||||
}
|
||||
}
|
||||
|
||||
var services []*registry.Service
|
||||
for _, service := range serviceMap {
|
||||
services = append(services, service)
|
||||
}
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (n *natsRegistry) Init(opts ...registry.Option) error {
|
||||
return configure(n, opts...)
|
||||
}
|
||||
|
||||
func (n *natsRegistry) Options() registry.Options {
|
||||
return n.opts
|
||||
}
|
||||
|
||||
func (n *natsRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
|
||||
if err := n.register(s); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn, err := n.getConn()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b, err := json.Marshal(®istry.Result{Action: n.registerAction, Service: s})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return conn.Publish(n.watchTopic, b)
|
||||
}
|
||||
|
||||
func (n *natsRegistry) Deregister(s *registry.Service, opts ...registry.DeregisterOption) error {
|
||||
if err := n.deregister(s); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn, err := n.getConn()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b, err := json.Marshal(®istry.Result{Action: "delete", Service: s})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return conn.Publish(n.watchTopic, b)
|
||||
}
|
||||
|
||||
func (n *natsRegistry) GetService(s string, opts ...registry.GetOption) ([]*registry.Service, error) {
|
||||
services, err := n.query(s, getQuorum(n.opts))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (n *natsRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Service, error) {
|
||||
s, err := n.query("", 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var services []*registry.Service
|
||||
serviceMap := make(map[string]*registry.Service)
|
||||
|
||||
for _, v := range s {
|
||||
serviceMap[v.Name] = ®istry.Service{Name: v.Name, Version: v.Version}
|
||||
}
|
||||
|
||||
for _, v := range serviceMap {
|
||||
services = append(services, v)
|
||||
}
|
||||
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (n *natsRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
|
||||
conn, err := n.getConn()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sub, err := conn.SubscribeSync(n.watchTopic)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var wo registry.WatchOptions
|
||||
for _, o := range opts {
|
||||
o(&wo)
|
||||
}
|
||||
|
||||
return &natsWatcher{sub, wo}, nil
|
||||
}
|
||||
|
||||
func (n *natsRegistry) String() string {
|
||||
return "nats"
|
||||
}
|
||||
|
||||
func NewNatsRegistry(opts ...registry.Option) registry.Registry {
|
||||
options := registry.Options{
|
||||
Timeout: time.Millisecond * 100,
|
||||
Context: context.Background(),
|
||||
}
|
||||
|
||||
n := &natsRegistry{
|
||||
opts: options,
|
||||
services: make(map[string][]*registry.Service),
|
||||
listeners: make(map[string]chan bool),
|
||||
}
|
||||
configure(n, opts...)
|
||||
return n
|
||||
}
|
18
registry/nats/nats_assert_test.go
Normal file
18
registry/nats/nats_assert_test.go
Normal file
@@ -0,0 +1,18 @@
|
||||
package nats_test
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func assertNoError(tb testing.TB, actual error) {
|
||||
if actual != nil {
|
||||
tb.Errorf("expected no error, got %v", actual)
|
||||
}
|
||||
}
|
||||
|
||||
func assertEqual(tb testing.TB, expected, actual interface{}) {
|
||||
if !reflect.DeepEqual(expected, actual) {
|
||||
tb.Errorf("expected %v, got %v", expected, actual)
|
||||
}
|
||||
}
|
69
registry/nats/nats_environment_test.go
Normal file
69
registry/nats/nats_environment_test.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package nats_test
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
log "go-micro.dev/v5/logger"
|
||||
"go-micro.dev/v5/registry"
|
||||
"go-micro.dev/v5/registry/nats"
|
||||
)
|
||||
|
||||
type environment struct {
|
||||
registryOne registry.Registry
|
||||
registryTwo registry.Registry
|
||||
registryThree registry.Registry
|
||||
|
||||
serviceOne registry.Service
|
||||
serviceTwo registry.Service
|
||||
|
||||
nodeOne registry.Node
|
||||
nodeTwo registry.Node
|
||||
nodeThree registry.Node
|
||||
}
|
||||
|
||||
var e environment
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
natsURL := os.Getenv("NATS_URL")
|
||||
if natsURL == "" {
|
||||
log.Infof("NATS_URL is undefined - skipping tests")
|
||||
return
|
||||
}
|
||||
|
||||
e.registryOne = nats.NewNatsRegistry(registry.Addrs(natsURL), nats.Quorum(1))
|
||||
e.registryTwo = nats.NewNatsRegistry(registry.Addrs(natsURL), nats.Quorum(1))
|
||||
e.registryThree = nats.NewNatsRegistry(registry.Addrs(natsURL), nats.Quorum(1))
|
||||
|
||||
e.serviceOne.Name = "one"
|
||||
e.serviceOne.Version = "default"
|
||||
e.serviceOne.Nodes = []*registry.Node{&e.nodeOne}
|
||||
|
||||
e.serviceTwo.Name = "two"
|
||||
e.serviceTwo.Version = "default"
|
||||
e.serviceTwo.Nodes = []*registry.Node{&e.nodeOne, &e.nodeTwo}
|
||||
|
||||
e.nodeOne.Id = "one"
|
||||
e.nodeTwo.Id = "two"
|
||||
e.nodeThree.Id = "three"
|
||||
|
||||
if err := e.registryOne.Register(&e.serviceOne); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
if err := e.registryOne.Register(&e.serviceTwo); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
result := m.Run()
|
||||
|
||||
if err := e.registryOne.Deregister(&e.serviceOne); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
if err := e.registryOne.Deregister(&e.serviceTwo); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
os.Exit(result)
|
||||
}
|
87
registry/nats/nats_options.go
Normal file
87
registry/nats/nats_options.go
Normal file
@@ -0,0 +1,87 @@
|
||||
package nats
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
"go-micro.dev/v5/registry"
|
||||
)
|
||||
|
||||
type contextQuorumKey struct{}
|
||||
type optionsKey struct{}
|
||||
type watchTopicKey struct{}
|
||||
type queryTopicKey struct{}
|
||||
type registerActionKey struct{}
|
||||
|
||||
var (
|
||||
DefaultQuorum = 0
|
||||
)
|
||||
|
||||
func getQuorum(o registry.Options) int {
|
||||
if o.Context == nil {
|
||||
return DefaultQuorum
|
||||
}
|
||||
|
||||
value := o.Context.Value(contextQuorumKey{})
|
||||
if v, ok := value.(int); ok {
|
||||
return v
|
||||
} else {
|
||||
return DefaultQuorum
|
||||
}
|
||||
}
|
||||
|
||||
func Quorum(n int) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
o.Context = context.WithValue(o.Context, contextQuorumKey{}, n)
|
||||
}
|
||||
}
|
||||
|
||||
// Options allow to inject a nats.Options struct for configuring
|
||||
// the nats connection.
|
||||
func NatsOptions(nopts nats.Options) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, optionsKey{}, nopts)
|
||||
}
|
||||
}
|
||||
|
||||
// QueryTopic allows to set a custom nats topic on which service registries
|
||||
// query (survey) other services. All registries listen on this topic and
|
||||
// then respond to the query message.
|
||||
func QueryTopic(s string) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, queryTopicKey{}, s)
|
||||
}
|
||||
}
|
||||
|
||||
// WatchTopic allows to set a custom nats topic on which registries broadcast
|
||||
// changes (e.g. when services are added, updated or removed). Since we don't
|
||||
// have a central registry service, each service typically broadcasts in a
|
||||
// determined frequency on this topic.
|
||||
func WatchTopic(s string) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, watchTopicKey{}, s)
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterAction allows to set the action to use when registering to nats.
|
||||
// As of now there are three different options:
|
||||
// - "create" (default) only registers if there is noone already registered under the same key.
|
||||
// - "update" only updates the registration if it already exists.
|
||||
// - "put" creates or updates a registration
|
||||
func RegisterAction(s string) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, registerActionKey{}, s)
|
||||
}
|
||||
}
|
5
registry/nats/nats_registry.go
Normal file
5
registry/nats/nats_registry.go
Normal file
@@ -0,0 +1,5 @@
|
||||
package nats
|
||||
|
||||
var (
|
||||
DefaultRegistry = NewNatsRegistry()
|
||||
)
|
95
registry/nats/nats_test.go
Normal file
95
registry/nats/nats_test.go
Normal file
@@ -0,0 +1,95 @@
|
||||
package nats_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"go-micro.dev/v5/registry"
|
||||
)
|
||||
|
||||
func TestRegister(t *testing.T) {
|
||||
service := registry.Service{Name: "test"}
|
||||
assertNoError(t, e.registryOne.Register(&service))
|
||||
defer e.registryOne.Deregister(&service)
|
||||
|
||||
services, err := e.registryOne.ListServices()
|
||||
assertNoError(t, err)
|
||||
assertEqual(t, 3, len(services))
|
||||
|
||||
services, err = e.registryTwo.ListServices()
|
||||
assertNoError(t, err)
|
||||
assertEqual(t, 3, len(services))
|
||||
}
|
||||
|
||||
func TestDeregister(t *testing.T) {
|
||||
service1 := registry.Service{Name: "test-deregister", Version: "v1"}
|
||||
service2 := registry.Service{Name: "test-deregister", Version: "v2"}
|
||||
|
||||
assertNoError(t, e.registryOne.Register(&service1))
|
||||
services, err := e.registryOne.GetService(service1.Name)
|
||||
assertNoError(t, err)
|
||||
assertEqual(t, 1, len(services))
|
||||
|
||||
assertNoError(t, e.registryOne.Register(&service2))
|
||||
services, err = e.registryOne.GetService(service2.Name)
|
||||
assertNoError(t, err)
|
||||
assertEqual(t, 2, len(services))
|
||||
|
||||
assertNoError(t, e.registryOne.Deregister(&service1))
|
||||
services, err = e.registryOne.GetService(service1.Name)
|
||||
assertNoError(t, err)
|
||||
assertEqual(t, 1, len(services))
|
||||
|
||||
assertNoError(t, e.registryOne.Deregister(&service2))
|
||||
services, err = e.registryOne.GetService(service1.Name)
|
||||
assertNoError(t, err)
|
||||
assertEqual(t, 0, len(services))
|
||||
}
|
||||
|
||||
func TestGetService(t *testing.T) {
|
||||
services, err := e.registryTwo.GetService("one")
|
||||
assertNoError(t, err)
|
||||
assertEqual(t, 1, len(services))
|
||||
assertEqual(t, "one", services[0].Name)
|
||||
assertEqual(t, 1, len(services[0].Nodes))
|
||||
}
|
||||
|
||||
func TestGetServiceWithNoNodes(t *testing.T) {
|
||||
services, err := e.registryOne.GetService("missing")
|
||||
assertNoError(t, err)
|
||||
assertEqual(t, 0, len(services))
|
||||
}
|
||||
|
||||
func TestGetServiceFromMultipleNodes(t *testing.T) {
|
||||
services, err := e.registryOne.GetService("two")
|
||||
assertNoError(t, err)
|
||||
assertEqual(t, 1, len(services))
|
||||
assertEqual(t, "two", services[0].Name)
|
||||
assertEqual(t, 2, len(services[0].Nodes))
|
||||
}
|
||||
|
||||
func BenchmarkGetService(b *testing.B) {
|
||||
for n := 0; n < b.N; n++ {
|
||||
services, err := e.registryTwo.GetService("one")
|
||||
assertNoError(b, err)
|
||||
assertEqual(b, 1, len(services))
|
||||
assertEqual(b, "one", services[0].Name)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkGetServiceWithNoNodes(b *testing.B) {
|
||||
for n := 0; n < b.N; n++ {
|
||||
services, err := e.registryOne.GetService("missing")
|
||||
assertNoError(b, err)
|
||||
assertEqual(b, 0, len(services))
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkGetServiceFromMultipleNodes(b *testing.B) {
|
||||
for n := 0; n < b.N; n++ {
|
||||
services, err := e.registryTwo.GetService("two")
|
||||
assertNoError(b, err)
|
||||
assertEqual(b, 1, len(services))
|
||||
assertEqual(b, "two", services[0].Name)
|
||||
assertEqual(b, 2, len(services[0].Nodes))
|
||||
}
|
||||
}
|
107
registry/nats/nats_util.go
Normal file
107
registry/nats/nats_util.go
Normal file
@@ -0,0 +1,107 @@
|
||||
package nats
|
||||
|
||||
import "go-micro.dev/v5/registry"
|
||||
|
||||
func cp(current []*registry.Service) []*registry.Service {
|
||||
var services []*registry.Service
|
||||
|
||||
for _, service := range current {
|
||||
// copy service
|
||||
s := new(registry.Service)
|
||||
*s = *service
|
||||
|
||||
// copy nodes
|
||||
var nodes []*registry.Node
|
||||
for _, node := range service.Nodes {
|
||||
n := new(registry.Node)
|
||||
*n = *node
|
||||
nodes = append(nodes, n)
|
||||
}
|
||||
s.Nodes = nodes
|
||||
|
||||
// copy endpoints
|
||||
var eps []*registry.Endpoint
|
||||
for _, ep := range service.Endpoints {
|
||||
e := new(registry.Endpoint)
|
||||
*e = *ep
|
||||
eps = append(eps, e)
|
||||
}
|
||||
s.Endpoints = eps
|
||||
|
||||
// append service
|
||||
services = append(services, s)
|
||||
}
|
||||
|
||||
return services
|
||||
}
|
||||
|
||||
func addNodes(old, neu []*registry.Node) []*registry.Node {
|
||||
for _, n := range neu {
|
||||
var seen bool
|
||||
for i, o := range old {
|
||||
if o.Id == n.Id {
|
||||
seen = true
|
||||
old[i] = n
|
||||
break
|
||||
}
|
||||
}
|
||||
if !seen {
|
||||
old = append(old, n)
|
||||
}
|
||||
}
|
||||
return old
|
||||
}
|
||||
|
||||
func addServices(old, neu []*registry.Service) []*registry.Service {
|
||||
for _, s := range neu {
|
||||
var seen bool
|
||||
for i, o := range old {
|
||||
if o.Version == s.Version {
|
||||
s.Nodes = addNodes(o.Nodes, s.Nodes)
|
||||
seen = true
|
||||
old[i] = s
|
||||
break
|
||||
}
|
||||
}
|
||||
if !seen {
|
||||
old = append(old, s)
|
||||
}
|
||||
}
|
||||
return old
|
||||
}
|
||||
|
||||
func delNodes(old, del []*registry.Node) []*registry.Node {
|
||||
var nodes []*registry.Node
|
||||
for _, o := range old {
|
||||
var rem bool
|
||||
for _, n := range del {
|
||||
if o.Id == n.Id {
|
||||
rem = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !rem {
|
||||
nodes = append(nodes, o)
|
||||
}
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
func delServices(old, del []*registry.Service) []*registry.Service {
|
||||
var services []*registry.Service
|
||||
for i, o := range old {
|
||||
var rem bool
|
||||
for _, s := range del {
|
||||
if o.Version == s.Version {
|
||||
old[i].Nodes = delNodes(o.Nodes, s.Nodes)
|
||||
if len(old[i].Nodes) == 0 {
|
||||
rem = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if !rem {
|
||||
services = append(services, o)
|
||||
}
|
||||
}
|
||||
return services
|
||||
}
|
39
registry/nats/nats_watcher.go
Normal file
39
registry/nats/nats_watcher.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package nats
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
"go-micro.dev/v5/registry"
|
||||
)
|
||||
|
||||
type natsWatcher struct {
|
||||
sub *nats.Subscription
|
||||
wo registry.WatchOptions
|
||||
}
|
||||
|
||||
func (n *natsWatcher) Next() (*registry.Result, error) {
|
||||
var result *registry.Result
|
||||
for {
|
||||
m, err := n.sub.NextMsg(time.Minute)
|
||||
if err != nil && err == nats.ErrTimeout {
|
||||
continue
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := json.Unmarshal(m.Data, &result); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(n.wo.Service) > 0 && result.Service.Name != n.wo.Service {
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (n *natsWatcher) Stop() {
|
||||
n.sub.Unsubscribe()
|
||||
}
|
166
registry/options_test.go
Normal file
166
registry/options_test.go
Normal file
@@ -0,0 +1,166 @@
|
||||
//go:build nats
|
||||
// +build nats
|
||||
|
||||
package registry
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
var addrTestCases = []struct {
|
||||
name string
|
||||
description string
|
||||
addrs map[string]string // expected address : set address
|
||||
}{
|
||||
{
|
||||
"registryOption",
|
||||
"set registry addresses through a registry.Option in constructor",
|
||||
map[string]string{
|
||||
"nats://192.168.10.1:5222": "192.168.10.1:5222",
|
||||
"nats://10.20.10.0:4222": "10.20.10.0:4222"},
|
||||
},
|
||||
{
|
||||
"natsOption",
|
||||
"set registry addresses through the nats.Option in constructor",
|
||||
map[string]string{
|
||||
"nats://192.168.10.1:5222": "192.168.10.1:5222",
|
||||
"nats://10.20.10.0:4222": "10.20.10.0:4222"},
|
||||
},
|
||||
{
|
||||
"default",
|
||||
"check if default Address is set correctly",
|
||||
map[string]string{
|
||||
nats.DefaultURL: "",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func TestInitAddrs(t *testing.T) {
|
||||
for _, tc := range addrTestCases {
|
||||
t.Run(fmt.Sprintf("%s: %s", tc.name, tc.description), func(t *testing.T) {
|
||||
var reg Registry
|
||||
var addrs []string
|
||||
|
||||
for _, addr := range tc.addrs {
|
||||
addrs = append(addrs, addr)
|
||||
}
|
||||
|
||||
switch tc.name {
|
||||
case "registryOption":
|
||||
// we know that there are just two addrs in the dict
|
||||
reg = NewRegistry(Addrs(addrs[0], addrs[1]))
|
||||
case "natsOption":
|
||||
nopts := nats.GetDefaultOptions()
|
||||
nopts.Servers = addrs
|
||||
reg = NewRegistry(Options(nopts))
|
||||
case "default":
|
||||
reg = NewRegistry()
|
||||
}
|
||||
|
||||
// if err := reg.Register(dummyService); err != nil {
|
||||
// t.Fatal(err)
|
||||
// }
|
||||
|
||||
natsRegistry, ok := reg.(*natsRegistry)
|
||||
if !ok {
|
||||
t.Fatal("Expected registry to be of types *natsRegistry")
|
||||
}
|
||||
// check if the same amount of addrs we set has actually been set
|
||||
if len(natsRegistry.addrs) != len(tc.addrs) {
|
||||
t.Errorf("Expected Addr = %v, Actual Addr = %v",
|
||||
natsRegistry.addrs, tc.addrs)
|
||||
t.Errorf("Expected Addr count = %d, Actual Addr count = %d",
|
||||
len(natsRegistry.addrs), len(tc.addrs))
|
||||
}
|
||||
|
||||
for _, addr := range natsRegistry.addrs {
|
||||
_, ok := tc.addrs[addr]
|
||||
if !ok {
|
||||
t.Errorf("Expected Addr = %v, Actual Addr = %v",
|
||||
natsRegistry.addrs, tc.addrs)
|
||||
t.Errorf("Expected '%s' has not been set", addr)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchQueryTopic(t *testing.T) {
|
||||
natsURL := os.Getenv("NATS_URL")
|
||||
if natsURL == "" {
|
||||
log.Println("NATS_URL is undefined - skipping tests")
|
||||
return
|
||||
}
|
||||
|
||||
watchTopic := "custom.test.watch"
|
||||
queryTopic := "custom.test.query"
|
||||
wt := WatchTopic(watchTopic)
|
||||
qt := QueryTopic(queryTopic)
|
||||
|
||||
// connect to NATS and subscribe to the Watch & Query topics where the
|
||||
// registry will publish a msg
|
||||
nopts := nats.GetDefaultOptions()
|
||||
nopts.Servers = setAddrs([]string{natsURL})
|
||||
conn, err := nopts.Connect()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
|
||||
okCh := make(chan struct{})
|
||||
|
||||
// Wait until we have received something on both topics
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(okCh)
|
||||
}()
|
||||
|
||||
// handler just calls wg.Done()
|
||||
rcvdHdlr := func(m *nats.Msg) {
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
_, err = conn.Subscribe(queryTopic, rcvdHdlr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = conn.Subscribe(watchTopic, rcvdHdlr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
dummyService := &Service{
|
||||
Name: "TestInitAddr",
|
||||
Version: "1.0.0",
|
||||
}
|
||||
|
||||
reg := NewRegistry(qt, wt, Addrs(natsURL))
|
||||
|
||||
// trigger registry to send out message on watchTopic
|
||||
if err := reg.Register(dummyService); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// trigger registry to send out message on queryTopic
|
||||
if _, err := reg.ListServices(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// make sure that we received something on tc.topic
|
||||
select {
|
||||
case <-okCh:
|
||||
// fine - we received on both topics a message from the registry
|
||||
case <-time.After(time.Millisecond * 200):
|
||||
t.Fatal("timeout - no data received on watch topic")
|
||||
}
|
||||
}
|
@@ -6,8 +6,6 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
DefaultRegistry = NewRegistry()
|
||||
|
||||
// Not found error when GetService is called.
|
||||
ErrNotFound = errors.New("service not found")
|
||||
// Watcher stopped error when watcher is stopped.
|
||||
@@ -95,3 +93,7 @@ func Watch(opts ...WatchOption) (Watcher, error) {
|
||||
func String() string {
|
||||
return DefaultRegistry.String()
|
||||
}
|
||||
|
||||
var (
|
||||
DefaultRegistry = NewMemoryRegistry()
|
||||
)
|
||||
|
Reference in New Issue
Block a user