From cf67d460b761109e22d9e5755b1d6d77d5ad305e Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sun, 12 Apr 2020 11:01:09 +0100 Subject: [PATCH] strip down mdns watcher --- registry/mdns_registry.go | 79 ++++++++++++++++++ registry/mdns_test.go | 142 ++++++++++++++++++++++++++++++++ registry/mdns_watcher.go | 87 -------------------- registry/mdns_watcher_test.go | 149 ---------------------------------- 4 files changed, 221 insertions(+), 236 deletions(-) delete mode 100644 registry/mdns_watcher.go delete mode 100644 registry/mdns_watcher_test.go diff --git a/registry/mdns_registry.go b/registry/mdns_registry.go index dfc8e6f0..18758878 100644 --- a/registry/mdns_registry.go +++ b/registry/mdns_registry.go @@ -54,6 +54,17 @@ type mdnsRegistry struct { listener chan *mdns.ServiceEntry } +type mdnsWatcher struct { + id string + wo WatchOptions + ch chan *mdns.ServiceEntry + exit chan struct{} + // the mdns domain + domain string + // the registry + registry *mdnsRegistry +} + func encode(txt *mdnsTxt) ([]string, error) { b, err := json.Marshal(txt) if err != nil { @@ -534,6 +545,74 @@ func (m *mdnsRegistry) String() string { return "mdns" } +func (m *mdnsWatcher) Next() (*Result, error) { + for { + select { + case e := <-m.ch: + txt, err := decode(e.InfoFields) + if err != nil { + continue + } + + if len(txt.Service) == 0 || len(txt.Version) == 0 { + continue + } + + // Filter watch options + // wo.Service: Only keep services we care about + if len(m.wo.Service) > 0 && txt.Service != m.wo.Service { + continue + } + + var action string + + if e.TTL == 0 { + action = "delete" + } else { + action = "create" + } + + service := &Service{ + Name: txt.Service, + Version: txt.Version, + Endpoints: txt.Endpoints, + } + + // skip anything without the domain we care about + suffix := fmt.Sprintf(".%s.%s.", service.Name, m.domain) + if !strings.HasSuffix(e.Name, suffix) { + continue + } + + service.Nodes = append(service.Nodes, &Node{ + Id: strings.TrimSuffix(e.Name, suffix), + Address: fmt.Sprintf("%s:%d", e.AddrV4.String(), e.Port), + Metadata: txt.Metadata, + }) + + return &Result{ + Action: action, + Service: service, + }, nil + case <-m.exit: + return nil, ErrWatcherStopped + } + } +} + +func (m *mdnsWatcher) Stop() { + select { + case <-m.exit: + return + default: + close(m.exit) + // remove self from the registry + m.registry.mtx.Lock() + delete(m.registry.watchers, m.id) + m.registry.mtx.Unlock() + } +} + // NewRegistry returns a new default registry which is mdns func NewRegistry(opts ...Option) Registry { return newRegistry(opts...) diff --git a/registry/mdns_test.go b/registry/mdns_test.go index c9979d5c..5f5cc617 100644 --- a/registry/mdns_test.go +++ b/registry/mdns_test.go @@ -197,3 +197,145 @@ func TestEncoding(t *testing.T) { } } + +func TestWatcher(t *testing.T) { + if travis := os.Getenv("TRAVIS"); travis == "true" { + t.Skip() + } + + testData := []*Service{ + { + Name: "test1", + Version: "1.0.1", + Nodes: []*Node{ + { + Id: "test1-1", + Address: "10.0.0.1:10001", + Metadata: map[string]string{ + "foo": "bar", + }, + }, + }, + }, + { + Name: "test2", + Version: "1.0.2", + Nodes: []*Node{ + { + Id: "test2-1", + Address: "10.0.0.2:10002", + Metadata: map[string]string{ + "foo2": "bar2", + }, + }, + }, + }, + { + Name: "test3", + Version: "1.0.3", + Nodes: []*Node{ + { + Id: "test3-1", + Address: "10.0.0.3:10003", + Metadata: map[string]string{ + "foo3": "bar3", + }, + }, + }, + }, + } + + testFn := func(service, s *Service) { + if s == nil { + t.Fatalf("Expected one result for %s got nil", service.Name) + + } + + if s.Name != service.Name { + t.Fatalf("Expected name %s got %s", service.Name, s.Name) + } + + if s.Version != service.Version { + t.Fatalf("Expected version %s got %s", service.Version, s.Version) + } + + if len(s.Nodes) != 1 { + t.Fatalf("Expected 1 node, got %d", len(s.Nodes)) + } + + node := s.Nodes[0] + + if node.Id != service.Nodes[0].Id { + t.Fatalf("Expected node id %s got %s", service.Nodes[0].Id, node.Id) + } + + if node.Address != service.Nodes[0].Address { + t.Fatalf("Expected node address %s got %s", service.Nodes[0].Address, node.Address) + } + } + + travis := os.Getenv("TRAVIS") + + var opts []Option + + if travis == "true" { + opts = append(opts, Timeout(time.Millisecond*100)) + } + + // new registry + r := NewRegistry(opts...) + + w, err := r.Watch() + if err != nil { + t.Fatal(err) + } + defer w.Stop() + + for _, service := range testData { + // register service + if err := r.Register(service); err != nil { + t.Fatal(err) + } + + for { + res, err := w.Next() + if err != nil { + t.Fatal(err) + } + + if res.Service.Name != service.Name { + continue + } + + if res.Action != "create" { + t.Fatalf("Expected create event got %s for %s", res.Action, res.Service.Name) + } + + testFn(service, res.Service) + break + } + + // deregister + if err := r.Deregister(service); err != nil { + t.Fatal(err) + } + + for { + res, err := w.Next() + if err != nil { + t.Fatal(err) + } + + if res.Service.Name != service.Name { + continue + } + + if res.Action != "delete" { + continue + } + + testFn(service, res.Service) + break + } + } +} diff --git a/registry/mdns_watcher.go b/registry/mdns_watcher.go deleted file mode 100644 index e0ef4a48..00000000 --- a/registry/mdns_watcher.go +++ /dev/null @@ -1,87 +0,0 @@ -package registry - -import ( - "fmt" - "strings" - - "github.com/micro/go-micro/v2/util/mdns" -) - -type mdnsWatcher struct { - id string - wo WatchOptions - ch chan *mdns.ServiceEntry - exit chan struct{} - // the mdns domain - domain string - // the registry - registry *mdnsRegistry -} - -func (m *mdnsWatcher) Next() (*Result, error) { - for { - select { - case e := <-m.ch: - txt, err := decode(e.InfoFields) - if err != nil { - continue - } - - if len(txt.Service) == 0 || len(txt.Version) == 0 { - continue - } - - // Filter watch options - // wo.Service: Only keep services we care about - if len(m.wo.Service) > 0 && txt.Service != m.wo.Service { - continue - } - - var action string - - if e.TTL == 0 { - action = "delete" - } else { - action = "create" - } - - service := &Service{ - Name: txt.Service, - Version: txt.Version, - Endpoints: txt.Endpoints, - } - - // skip anything without the domain we care about - suffix := fmt.Sprintf(".%s.%s.", service.Name, m.domain) - if !strings.HasSuffix(e.Name, suffix) { - continue - } - - service.Nodes = append(service.Nodes, &Node{ - Id: strings.TrimSuffix(e.Name, suffix), - Address: fmt.Sprintf("%s:%d", e.AddrV4.String(), e.Port), - Metadata: txt.Metadata, - }) - - return &Result{ - Action: action, - Service: service, - }, nil - case <-m.exit: - return nil, ErrWatcherStopped - } - } -} - -func (m *mdnsWatcher) Stop() { - select { - case <-m.exit: - return - default: - close(m.exit) - // remove self from the registry - m.registry.mtx.Lock() - delete(m.registry.watchers, m.id) - m.registry.mtx.Unlock() - } -} diff --git a/registry/mdns_watcher_test.go b/registry/mdns_watcher_test.go deleted file mode 100644 index bd837c58..00000000 --- a/registry/mdns_watcher_test.go +++ /dev/null @@ -1,149 +0,0 @@ -package registry - -import ( - "os" - "testing" - "time" -) - -func TestWatcher(t *testing.T) { - if travis := os.Getenv("TRAVIS"); travis == "true" { - t.Skip() - } - - testData := []*Service{ - { - Name: "test1", - Version: "1.0.1", - Nodes: []*Node{ - { - Id: "test1-1", - Address: "10.0.0.1:10001", - Metadata: map[string]string{ - "foo": "bar", - }, - }, - }, - }, - { - Name: "test2", - Version: "1.0.2", - Nodes: []*Node{ - { - Id: "test2-1", - Address: "10.0.0.2:10002", - Metadata: map[string]string{ - "foo2": "bar2", - }, - }, - }, - }, - { - Name: "test3", - Version: "1.0.3", - Nodes: []*Node{ - { - Id: "test3-1", - Address: "10.0.0.3:10003", - Metadata: map[string]string{ - "foo3": "bar3", - }, - }, - }, - }, - } - - testFn := func(service, s *Service) { - if s == nil { - t.Fatalf("Expected one result for %s got nil", service.Name) - - } - - if s.Name != service.Name { - t.Fatalf("Expected name %s got %s", service.Name, s.Name) - } - - if s.Version != service.Version { - t.Fatalf("Expected version %s got %s", service.Version, s.Version) - } - - if len(s.Nodes) != 1 { - t.Fatalf("Expected 1 node, got %d", len(s.Nodes)) - } - - node := s.Nodes[0] - - if node.Id != service.Nodes[0].Id { - t.Fatalf("Expected node id %s got %s", service.Nodes[0].Id, node.Id) - } - - if node.Address != service.Nodes[0].Address { - t.Fatalf("Expected node address %s got %s", service.Nodes[0].Address, node.Address) - } - } - - travis := os.Getenv("TRAVIS") - - var opts []Option - - if travis == "true" { - opts = append(opts, Timeout(time.Millisecond*100)) - } - - // new registry - r := NewRegistry(opts...) - - w, err := r.Watch() - if err != nil { - t.Fatal(err) - } - defer w.Stop() - - for _, service := range testData { - // register service - if err := r.Register(service); err != nil { - t.Fatal(err) - } - - for { - res, err := w.Next() - if err != nil { - t.Fatal(err) - } - - if res.Service.Name != service.Name { - continue - } - - if res.Action != "create" { - t.Fatalf("Expected create event got %s for %s", res.Action, res.Service.Name) - } - - testFn(service, res.Service) - break - } - - // deregister - if err := r.Deregister(service); err != nil { - t.Fatal(err) - } - - for { - res, err := w.Next() - if err != nil { - t.Fatal(err) - } - - if res.Service.Name != service.Name { - continue - } - - if res.Action != "delete" { - continue - } - - testFn(service, res.Service) - break - } - } -}