1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-08-04 21:42:57 +02:00

fix: add more

This commit is contained in:
Brian Ketelsen
2025-05-07 20:43:54 -04:00
parent c189a81196
commit 6ec68af100
29 changed files with 3441 additions and 42 deletions

418
registry/etcd/etcd.go Normal file
View File

@ -0,0 +1,418 @@
// Package etcd provides an etcd service registry
package etcd
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"net"
"os"
"path"
"sort"
"strings"
"sync"
"time"
hash "github.com/mitchellh/hashstructure"
"go-micro.dev/v5/logger"
"go-micro.dev/v5/registry"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
var (
prefix = "/micro/registry/"
)
type etcdRegistry struct {
client *clientv3.Client
options registry.Options
sync.RWMutex
register map[string]uint64
leases map[string]clientv3.LeaseID
}
func NewEtcdRegistry(opts ...registry.Option) registry.Registry {
e := &etcdRegistry{
options: registry.Options{},
register: make(map[string]uint64),
leases: make(map[string]clientv3.LeaseID),
}
username, password := os.Getenv("ETCD_USERNAME"), os.Getenv("ETCD_PASSWORD")
if len(username) > 0 && len(password) > 0 {
opts = append(opts, Auth(username, password))
}
address := os.Getenv("MICRO_REGISTRY_ADDRESS")
if len(address) > 0 {
opts = append(opts, registry.Addrs(address))
}
configure(e, opts...)
return e
}
func configure(e *etcdRegistry, opts ...registry.Option) error {
config := clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
}
for _, o := range opts {
o(&e.options)
}
if e.options.Timeout == 0 {
e.options.Timeout = 5 * time.Second
}
if e.options.Logger == nil {
e.options.Logger = logger.DefaultLogger
}
config.DialTimeout = e.options.Timeout
if e.options.Secure || e.options.TLSConfig != nil {
tlsConfig := e.options.TLSConfig
if tlsConfig == nil {
tlsConfig = &tls.Config{
InsecureSkipVerify: true,
}
}
config.TLS = tlsConfig
}
if e.options.Context != nil {
u, ok := e.options.Context.Value(authKey{}).(*authCreds)
if ok {
config.Username = u.Username
config.Password = u.Password
}
cfg, ok := e.options.Context.Value(logConfigKey{}).(*zap.Config)
if ok && cfg != nil {
config.LogConfig = cfg
}
}
var cAddrs []string
for _, address := range e.options.Addrs {
if len(address) == 0 {
continue
}
addr, port, err := net.SplitHostPort(address)
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
port = "2379"
addr = address
cAddrs = append(cAddrs, net.JoinHostPort(addr, port))
} else if err == nil {
cAddrs = append(cAddrs, net.JoinHostPort(addr, port))
}
}
// if we got addrs then we'll update
if len(cAddrs) > 0 {
config.Endpoints = cAddrs
}
cli, err := clientv3.New(config)
if err != nil {
return err
}
e.client = cli
return nil
}
func encode(s *registry.Service) string {
b, _ := json.Marshal(s)
return string(b)
}
func decode(ds []byte) *registry.Service {
var s *registry.Service
json.Unmarshal(ds, &s)
return s
}
func nodePath(s, id string) string {
service := strings.Replace(s, "/", "-", -1)
node := strings.Replace(id, "/", "-", -1)
return path.Join(prefix, service, node)
}
func servicePath(s string) string {
return path.Join(prefix, strings.Replace(s, "/", "-", -1))
}
func (e *etcdRegistry) Init(opts ...registry.Option) error {
return configure(e, opts...)
}
func (e *etcdRegistry) Options() registry.Options {
return e.options
}
func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, opts ...registry.RegisterOption) error {
if len(s.Nodes) == 0 {
return errors.New("Require at least one node")
}
// check existing lease cache
e.RLock()
leaseID, ok := e.leases[s.Name+node.Id]
e.RUnlock()
log := e.options.Logger
if !ok {
// missing lease, check if the key exists
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
defer cancel()
// look for the existing key
rsp, err := e.client.Get(ctx, nodePath(s.Name, node.Id), clientv3.WithSerializable())
if err != nil {
return err
}
// get the existing lease
for _, kv := range rsp.Kvs {
if kv.Lease > 0 {
leaseID = clientv3.LeaseID(kv.Lease)
// decode the existing node
srv := decode(kv.Value)
if srv == nil || len(srv.Nodes) == 0 {
continue
}
// create hash of service; uint64
h, err := hash.Hash(srv.Nodes[0], nil)
if err != nil {
continue
}
// save the info
e.Lock()
e.leases[s.Name+node.Id] = leaseID
e.register[s.Name+node.Id] = h
e.Unlock()
break
}
}
}
var leaseNotFound bool
// renew the lease if it exists
if leaseID > 0 {
log.Logf(logger.TraceLevel, "Renewing existing lease for %s %d", s.Name, leaseID)
if _, err := e.client.KeepAliveOnce(context.TODO(), leaseID); err != nil {
if err != rpctypes.ErrLeaseNotFound {
return err
}
log.Logf(logger.TraceLevel, "Lease not found for %s %d", s.Name, leaseID)
// lease not found do register
leaseNotFound = true
}
}
// create hash of service; uint64
h, err := hash.Hash(node, nil)
if err != nil {
return err
}
// get existing hash for the service node
e.Lock()
v, ok := e.register[s.Name+node.Id]
e.Unlock()
// the service is unchanged, skip registering
if ok && v == h && !leaseNotFound {
log.Logf(logger.TraceLevel, "Service %s node %s unchanged skipping registration", s.Name, node.Id)
return nil
}
service := &registry.Service{
Name: s.Name,
Version: s.Version,
Metadata: s.Metadata,
Endpoints: s.Endpoints,
Nodes: []*registry.Node{node},
}
var options registry.RegisterOptions
for _, o := range opts {
o(&options)
}
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
defer cancel()
var lgr *clientv3.LeaseGrantResponse
if options.TTL.Seconds() > 0 {
// get a lease used to expire keys since we have a ttl
lgr, err = e.client.Grant(ctx, int64(options.TTL.Seconds()))
if err != nil {
return err
}
}
log.Logf(logger.TraceLevel, "Registering %s id %s with lease %v and leaseID %v and ttl %v", service.Name, node.Id, lgr, lgr.ID, options.TTL)
// create an entry for the node
if lgr != nil {
_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service), clientv3.WithLease(lgr.ID))
} else {
_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service))
}
if err != nil {
return err
}
e.Lock()
// save our hash of the service
e.register[s.Name+node.Id] = h
// save our leaseID of the service
if lgr != nil {
e.leases[s.Name+node.Id] = lgr.ID
}
e.Unlock()
return nil
}
func (e *etcdRegistry) Deregister(s *registry.Service, opts ...registry.DeregisterOption) error {
if len(s.Nodes) == 0 {
return errors.New("Require at least one node")
}
for _, node := range s.Nodes {
e.Lock()
// delete our hash of the service
delete(e.register, s.Name+node.Id)
// delete our lease of the service
delete(e.leases, s.Name+node.Id)
e.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
defer cancel()
e.options.Logger.Logf(logger.TraceLevel, "Deregistering %s id %s", s.Name, node.Id)
_, err := e.client.Delete(ctx, nodePath(s.Name, node.Id))
if err != nil {
return err
}
}
return nil
}
func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
if len(s.Nodes) == 0 {
return errors.New("Require at least one node")
}
var gerr error
// register each node individually
for _, node := range s.Nodes {
err := e.registerNode(s, node, opts...)
if err != nil {
gerr = err
}
}
return gerr
}
func (e *etcdRegistry) GetService(name string, opts ...registry.GetOption) ([]*registry.Service, error) {
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
defer cancel()
rsp, err := e.client.Get(ctx, servicePath(name)+"/", clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil {
return nil, err
}
if len(rsp.Kvs) == 0 {
return nil, registry.ErrNotFound
}
serviceMap := map[string]*registry.Service{}
for _, n := range rsp.Kvs {
if sn := decode(n.Value); sn != nil {
s, ok := serviceMap[sn.Version]
if !ok {
s = &registry.Service{
Name: sn.Name,
Version: sn.Version,
Metadata: sn.Metadata,
Endpoints: sn.Endpoints,
}
serviceMap[s.Version] = s
}
s.Nodes = append(s.Nodes, sn.Nodes...)
}
}
services := make([]*registry.Service, 0, len(serviceMap))
for _, service := range serviceMap {
services = append(services, service)
}
return services, nil
}
func (e *etcdRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Service, error) {
versions := make(map[string]*registry.Service)
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
defer cancel()
rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil {
return nil, err
}
if len(rsp.Kvs) == 0 {
return []*registry.Service{}, nil
}
for _, n := range rsp.Kvs {
sn := decode(n.Value)
if sn == nil {
continue
}
v, ok := versions[sn.Name+sn.Version]
if !ok {
versions[sn.Name+sn.Version] = sn
continue
}
// append to service:version nodes
v.Nodes = append(v.Nodes, sn.Nodes...)
}
services := make([]*registry.Service, 0, len(versions))
for _, service := range versions {
services = append(services, service)
}
// sort the services
sort.Slice(services, func(i, j int) bool { return services[i].Name < services[j].Name })
return services, nil
}
func (e *etcdRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
return newEtcdWatcher(e, e.options.Timeout, opts...)
}
func (e *etcdRegistry) String() string {
return "etcd"
}

37
registry/etcd/options.go Normal file
View File

@ -0,0 +1,37 @@
package etcd
import (
"context"
"go-micro.dev/v5/registry"
"go.uber.org/zap"
)
type authKey struct{}
type logConfigKey struct{}
type authCreds struct {
Username string
Password string
}
// Auth allows you to specify username/password.
func Auth(username, password string) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, authKey{}, &authCreds{Username: username, Password: password})
}
}
// LogConfig allows you to set etcd log config.
func LogConfig(config *zap.Config) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, logConfigKey{}, config)
}
}

91
registry/etcd/watcher.go Normal file
View File

@ -0,0 +1,91 @@
package etcd
import (
"context"
"errors"
"time"
"go-micro.dev/v5/registry"
clientv3 "go.etcd.io/etcd/client/v3"
)
type etcdWatcher struct {
stop chan bool
w clientv3.WatchChan
client *clientv3.Client
timeout time.Duration
}
func newEtcdWatcher(r *etcdRegistry, timeout time.Duration, opts ...registry.WatchOption) (registry.Watcher, error) {
var wo registry.WatchOptions
for _, o := range opts {
o(&wo)
}
ctx, cancel := context.WithCancel(context.Background())
stop := make(chan bool, 1)
go func() {
<-stop
cancel()
}()
watchPath := prefix
if len(wo.Service) > 0 {
watchPath = servicePath(wo.Service) + "/"
}
return &etcdWatcher{
stop: stop,
w: r.client.Watch(ctx, watchPath, clientv3.WithPrefix(), clientv3.WithPrevKV()),
client: r.client,
timeout: timeout,
}, nil
}
func (ew *etcdWatcher) Next() (*registry.Result, error) {
for wresp := range ew.w {
if wresp.Err() != nil {
return nil, wresp.Err()
}
if wresp.Canceled {
return nil, errors.New("could not get next")
}
for _, ev := range wresp.Events {
service := decode(ev.Kv.Value)
var action string
switch ev.Type {
case clientv3.EventTypePut:
if ev.IsCreate() {
action = "create"
} else if ev.IsModify() {
action = "update"
}
case clientv3.EventTypeDelete:
action = "delete"
// get service from prevKv
service = decode(ev.PrevKv.Value)
}
if service == nil {
continue
}
return &registry.Result{
Action: action,
Service: service,
}, nil
}
}
return nil, errors.New("could not get next")
}
func (ew *etcdWatcher) Stop() {
select {
case <-ew.stop:
return
default:
close(ew.stop)
}
}