mirror of
https://github.com/go-micro/go-micro.git
synced 2024-12-12 08:23:58 +02:00
4929a7c16e
Remove missing gRPC example from README.md (#2112) Delete docker.yml Delete Dockerfile update plugins version & remove replace (#2118) * update memory registry plugins version & remove replace * update plugins version & remove replace Co-authored-by: 申法宽 <shenfakuan@163.com> update client/grpc plugins version & remove replace (#2119) * update memory registry plugins version & remove replace * update plugins version & remove replace * update plugins/client/grpc/v3 version Co-authored-by: 申法宽 <shenfakuan@163.com> update etcd version (#2120) update mod version update update pulgin registry mod version (#2121) * update etcd version * update mod version * update fix store delete support for tls on http plugin (#2126) improve code quality (#2128) * Fix inefficient string comparison * Fix unnecessary calls to Printf * Canonicalize header key * Replace `t.Sub(time.Now())` with `time.Until` * Remove unnecessary blank (_) identifier * Remove unnecessary use of slice * Remove unnecessary comparison with bool Update README.md Update README.md remove network package update quic go mod remove indirects update etcd mod version Update registry plugins mod version (#2130) * update etcd version * update mod version * update * update etcd mod version Update README.md Update README.md Update README.md fixing etcd stack in getToken (#2145) when provide username and password, etcd will try to get auth token from server if server is unavailble, etcd client will stack in when dial timeout is set, it will return err instead of stack in Update README.md add http demo; http client can call http server; http client can call rpc server (#2149) Add etcd to default registries when plugin is loaded (#2150) Co-authored-by: Andrew Jones <andrew@gotoblink.com> Update README.md make rpcClient compatible with 32bit arm systems (#2156) On ARM, 386, and 32-bit MIPS, it is the caller's responsibility to arrange for 64-bit alignment of 64-bit words accessed atomically. Only the first word in an allocated struct can be relied upon to be 64-bit aligned. optimize the process of switching grpc error to micro error (#2158) Fix util/log/log.Infof format didn't work (#2160) Co-authored-by: Cui Gang <cuigang@yunpbx.com> fixing string field contains invalid UTF-8 issue (#2164) fix k8s api memory leak (#2166) fix http No release Broker (#2167) * Update http.go Exit before deregister is executed * Create http.go Exit before deregister is executed fix: "Solve the problem that the resources have not been fully released due to early exit" (#2168) * Update http.go Exit before deregister is executed * Create http.go Exit before deregister is executed * Solve the problem that the resources have not been fully released due to early exit * Optimize some code * Optimize some code fix service default logger (#2171) * Update http.go Exit before deregister is executed * Create http.go Exit before deregister is executed * Solve the problem that the resources have not been fully released due to early exit * Optimize some code * Optimize some code * Optimize some code * fix service default logger Update README.md get k8s pod (#2173) Update README.md fix:field (#2176) * get k8s pod * fix: filed * field Update README.md add rmq message properties (#2177) Co-authored-by: dtitov <dtitov@might24.ru> Update README.md grpc server add RegisterCheck (#2178) fix 404 bug (#2179) fix undefined: err (#2181) Add registry and config/source plugins based on nacos/v2 (#2182) * Add registry plugins implement by nacos/v2 * Add config/source plugins implement by nacos/v2 support hystrix fallback (#2183) Windows event log plugin (#2180) * add rmq message properties * eventlog start * start eventlog * windows event logger * readme * readme Co-authored-by: dtitov <dtitov@might24.ru> support etcd auth with env args (#2184) * support etcd auth with env args set default registry address with env arg instead of 127.0.0.1 * fixing MICRO_REGISTRY_ADDRESS may empty issue update mod version
427 lines
9.6 KiB
Go
427 lines
9.6 KiB
Go
// Package etcd provides an etcd service registry
|
|
package etcd
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"errors"
|
|
"net"
|
|
"os"
|
|
"path"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/asim/go-micro/v3/cmd"
|
|
"github.com/asim/go-micro/v3/logger"
|
|
"github.com/asim/go-micro/v3/registry"
|
|
hash "github.com/mitchellh/hashstructure"
|
|
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
|
"go.etcd.io/etcd/client/v3"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
var (
|
|
prefix = "/micro/registry/"
|
|
)
|
|
|
|
type etcdRegistry struct {
|
|
client *clientv3.Client
|
|
options registry.Options
|
|
|
|
sync.RWMutex
|
|
register map[string]uint64
|
|
leases map[string]clientv3.LeaseID
|
|
}
|
|
|
|
func init() {
|
|
cmd.DefaultRegistries["etcd"] = NewRegistry
|
|
}
|
|
|
|
func NewRegistry(opts ...registry.Option) registry.Registry {
|
|
e := &etcdRegistry{
|
|
options: registry.Options{},
|
|
register: make(map[string]uint64),
|
|
leases: make(map[string]clientv3.LeaseID),
|
|
}
|
|
username, password := os.Getenv("ETCD_USERNAME"), os.Getenv("ETCD_PASSWORD")
|
|
if len(username) > 0 && len(password) > 0 {
|
|
opts = append(opts, Auth(username, password))
|
|
}
|
|
address := os.Getenv("MICRO_REGISTRY_ADDRESS")
|
|
if len(address) > 0 {
|
|
opts = append(opts, registry.Addrs(address))
|
|
}
|
|
configure(e, opts...)
|
|
return e
|
|
}
|
|
|
|
func configure(e *etcdRegistry, opts ...registry.Option) error {
|
|
config := clientv3.Config{
|
|
Endpoints: []string{"127.0.0.1:2379"},
|
|
}
|
|
|
|
for _, o := range opts {
|
|
o(&e.options)
|
|
}
|
|
|
|
if e.options.Timeout == 0 {
|
|
e.options.Timeout = 5 * time.Second
|
|
}
|
|
config.DialTimeout = e.options.Timeout
|
|
|
|
if e.options.Secure || e.options.TLSConfig != nil {
|
|
tlsConfig := e.options.TLSConfig
|
|
if tlsConfig == nil {
|
|
tlsConfig = &tls.Config{
|
|
InsecureSkipVerify: true,
|
|
}
|
|
}
|
|
|
|
config.TLS = tlsConfig
|
|
}
|
|
|
|
if e.options.Context != nil {
|
|
u, ok := e.options.Context.Value(authKey{}).(*authCreds)
|
|
if ok {
|
|
config.Username = u.Username
|
|
config.Password = u.Password
|
|
}
|
|
cfg, ok := e.options.Context.Value(logConfigKey{}).(*zap.Config)
|
|
if ok && cfg != nil {
|
|
config.LogConfig = cfg
|
|
}
|
|
}
|
|
|
|
var cAddrs []string
|
|
|
|
for _, address := range e.options.Addrs {
|
|
if len(address) == 0 {
|
|
continue
|
|
}
|
|
addr, port, err := net.SplitHostPort(address)
|
|
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
|
|
port = "2379"
|
|
addr = address
|
|
cAddrs = append(cAddrs, net.JoinHostPort(addr, port))
|
|
} else if err == nil {
|
|
cAddrs = append(cAddrs, net.JoinHostPort(addr, port))
|
|
}
|
|
}
|
|
|
|
// if we got addrs then we'll update
|
|
if len(cAddrs) > 0 {
|
|
config.Endpoints = cAddrs
|
|
}
|
|
|
|
cli, err := clientv3.New(config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
e.client = cli
|
|
return nil
|
|
}
|
|
|
|
func encode(s *registry.Service) string {
|
|
b, _ := json.Marshal(s)
|
|
return string(b)
|
|
}
|
|
|
|
func decode(ds []byte) *registry.Service {
|
|
var s *registry.Service
|
|
json.Unmarshal(ds, &s)
|
|
return s
|
|
}
|
|
|
|
func nodePath(s, id string) string {
|
|
service := strings.Replace(s, "/", "-", -1)
|
|
node := strings.Replace(id, "/", "-", -1)
|
|
return path.Join(prefix, service, node)
|
|
}
|
|
|
|
func servicePath(s string) string {
|
|
return path.Join(prefix, strings.Replace(s, "/", "-", -1))
|
|
}
|
|
|
|
func (e *etcdRegistry) Init(opts ...registry.Option) error {
|
|
return configure(e, opts...)
|
|
}
|
|
|
|
func (e *etcdRegistry) Options() registry.Options {
|
|
return e.options
|
|
}
|
|
|
|
func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, opts ...registry.RegisterOption) error {
|
|
if len(s.Nodes) == 0 {
|
|
return errors.New("Require at least one node")
|
|
}
|
|
|
|
// check existing lease cache
|
|
e.RLock()
|
|
leaseID, ok := e.leases[s.Name+node.Id]
|
|
e.RUnlock()
|
|
|
|
if !ok {
|
|
// missing lease, check if the key exists
|
|
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
|
|
defer cancel()
|
|
|
|
// look for the existing key
|
|
rsp, err := e.client.Get(ctx, nodePath(s.Name, node.Id), clientv3.WithSerializable())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// get the existing lease
|
|
for _, kv := range rsp.Kvs {
|
|
if kv.Lease > 0 {
|
|
leaseID = clientv3.LeaseID(kv.Lease)
|
|
|
|
// decode the existing node
|
|
srv := decode(kv.Value)
|
|
if srv == nil || len(srv.Nodes) == 0 {
|
|
continue
|
|
}
|
|
|
|
// create hash of service; uint64
|
|
h, err := hash.Hash(srv.Nodes[0], nil)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
// save the info
|
|
e.Lock()
|
|
e.leases[s.Name+node.Id] = leaseID
|
|
e.register[s.Name+node.Id] = h
|
|
e.Unlock()
|
|
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
var leaseNotFound bool
|
|
|
|
// renew the lease if it exists
|
|
if leaseID > 0 {
|
|
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
|
|
logger.Tracef("Renewing existing lease for %s %d", s.Name, leaseID)
|
|
}
|
|
if _, err := e.client.KeepAliveOnce(context.TODO(), leaseID); err != nil {
|
|
if err != rpctypes.ErrLeaseNotFound {
|
|
return err
|
|
}
|
|
|
|
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
|
|
logger.Tracef("Lease not found for %s %d", s.Name, leaseID)
|
|
}
|
|
// lease not found do register
|
|
leaseNotFound = true
|
|
}
|
|
}
|
|
|
|
// create hash of service; uint64
|
|
h, err := hash.Hash(node, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// get existing hash for the service node
|
|
e.Lock()
|
|
v, ok := e.register[s.Name+node.Id]
|
|
e.Unlock()
|
|
|
|
// the service is unchanged, skip registering
|
|
if ok && v == h && !leaseNotFound {
|
|
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
|
|
logger.Tracef("Service %s node %s unchanged skipping registration", s.Name, node.Id)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
service := ®istry.Service{
|
|
Name: s.Name,
|
|
Version: s.Version,
|
|
Metadata: s.Metadata,
|
|
Endpoints: s.Endpoints,
|
|
Nodes: []*registry.Node{node},
|
|
}
|
|
|
|
var options registry.RegisterOptions
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
|
|
defer cancel()
|
|
|
|
var lgr *clientv3.LeaseGrantResponse
|
|
if options.TTL.Seconds() > 0 {
|
|
// get a lease used to expire keys since we have a ttl
|
|
lgr, err = e.client.Grant(ctx, int64(options.TTL.Seconds()))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
|
|
logger.Tracef("Registering %s id %s with lease %v and leaseID %v and ttl %v", service.Name, node.Id, lgr, lgr.ID, options.TTL)
|
|
}
|
|
// create an entry for the node
|
|
if lgr != nil {
|
|
_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service), clientv3.WithLease(lgr.ID))
|
|
} else {
|
|
_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service))
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
e.Lock()
|
|
// save our hash of the service
|
|
e.register[s.Name+node.Id] = h
|
|
// save our leaseID of the service
|
|
if lgr != nil {
|
|
e.leases[s.Name+node.Id] = lgr.ID
|
|
}
|
|
e.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *etcdRegistry) Deregister(s *registry.Service, opts ...registry.DeregisterOption) error {
|
|
if len(s.Nodes) == 0 {
|
|
return errors.New("Require at least one node")
|
|
}
|
|
|
|
for _, node := range s.Nodes {
|
|
e.Lock()
|
|
// delete our hash of the service
|
|
delete(e.register, s.Name+node.Id)
|
|
// delete our lease of the service
|
|
delete(e.leases, s.Name+node.Id)
|
|
e.Unlock()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
|
|
defer cancel()
|
|
|
|
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
|
|
logger.Tracef("Deregistering %s id %s", s.Name, node.Id)
|
|
}
|
|
_, err := e.client.Delete(ctx, nodePath(s.Name, node.Id))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
|
|
if len(s.Nodes) == 0 {
|
|
return errors.New("Require at least one node")
|
|
}
|
|
|
|
var gerr error
|
|
|
|
// register each node individually
|
|
for _, node := range s.Nodes {
|
|
err := e.registerNode(s, node, opts...)
|
|
if err != nil {
|
|
gerr = err
|
|
}
|
|
}
|
|
|
|
return gerr
|
|
}
|
|
|
|
func (e *etcdRegistry) GetService(name string, opts ...registry.GetOption) ([]*registry.Service, error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
|
|
defer cancel()
|
|
|
|
rsp, err := e.client.Get(ctx, servicePath(name)+"/", clientv3.WithPrefix(), clientv3.WithSerializable())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(rsp.Kvs) == 0 {
|
|
return nil, registry.ErrNotFound
|
|
}
|
|
|
|
serviceMap := map[string]*registry.Service{}
|
|
|
|
for _, n := range rsp.Kvs {
|
|
if sn := decode(n.Value); sn != nil {
|
|
s, ok := serviceMap[sn.Version]
|
|
if !ok {
|
|
s = ®istry.Service{
|
|
Name: sn.Name,
|
|
Version: sn.Version,
|
|
Metadata: sn.Metadata,
|
|
Endpoints: sn.Endpoints,
|
|
}
|
|
serviceMap[s.Version] = s
|
|
}
|
|
|
|
s.Nodes = append(s.Nodes, sn.Nodes...)
|
|
}
|
|
}
|
|
|
|
services := make([]*registry.Service, 0, len(serviceMap))
|
|
for _, service := range serviceMap {
|
|
services = append(services, service)
|
|
}
|
|
|
|
return services, nil
|
|
}
|
|
|
|
func (e *etcdRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Service, error) {
|
|
versions := make(map[string]*registry.Service)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
|
|
defer cancel()
|
|
|
|
rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(rsp.Kvs) == 0 {
|
|
return []*registry.Service{}, nil
|
|
}
|
|
|
|
for _, n := range rsp.Kvs {
|
|
sn := decode(n.Value)
|
|
if sn == nil {
|
|
continue
|
|
}
|
|
v, ok := versions[sn.Name+sn.Version]
|
|
if !ok {
|
|
versions[sn.Name+sn.Version] = sn
|
|
continue
|
|
}
|
|
// append to service:version nodes
|
|
v.Nodes = append(v.Nodes, sn.Nodes...)
|
|
}
|
|
|
|
services := make([]*registry.Service, 0, len(versions))
|
|
for _, service := range versions {
|
|
services = append(services, service)
|
|
}
|
|
|
|
// sort the services
|
|
sort.Slice(services, func(i, j int) bool { return services[i].Name < services[j].Name })
|
|
|
|
return services, nil
|
|
}
|
|
|
|
func (e *etcdRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
|
|
return newEtcdWatcher(e, e.options.Timeout, opts...)
|
|
}
|
|
|
|
func (e *etcdRegistry) String() string {
|
|
return "etcd"
|
|
}
|