1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-08-10 21:52:01 +02:00

Implement MDNS Registry (#2767)

- Removed existing mDNS test file and replaced it with a new implementation.
- Added a new `mdns_registry.go` file that contains the MDNS registry logic.
- Updated the default registry to use the new MDNS registry.
- Refactored tests to accommodate the new MDNS registry implementation.
- Implemented service registration, deregistration, and service discovery using mDNS.
- Added encoding and decoding functions for mDNS TXT records.
- Implemented a watcher for monitoring service changes in the MDNS registry.
This commit is contained in:
BombartSimon
2025-05-16 20:03:36 +02:00
committed by GitHub
parent 97275d3db9
commit 1fe2638298
5 changed files with 49 additions and 57 deletions

View File

@@ -25,7 +25,6 @@ import (
"go-micro.dev/v5/registry" "go-micro.dev/v5/registry"
"go-micro.dev/v5/registry/consul" "go-micro.dev/v5/registry/consul"
"go-micro.dev/v5/registry/etcd" "go-micro.dev/v5/registry/etcd"
"go-micro.dev/v5/registry/mdns"
"go-micro.dev/v5/registry/nats" "go-micro.dev/v5/registry/nats"
"go-micro.dev/v5/selector" "go-micro.dev/v5/selector"
"go-micro.dev/v5/server" "go-micro.dev/v5/server"
@@ -249,7 +248,7 @@ var (
"consul": consul.NewConsulRegistry, "consul": consul.NewConsulRegistry,
"memory": registry.NewMemoryRegistry, "memory": registry.NewMemoryRegistry,
"nats": nats.NewNatsRegistry, "nats": nats.NewNatsRegistry,
"mdns": mdns.NewMDNSRegistry, "mdns": registry.NewMDNSRegistry,
"etcd": etcd.NewEtcdRegistry, "etcd": etcd.NewEtcdRegistry,
} }

View File

@@ -1,5 +0,0 @@
package mdns
var (
DefaultRegistry = NewMDNSRegistry()
)

View File

@@ -1,5 +1,5 @@
// Package mdns is a multicast dns registry // Package mdns is a multicast dns registry
package mdns package registry
import ( import (
"bytes" "bytes"
@@ -17,7 +17,6 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
log "go-micro.dev/v5/logger" log "go-micro.dev/v5/logger"
"go-micro.dev/v5/registry"
"go-micro.dev/v5/util/mdns" "go-micro.dev/v5/util/mdns"
) )
@@ -30,7 +29,7 @@ type mdnsTxt struct {
Metadata map[string]string Metadata map[string]string
Service string Service string
Version string Version string
Endpoints []*registry.Endpoint Endpoints []*Endpoint
} }
type mdnsEntry struct { type mdnsEntry struct {
@@ -39,7 +38,7 @@ type mdnsEntry struct {
} }
type mdnsRegistry struct { type mdnsRegistry struct {
opts *registry.Options opts *Options
services map[string][]*mdnsEntry services map[string][]*mdnsEntry
// watchers // watchers
@@ -56,7 +55,7 @@ type mdnsRegistry struct {
} }
type mdnsWatcher struct { type mdnsWatcher struct {
wo registry.WatchOptions wo WatchOptions
ch chan *mdns.ServiceEntry ch chan *mdns.ServiceEntry
exit chan struct{} exit chan struct{}
// the registry // the registry
@@ -128,9 +127,9 @@ func decode(record []string) (*mdnsTxt, error) {
return txt, nil return txt, nil
} }
func newRegistry(opts ...registry.Option) registry.Registry { func newRegistry(opts ...Option) Registry {
mergedOpts := append([]registry.Option{registry.Timeout(time.Millisecond * 100)}, opts...) mergedOpts := append([]Option{Timeout(time.Millisecond * 100)}, opts...)
options := registry.NewOptions(mergedOpts...) options := NewOptions(mergedOpts...)
// set the domain // set the domain
domain := mdnsDomain domain := mdnsDomain
@@ -148,18 +147,18 @@ func newRegistry(opts ...registry.Option) registry.Registry {
} }
} }
func (m *mdnsRegistry) Init(opts ...registry.Option) error { func (m *mdnsRegistry) Init(opts ...Option) error {
for _, o := range opts { for _, o := range opts {
o(m.opts) o(m.opts)
} }
return nil return nil
} }
func (m *mdnsRegistry) Options() registry.Options { func (m *mdnsRegistry) Options() Options {
return *m.opts return *m.opts
} }
func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.RegisterOption) error { func (m *mdnsRegistry) Register(service *Service, opts ...RegisterOption) error {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
@@ -264,7 +263,7 @@ func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.Regi
return gerr return gerr
} }
func (m *mdnsRegistry) Deregister(service *registry.Service, opts ...registry.DeregisterOption) error { func (m *mdnsRegistry) Deregister(service *Service, opts ...DeregisterOption) error {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
@@ -299,9 +298,9 @@ func (m *mdnsRegistry) Deregister(service *registry.Service, opts ...registry.De
return nil return nil
} }
func (m *mdnsRegistry) GetService(service string, opts ...registry.GetOption) ([]*registry.Service, error) { func (m *mdnsRegistry) GetService(service string, opts ...GetOption) ([]*Service, error) {
logger := m.opts.Logger logger := m.opts.Logger
serviceMap := make(map[string]*registry.Service) serviceMap := make(map[string]*Service)
entries := make(chan *mdns.ServiceEntry, 10) entries := make(chan *mdns.ServiceEntry, 10)
done := make(chan bool) done := make(chan bool)
@@ -341,7 +340,7 @@ func (m *mdnsRegistry) GetService(service string, opts ...registry.GetOption) ([
s, ok := serviceMap[txt.Version] s, ok := serviceMap[txt.Version]
if !ok { if !ok {
s = &registry.Service{ s = &Service{
Name: txt.Service, Name: txt.Service,
Version: txt.Version, Version: txt.Version,
Endpoints: txt.Endpoints, Endpoints: txt.Endpoints,
@@ -358,7 +357,7 @@ func (m *mdnsRegistry) GetService(service string, opts ...registry.GetOption) ([
logger.Logf(log.InfoLevel, "[mdns]: invalid endpoint received: %v", e) logger.Logf(log.InfoLevel, "[mdns]: invalid endpoint received: %v", e)
continue continue
} }
s.Nodes = append(s.Nodes, &registry.Node{ s.Nodes = append(s.Nodes, &Node{
Id: strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+"."), Id: strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+"."),
Address: addr, Address: addr,
Metadata: txt.Metadata, Metadata: txt.Metadata,
@@ -381,7 +380,7 @@ func (m *mdnsRegistry) GetService(service string, opts ...registry.GetOption) ([
<-done <-done
// create list and return // create list and return
services := make([]*registry.Service, 0, len(serviceMap)) services := make([]*Service, 0, len(serviceMap))
for _, service := range serviceMap { for _, service := range serviceMap {
services = append(services, service) services = append(services, service)
@@ -390,7 +389,7 @@ func (m *mdnsRegistry) GetService(service string, opts ...registry.GetOption) ([
return services, nil return services, nil
} }
func (m *mdnsRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Service, error) { func (m *mdnsRegistry) ListServices(opts ...ListOption) ([]*Service, error) {
serviceMap := make(map[string]bool) serviceMap := make(map[string]bool)
entries := make(chan *mdns.ServiceEntry, 10) entries := make(chan *mdns.ServiceEntry, 10)
done := make(chan bool) done := make(chan bool)
@@ -405,7 +404,7 @@ func (m *mdnsRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Se
// set domain // set domain
p.Domain = m.domain p.Domain = m.domain
var services []*registry.Service var services []*Service
go func() { go func() {
for { for {
@@ -420,7 +419,7 @@ func (m *mdnsRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Se
name := strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+".") name := strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+".")
if !serviceMap[name] { if !serviceMap[name] {
serviceMap[name] = true serviceMap[name] = true
services = append(services, &registry.Service{Name: name}) services = append(services, &Service{Name: name})
} }
case <-p.Context.Done(): case <-p.Context.Done():
close(done) close(done)
@@ -440,8 +439,8 @@ func (m *mdnsRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Se
return services, nil return services, nil
} }
func (m *mdnsRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { func (m *mdnsRegistry) Watch(opts ...WatchOption) (Watcher, error) {
var wo registry.WatchOptions var wo WatchOptions
for _, o := range opts { for _, o := range opts {
o(&wo) o(&wo)
} }
@@ -538,7 +537,7 @@ func (m *mdnsRegistry) String() string {
return "mdns" return "mdns"
} }
func (m *mdnsWatcher) Next() (*registry.Result, error) { func (m *mdnsWatcher) Next() (*Result, error) {
for { for {
select { select {
case e := <-m.ch: case e := <-m.ch:
@@ -563,7 +562,7 @@ func (m *mdnsWatcher) Next() (*registry.Result, error) {
action = "create" action = "create"
} }
service := &registry.Service{ service := &Service{
Name: txt.Service, Name: txt.Service,
Version: txt.Version, Version: txt.Version,
Endpoints: txt.Endpoints, Endpoints: txt.Endpoints,
@@ -584,18 +583,18 @@ func (m *mdnsWatcher) Next() (*registry.Result, error) {
addr = e.Addr.String() addr = e.Addr.String()
} }
service.Nodes = append(service.Nodes, &registry.Node{ service.Nodes = append(service.Nodes, &Node{
Id: strings.TrimSuffix(e.Name, suffix), Id: strings.TrimSuffix(e.Name, suffix),
Address: addr, Address: addr,
Metadata: txt.Metadata, Metadata: txt.Metadata,
}) })
return &registry.Result{ return &Result{
Action: action, Action: action,
Service: service, Service: service,
}, nil }, nil
case <-m.exit: case <-m.exit:
return nil, registry.ErrWatcherStopped return nil, ErrWatcherStopped
} }
} }
} }
@@ -607,6 +606,7 @@ func (m *mdnsWatcher) Stop() {
default: default:
close(m.exit) close(m.exit)
// remove self from the registry // remove self from the registry
m.registry.mtx.Lock() m.registry.mtx.Lock()
delete(m.registry.watchers, m.id) delete(m.registry.watchers, m.id)
m.registry.mtx.Unlock() m.registry.mtx.Unlock()
@@ -614,6 +614,6 @@ func (m *mdnsWatcher) Stop() {
} }
// NewRegistry returns a new default registry which is mdns. // NewRegistry returns a new default registry which is mdns.
func NewMDNSRegistry(opts ...registry.Option) registry.Registry { func NewMDNSRegistry(opts ...Option) Registry {
return newRegistry(opts...) return newRegistry(opts...)
} }

View File

@@ -1,11 +1,9 @@
package mdns package registry
import ( import (
"os" "os"
"testing" "testing"
"time" "time"
"go-micro.dev/v5/registry"
) )
func TestMDNS(t *testing.T) { func TestMDNS(t *testing.T) {
@@ -14,11 +12,11 @@ func TestMDNS(t *testing.T) {
t.Skip() t.Skip()
} }
testData := []*registry.Service{ testData := []*Service{
{ {
Name: "test1", Name: "test1",
Version: "1.0.1", Version: "1.0.1",
Nodes: []*registry.Node{ Nodes: []*Node{
{ {
Id: "test1-1", Id: "test1-1",
Address: "10.0.0.1:10001", Address: "10.0.0.1:10001",
@@ -31,7 +29,7 @@ func TestMDNS(t *testing.T) {
{ {
Name: "test2", Name: "test2",
Version: "1.0.2", Version: "1.0.2",
Nodes: []*registry.Node{ Nodes: []*Node{
{ {
Id: "test2-1", Id: "test2-1",
Address: "10.0.0.2:10002", Address: "10.0.0.2:10002",
@@ -44,7 +42,7 @@ func TestMDNS(t *testing.T) {
{ {
Name: "test3", Name: "test3",
Version: "1.0.3", Version: "1.0.3",
Nodes: []*registry.Node{ Nodes: []*Node{
{ {
Id: "test3-1", Id: "test3-1",
Address: "10.0.0.3:10003", Address: "10.0.0.3:10003",
@@ -57,7 +55,7 @@ func TestMDNS(t *testing.T) {
{ {
Name: "test4", Name: "test4",
Version: "1.0.4", Version: "1.0.4",
Nodes: []*registry.Node{ Nodes: []*Node{
{ {
Id: "test4-1", Id: "test4-1",
Address: "[::]:10004", Address: "[::]:10004",
@@ -71,10 +69,10 @@ func TestMDNS(t *testing.T) {
travis := os.Getenv("TRAVIS") travis := os.Getenv("TRAVIS")
var opts []registry.Option var opts []Option
if travis == "true" { if travis == "true" {
opts = append(opts, registry.Timeout(time.Millisecond*100)) opts = append(opts, Timeout(time.Millisecond*100))
} }
// new registry // new registry
@@ -158,14 +156,14 @@ func TestEncoding(t *testing.T) {
Metadata: map[string]string{ Metadata: map[string]string{
"foo": "bar", "foo": "bar",
}, },
Endpoints: []*registry.Endpoint{ Endpoints: []*Endpoint{
{ {
Name: "endpoint1", Name: "endpoint1",
Request: &registry.Value{ Request: &Value{
Name: "request", Name: "request",
Type: "request", Type: "request",
}, },
Response: &registry.Value{ Response: &Value{
Name: "response", Name: "response",
Type: "response", Type: "response",
}, },
@@ -215,11 +213,11 @@ func TestWatcher(t *testing.T) {
t.Skip() t.Skip()
} }
testData := []*registry.Service{ testData := []*Service{
{ {
Name: "test1", Name: "test1",
Version: "1.0.1", Version: "1.0.1",
Nodes: []*registry.Node{ Nodes: []*Node{
{ {
Id: "test1-1", Id: "test1-1",
Address: "10.0.0.1:10001", Address: "10.0.0.1:10001",
@@ -232,7 +230,7 @@ func TestWatcher(t *testing.T) {
{ {
Name: "test2", Name: "test2",
Version: "1.0.2", Version: "1.0.2",
Nodes: []*registry.Node{ Nodes: []*Node{
{ {
Id: "test2-1", Id: "test2-1",
Address: "10.0.0.2:10002", Address: "10.0.0.2:10002",
@@ -245,7 +243,7 @@ func TestWatcher(t *testing.T) {
{ {
Name: "test3", Name: "test3",
Version: "1.0.3", Version: "1.0.3",
Nodes: []*registry.Node{ Nodes: []*Node{
{ {
Id: "test3-1", Id: "test3-1",
Address: "10.0.0.3:10003", Address: "10.0.0.3:10003",
@@ -258,7 +256,7 @@ func TestWatcher(t *testing.T) {
{ {
Name: "test4", Name: "test4",
Version: "1.0.4", Version: "1.0.4",
Nodes: []*registry.Node{ Nodes: []*Node{
{ {
Id: "test4-1", Id: "test4-1",
Address: "[::]:10004", Address: "[::]:10004",
@@ -270,7 +268,7 @@ func TestWatcher(t *testing.T) {
}, },
} }
testFn := func(service, s *registry.Service) { testFn := func(service, s *Service) {
if s == nil { if s == nil {
t.Fatalf("Expected one result for %s got nil", service.Name) t.Fatalf("Expected one result for %s got nil", service.Name)
} }
@@ -300,10 +298,10 @@ func TestWatcher(t *testing.T) {
travis := os.Getenv("TRAVIS") travis := os.Getenv("TRAVIS")
var opts []registry.Option var opts []Option
if travis == "true" { if travis == "true" {
opts = append(opts, registry.Timeout(time.Millisecond*100)) opts = append(opts, Timeout(time.Millisecond*100))
} }
// new registry // new registry

View File

@@ -95,5 +95,5 @@ func String() string {
} }
var ( var (
DefaultRegistry = NewMemoryRegistry() DefaultRegistry = NewMDNSRegistry()
) )