// Package etcd provides an etcd service registry package etcd import ( "context" "crypto/tls" "encoding/json" "errors" "net" "os" "path" "sort" "strings" "sync" "time" "go-micro.dev/v4/cmd" "go-micro.dev/v4/logger" "go-micro.dev/v4/registry" hash "github.com/mitchellh/hashstructure" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) var ( prefix = "/micro/registry/" ) type etcdRegistry struct { client *clientv3.Client options registry.Options sync.RWMutex register map[string]uint64 leases map[string]clientv3.LeaseID } func init() { cmd.DefaultRegistries["etcd"] = NewRegistry } func NewRegistry(opts ...registry.Option) registry.Registry { e := &etcdRegistry{ options: registry.Options{}, register: make(map[string]uint64), leases: make(map[string]clientv3.LeaseID), } username, password := os.Getenv("ETCD_USERNAME"), os.Getenv("ETCD_PASSWORD") if len(username) > 0 && len(password) > 0 { opts = append(opts, Auth(username, password)) } address := os.Getenv("MICRO_REGISTRY_ADDRESS") if len(address) > 0 { opts = append(opts, registry.Addrs(address)) } configure(e, opts...) return e } func configure(e *etcdRegistry, opts ...registry.Option) error { config := clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, } for _, o := range opts { o(&e.options) } if e.options.Timeout == 0 { e.options.Timeout = 5 * time.Second } config.DialTimeout = e.options.Timeout if e.options.Secure || e.options.TLSConfig != nil { tlsConfig := e.options.TLSConfig if tlsConfig == nil { tlsConfig = &tls.Config{ InsecureSkipVerify: true, } } config.TLS = tlsConfig } if e.options.Context != nil { u, ok := e.options.Context.Value(authKey{}).(*authCreds) if ok { config.Username = u.Username config.Password = u.Password } cfg, ok := e.options.Context.Value(logConfigKey{}).(*zap.Config) if ok && cfg != nil { config.LogConfig = cfg } } var cAddrs []string for _, address := range e.options.Addrs { if len(address) == 0 { continue } addr, port, err := net.SplitHostPort(address) if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { port = "2379" addr = address cAddrs = append(cAddrs, net.JoinHostPort(addr, port)) } else if err == nil { cAddrs = append(cAddrs, net.JoinHostPort(addr, port)) } } // if we got addrs then we'll update if len(cAddrs) > 0 { config.Endpoints = cAddrs } cli, err := clientv3.New(config) if err != nil { return err } e.client = cli return nil } func encode(s *registry.Service) string { b, _ := json.Marshal(s) return string(b) } func decode(ds []byte) *registry.Service { var s *registry.Service json.Unmarshal(ds, &s) return s } func nodePath(s, id string) string { service := strings.Replace(s, "/", "-", -1) node := strings.Replace(id, "/", "-", -1) return path.Join(prefix, service, node) } func servicePath(s string) string { return path.Join(prefix, strings.Replace(s, "/", "-", -1)) } func (e *etcdRegistry) Init(opts ...registry.Option) error { return configure(e, opts...) } func (e *etcdRegistry) Options() registry.Options { return e.options } func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, opts ...registry.RegisterOption) error { if len(s.Nodes) == 0 { return errors.New("Require at least one node") } // check existing lease cache e.RLock() leaseID, ok := e.leases[s.Name+node.Id] e.RUnlock() if !ok { // missing lease, check if the key exists ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) defer cancel() // look for the existing key rsp, err := e.client.Get(ctx, nodePath(s.Name, node.Id), clientv3.WithSerializable()) if err != nil { return err } // get the existing lease for _, kv := range rsp.Kvs { if kv.Lease > 0 { leaseID = clientv3.LeaseID(kv.Lease) // decode the existing node srv := decode(kv.Value) if srv == nil || len(srv.Nodes) == 0 { continue } // create hash of service; uint64 h, err := hash.Hash(srv.Nodes[0], nil) if err != nil { continue } // save the info e.Lock() e.leases[s.Name+node.Id] = leaseID e.register[s.Name+node.Id] = h e.Unlock() break } } } var leaseNotFound bool // renew the lease if it exists if leaseID > 0 { if logger.V(logger.TraceLevel, logger.DefaultLogger) { logger.Tracef("Renewing existing lease for %s %d", s.Name, leaseID) } if _, err := e.client.KeepAliveOnce(context.TODO(), leaseID); err != nil { if err != rpctypes.ErrLeaseNotFound { return err } if logger.V(logger.TraceLevel, logger.DefaultLogger) { logger.Tracef("Lease not found for %s %d", s.Name, leaseID) } // lease not found do register leaseNotFound = true } } // create hash of service; uint64 h, err := hash.Hash(node, nil) if err != nil { return err } // get existing hash for the service node e.Lock() v, ok := e.register[s.Name+node.Id] e.Unlock() // the service is unchanged, skip registering if ok && v == h && !leaseNotFound { if logger.V(logger.TraceLevel, logger.DefaultLogger) { logger.Tracef("Service %s node %s unchanged skipping registration", s.Name, node.Id) } return nil } service := ®istry.Service{ Name: s.Name, Version: s.Version, Metadata: s.Metadata, Endpoints: s.Endpoints, Nodes: []*registry.Node{node}, } var options registry.RegisterOptions for _, o := range opts { o(&options) } ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) defer cancel() var lgr *clientv3.LeaseGrantResponse if options.TTL.Seconds() > 0 { // get a lease used to expire keys since we have a ttl lgr, err = e.client.Grant(ctx, int64(options.TTL.Seconds())) if err != nil { return err } } if logger.V(logger.TraceLevel, logger.DefaultLogger) { logger.Tracef("Registering %s id %s with lease %v and leaseID %v and ttl %v", service.Name, node.Id, lgr, lgr.ID, options.TTL) } // create an entry for the node if lgr != nil { _, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service), clientv3.WithLease(lgr.ID)) } else { _, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service)) } if err != nil { return err } e.Lock() // save our hash of the service e.register[s.Name+node.Id] = h // save our leaseID of the service if lgr != nil { e.leases[s.Name+node.Id] = lgr.ID } e.Unlock() return nil } func (e *etcdRegistry) Deregister(s *registry.Service, opts ...registry.DeregisterOption) error { if len(s.Nodes) == 0 { return errors.New("Require at least one node") } for _, node := range s.Nodes { e.Lock() // delete our hash of the service delete(e.register, s.Name+node.Id) // delete our lease of the service delete(e.leases, s.Name+node.Id) e.Unlock() ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) defer cancel() if logger.V(logger.TraceLevel, logger.DefaultLogger) { logger.Tracef("Deregistering %s id %s", s.Name, node.Id) } _, err := e.client.Delete(ctx, nodePath(s.Name, node.Id)) if err != nil { return err } } return nil } func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error { if len(s.Nodes) == 0 { return errors.New("Require at least one node") } var gerr error // register each node individually for _, node := range s.Nodes { err := e.registerNode(s, node, opts...) if err != nil { gerr = err } } return gerr } func (e *etcdRegistry) GetService(name string, opts ...registry.GetOption) ([]*registry.Service, error) { ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) defer cancel() rsp, err := e.client.Get(ctx, servicePath(name)+"/", clientv3.WithPrefix(), clientv3.WithSerializable()) if err != nil { return nil, err } if len(rsp.Kvs) == 0 { return nil, registry.ErrNotFound } serviceMap := map[string]*registry.Service{} for _, n := range rsp.Kvs { if sn := decode(n.Value); sn != nil { s, ok := serviceMap[sn.Version] if !ok { s = ®istry.Service{ Name: sn.Name, Version: sn.Version, Metadata: sn.Metadata, Endpoints: sn.Endpoints, } serviceMap[s.Version] = s } s.Nodes = append(s.Nodes, sn.Nodes...) } } services := make([]*registry.Service, 0, len(serviceMap)) for _, service := range serviceMap { services = append(services, service) } return services, nil } func (e *etcdRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Service, error) { versions := make(map[string]*registry.Service) ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) defer cancel() rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable()) if err != nil { return nil, err } if len(rsp.Kvs) == 0 { return []*registry.Service{}, nil } for _, n := range rsp.Kvs { sn := decode(n.Value) if sn == nil { continue } v, ok := versions[sn.Name+sn.Version] if !ok { versions[sn.Name+sn.Version] = sn continue } // append to service:version nodes v.Nodes = append(v.Nodes, sn.Nodes...) } services := make([]*registry.Service, 0, len(versions)) for _, service := range versions { services = append(services, service) } // sort the services sort.Slice(services, func(i, j int) bool { return services[i].Name < services[j].Name }) return services, nil } func (e *etcdRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { return newEtcdWatcher(e, e.options.Timeout, opts...) } func (e *etcdRegistry) String() string { return "etcd" }