diff --git a/cmd/cmd.go b/cmd/cmd.go index 1dc00fce..dfd034a5 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -24,7 +24,7 @@ import ( // registries "github.com/myodc/go-micro/registry/consul" "github.com/myodc/go-micro/registry/etcd" - "github.com/myodc/go-micro/registry/kubernetes" + "github.com/myodc/go-micro/registry/memory" // transport thttp "github.com/myodc/go-micro/transport/http" @@ -71,7 +71,7 @@ var ( Name: "registry", EnvVar: "MICRO_REGISTRY", Value: "consul", - Usage: "Registry for discovery. kubernetes, consul, etc", + Usage: "Registry for discovery. mdns, consul, etc", }, cli.StringFlag{ Name: "registry_address", @@ -128,9 +128,9 @@ var ( } Registries = map[string]func([]string, ...registry.Option) registry.Registry{ - "kubernetes": kubernetes.NewRegistry, - "consul": consul.NewRegistry, - "etcd": etcd.NewRegistry, + "consul": consul.NewRegistry, + "etcd": etcd.NewRegistry, + "memory": memory.NewRegistry, } Transports = map[string]func([]string, ...transport.Option) transport.Transport{ @@ -141,6 +141,18 @@ var ( ) func Setup(c *cli.Context) error { + os.Args = os.Args[:1] + + flag.Set("logtostderr", fmt.Sprintf("%v", c.Bool("logtostderr"))) + flag.Set("alsologtostderr", fmt.Sprintf("%v", c.Bool("alsologtostderr"))) + flag.Set("stderrthreshold", c.String("stderrthreshold")) + flag.Set("log_backtrace_at", c.String("log_backtrace_at")) + flag.Set("log_dir", c.String("log_dir")) + flag.Set("vmodule", c.String("vmodule")) + flag.Set("v", c.String("v")) + + flag.Parse() + if b, ok := Brokers[c.String("broker")]; ok { broker.DefaultBroker = b(strings.Split(c.String("broker_address"), ",")) } @@ -173,18 +185,6 @@ func Setup(c *cli.Context) error { client.DefaultClient = client.NewClient() - os.Args = os.Args[:1] - - flag.Set("logtostderr", fmt.Sprintf("%v", c.Bool("logtostderr"))) - flag.Set("alsologtostderr", fmt.Sprintf("%v", c.Bool("alsologtostderr"))) - flag.Set("stderrthreshold", c.String("stderrthreshold")) - flag.Set("log_backtrace_at", c.String("log_backtrace_at")) - flag.Set("log_dir", c.String("log_dir")) - flag.Set("vmodule", c.String("vmodule")) - flag.Set("v", c.String("v")) - - flag.Parse() - return nil } diff --git a/registry/kubernetes/kubernetes.go b/registry/kubernetes/kubernetes.go deleted file mode 100644 index f3fba084..00000000 --- a/registry/kubernetes/kubernetes.go +++ /dev/null @@ -1,117 +0,0 @@ -package kubernetes - -import ( - "fmt" - "os" - "sync" - - "github.com/myodc/go-micro/registry" - - k8s "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/labels" -) - -type kregistry struct { - client *k8s.Client - namespace string - - mtx sync.RWMutex - services map[string]*registry.Service -} - -func (c *kregistry) Deregister(s *registry.Service) error { - return nil -} - -func (c *kregistry) Register(s *registry.Service) error { - return nil -} - -func (c *kregistry) GetService(name string) (*registry.Service, error) { - c.mtx.RLock() - svc, ok := c.services[name] - c.mtx.RUnlock() - - if ok { - return svc, nil - } - - selector := labels.SelectorFromSet(labels.Set{"name": name}) - - services, err := c.client.Services(c.namespace).List(selector) - if err != nil { - return nil, err - } - - if len(services.Items) == 0 { - return nil, fmt.Errorf("Service not found") - } - - ks := ®istry.Service{ - Name: name, - } - - for _, item := range services.Items { - ks.Nodes = append(ks.Nodes, ®istry.Node{ - Address: item.Spec.ClusterIP, - Port: item.Spec.Ports[0].Port, - }) - } - - return ks, nil -} - -func (c *kregistry) ListServices() ([]*registry.Service, error) { - c.mtx.RLock() - serviceMap := c.services - c.mtx.RUnlock() - - var services []*registry.Service - - if len(serviceMap) > 0 { - for _, service := range serviceMap { - services = append(services, service) - } - return services, nil - } - - rsp, err := c.client.Services(c.namespace).List(labels.Everything()) - if err != nil { - return nil, err - } - - for _, svc := range rsp.Items { - if len(svc.ObjectMeta.Labels["name"]) == 0 { - continue - } - - services = append(services, ®istry.Service{ - Name: svc.ObjectMeta.Labels["name"], - }) - } - - return services, nil -} - -func (c *kregistry) Watch() (registry.Watcher, error) { - return newWatcher(c) -} - -func NewRegistry(addrs []string, opts ...registry.Option) registry.Registry { - host := "http://" + os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT") - if len(addrs) > 0 { - host = addrs[0] - } - - client, _ := k8s.New(&k8s.Config{ - Host: host, - }) - - kr := &kregistry{ - client: client, - namespace: "default", - services: make(map[string]*registry.Service), - } - - return kr -} diff --git a/registry/kubernetes/watcher.go b/registry/kubernetes/watcher.go deleted file mode 100644 index cbb4a158..00000000 --- a/registry/kubernetes/watcher.go +++ /dev/null @@ -1,91 +0,0 @@ -package kubernetes - -import ( - "net" - - "github.com/myodc/go-micro/registry" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/fields" - "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/watch" -) - -type watcher struct { - registry *kregistry - watcher watch.Interface -} - -func (k *watcher) update(event watch.Event) { - if event.Object == nil { - return - } - - var service *api.Service - switch obj := event.Object.(type) { - case *api.Service: - service = obj - default: - return - } - - name, exists := service.ObjectMeta.Labels["name"] - if !exists { - return - } - - switch event.Type { - case watch.Added, watch.Modified: - case watch.Deleted: - k.registry.mtx.Lock() - delete(k.registry.services, name) - k.registry.mtx.Unlock() - return - default: - return - } - - serviceIP := net.ParseIP(service.Spec.ClusterIP) - - k.registry.mtx.Lock() - k.registry.services[name] = ®istry.Service{ - Name: name, - Nodes: []*registry.Node{ - ®istry.Node{ - Address: serviceIP.String(), - Port: service.Spec.Ports[0].Port, - }, - }, - } - k.registry.mtx.Unlock() -} - -func (k *watcher) Stop() { - k.watcher.Stop() -} - -func newWatcher(kr *kregistry) (registry.Watcher, error) { - svi := kr.client.Services(api.NamespaceAll) - - services, err := svi.List(labels.Everything()) - if err != nil { - return nil, err - } - - watch, err := svi.Watch(labels.Everything(), fields.Everything(), services.ResourceVersion) - if err != nil { - return nil, err - } - - w := &watcher{ - registry: kr, - watcher: watch, - } - - go func() { - for event := range watch.ResultChan() { - w.update(event) - } - }() - - return w, nil -} diff --git a/registry/memory/memory.go b/registry/memory/memory.go new file mode 100644 index 00000000..9867dbbf --- /dev/null +++ b/registry/memory/memory.go @@ -0,0 +1,277 @@ +package memory + +import ( + "encoding/json" + "fmt" + "os" + "sync" + + log "github.com/golang/glog" + "github.com/hashicorp/memberlist" + "github.com/myodc/go-micro/registry" + "github.com/pborman/uuid" +) + +type action int + +const ( + addAction action = iota + delAction + syncAction +) + +type broadcast struct { + msg []byte + notify chan<- struct{} +} + +type delegate struct { + broadcasts *memberlist.TransmitLimitedQueue + updates chan *update +} + +type memoryRegistry struct { + broadcasts *memberlist.TransmitLimitedQueue + updates chan *update + + sync.RWMutex + services map[string]*registry.Service +} + +type update struct { + Action action + Service *registry.Service + sync chan *registry.Service +} + +type watcher struct{} + +func (b *broadcast) Invalidates(other memberlist.Broadcast) bool { + return false +} + +func (b *broadcast) Message() []byte { + return b.msg +} + +func (b *broadcast) Finished() { + if b.notify != nil { + close(b.notify) + } +} + +func (d *delegate) NodeMeta(limit int) []byte { + return []byte{} +} + +func (d *delegate) NotifyMsg(b []byte) { + if len(b) == 0 { + return + } + + buf := make([]byte, len(b)) + copy(buf, b) + + go func() { + switch buf[0] { + case 'd': // data + var updates []*update + if err := json.Unmarshal(buf[1:], &updates); err != nil { + return + } + for _, u := range updates { + d.updates <- u + } + } + }() +} + +func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { + return d.broadcasts.GetBroadcasts(overhead, limit) +} + +func (d *delegate) LocalState(join bool) []byte { + if !join { + return []byte{} + } + + syncCh := make(chan *registry.Service, 1) + m := map[string]*registry.Service{} + + d.updates <- &update{ + Action: syncAction, + sync: syncCh, + } + + for s := range syncCh { + m[s.Name] = s + } + + b, _ := json.Marshal(m) + return b +} + +func (d *delegate) MergeRemoteState(buf []byte, join bool) { + if len(buf) == 0 { + return + } + if !join { + return + } + + var m map[string]*registry.Service + if err := json.Unmarshal(buf, &m); err != nil { + return + } + + for _, service := range m { + d.updates <- &update{ + Action: addAction, + Service: service, + sync: nil, + } + } +} + +func (m *memoryRegistry) run() { + for u := range m.updates { + switch u.Action { + case addAction: + m.Lock() + m.services[u.Service.Name] = u.Service + m.Unlock() + case delAction: + m.Lock() + delete(m.services, u.Service.Name) + m.Unlock() + case syncAction: + if u.sync == nil { + continue + } + m.RLock() + for _, service := range m.services { + u.sync <- service + } + m.RUnlock() + close(u.sync) + } + } +} + +func (m *memoryRegistry) Register(s *registry.Service) error { + m.Lock() + m.services[s.Name] = s + m.Unlock() + + b, _ := json.Marshal([]*update{ + &update{ + Action: addAction, + Service: s, + }, + }) + + m.broadcasts.QueueBroadcast(&broadcast{ + msg: append([]byte("d"), b...), + notify: nil, + }) + + return nil +} + +func (m *memoryRegistry) Deregister(s *registry.Service) error { + m.Lock() + delete(m.services, s.Name) + m.Unlock() + + b, _ := json.Marshal([]*update{ + &update{ + Action: delAction, + Service: s, + }, + }) + + m.broadcasts.QueueBroadcast(&broadcast{ + msg: append([]byte("d"), b...), + notify: nil, + }) + + return nil +} + +func (m *memoryRegistry) GetService(name string) (*registry.Service, error) { + m.RLock() + service, ok := m.services[name] + m.RUnlock() + if !ok { + return nil, fmt.Errorf("Service %s not found", name) + } + return service, nil +} + +func (m *memoryRegistry) ListServices() ([]*registry.Service, error) { + var services []*registry.Service + m.RLock() + for _, service := range m.services { + services = append(services, service) + } + m.RUnlock() + return services, nil +} + +func (m *memoryRegistry) Watch() (registry.Watcher, error) { + return &watcher{}, nil +} + +func (w *watcher) Stop() { + return +} + +func NewRegistry(addrs []string, opt ...registry.Option) registry.Registry { + cAddrs := []string{} + hostname, _ := os.Hostname() + updates := make(chan *update, 100) + + for _, addr := range addrs { + if len(addr) > 0 { + cAddrs = append(cAddrs, addr) + } + } + + broadcasts := &memberlist.TransmitLimitedQueue{ + NumNodes: func() int { + return len(cAddrs) + }, + RetransmitMult: 3, + } + + mr := &memoryRegistry{ + broadcasts: broadcasts, + services: make(map[string]*registry.Service), + updates: updates, + } + + go mr.run() + + c := memberlist.DefaultLocalConfig() + c.BindPort = 0 + c.Name = hostname + "-" + uuid.NewUUID().String() + c.Delegate = &delegate{ + updates: updates, + broadcasts: broadcasts, + } + + m, err := memberlist.Create(c) + if err != nil { + log.Fatalf("Error creating memberlist: %v", err) + } + + if len(cAddrs) > 0 { + _, err := m.Join(cAddrs) + if err != nil { + log.Fatalf("Error joining members: %v", err) + } + } + + log.Infof("Local memberlist node %s:%d\n", m.LocalNode().Addr, m.LocalNode().Port) + return mr +}