From 32942a3f635a59dd44d740925272402a532abe3d Mon Sep 17 00:00:00 2001 From: Brian Ketelsen Date: Wed, 7 May 2025 13:55:29 -0400 Subject: [PATCH] WIP --- cmd/nats_registry.go | 10 + go.mod | 24 +- go.sum | 75 ++++-- registry/mdns.go | 8 + registry/mdns_registry.go | 3 + registry/mdns_test.go | 3 + registry/nats.go | 419 ++++++++++++++++++++++++++++++ registry/nats_assert_test.go | 21 ++ registry/nats_environment_test.go | 71 +++++ registry/nats_options.go | 89 +++++++ registry/nats_registry.go | 8 + registry/nats_test.go | 96 +++++++ registry/nats_util.go | 108 ++++++++ registry/nats_watcher.go | 41 +++ registry/options_test.go | 166 ++++++++++++ registry/registry.go | 2 - 16 files changed, 1110 insertions(+), 34 deletions(-) create mode 100644 cmd/nats_registry.go create mode 100644 registry/mdns.go create mode 100644 registry/nats.go create mode 100644 registry/nats_assert_test.go create mode 100644 registry/nats_environment_test.go create mode 100644 registry/nats_options.go create mode 100644 registry/nats_registry.go create mode 100644 registry/nats_test.go create mode 100644 registry/nats_util.go create mode 100644 registry/nats_watcher.go create mode 100644 registry/options_test.go diff --git a/cmd/nats_registry.go b/cmd/nats_registry.go new file mode 100644 index 00000000..340eb6cf --- /dev/null +++ b/cmd/nats_registry.go @@ -0,0 +1,10 @@ +//go:build nats +// +build nats + +package cmd + +import "go-micro.dev/v5/registry" + +func init() { + DefaultRegistries["nats"] = registry.NewRegistry +} diff --git a/go.mod b/go.mod index 5b3f9a15..8e8691eb 100644 --- a/go.mod +++ b/go.mod @@ -10,33 +10,37 @@ require ( github.com/fsnotify/fsnotify v1.6.0 github.com/golang/protobuf v1.5.4 github.com/google/uuid v1.3.0 - github.com/imdario/mergo v0.3.12 + github.com/imdario/mergo v0.3.13 github.com/kr/pretty v0.3.0 - github.com/miekg/dns v1.1.43 + github.com/miekg/dns v1.1.50 + github.com/nats-io/nats.go v1.42.0 github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.10.0 github.com/urfave/cli/v2 v2.25.7 go.etcd.io/bbolt v1.4.0 - golang.org/x/crypto v0.21.0 - golang.org/x/net v0.23.0 - golang.org/x/sync v0.10.0 + golang.org/x/crypto v0.37.0 + golang.org/x/net v0.25.0 + golang.org/x/sync v0.13.0 google.golang.org/protobuf v1.33.0 ) require ( github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect - github.com/google/go-cmp v0.5.9 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/kr/text v0.2.0 // indirect + github.com/nats-io/nkeys v0.4.11 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/rogpeppe/go-internal v1.6.1 // indirect + github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect - golang.org/x/sys v0.29.0 // indirect - golang.org/x/text v0.14.0 // indirect + golang.org/x/mod v0.17.0 // indirect + golang.org/x/sys v0.32.0 // indirect + golang.org/x/text v0.24.0 // indirect + golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index c4d35383..d0eba04b 100644 --- a/go.sum +++ b/go.sum @@ -12,12 +12,14 @@ github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4 github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= -github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= +github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk= +github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= @@ -26,8 +28,14 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/miekg/dns v1.1.43 h1:JKfpVSCB84vrAmHzyrsxB5NAr5kLoMXZArPSw7Qlgyg= -github.com/miekg/dns v1.1.43/go.mod h1:+evo5L0630/F6ca/Z9+GAqzhjGyn8/c+TBaOyfEl0V4= +github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA= +github.com/miekg/dns v1.1.50/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7XnME= +github.com/nats-io/nats.go v1.42.0 h1:ynIMupIOvf/ZWH/b2qda6WGKGNSjwOUutTpWRvAmhaM= +github.com/nats-io/nats.go v1.42.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= +github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw= github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= @@ -36,8 +44,9 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -48,26 +57,50 @@ github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs= github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.etcd.io/bbolt v1.4.0 h1:TU77id3TnN/zKr7CO/uk+fBCwF2jGcMuw2B/FMAzYIk= go.etcd.io/bbolt v1.4.0/go.mod h1:AsD+OCi/qPN1giOX1aiLAha3o1U8rAz65bvN4j0sRuk= -golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= -golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= -golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= +golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= +golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= -golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610= +golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= -golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -75,8 +108,6 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= -gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= -gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/registry/mdns.go b/registry/mdns.go new file mode 100644 index 00000000..c83e7f97 --- /dev/null +++ b/registry/mdns.go @@ -0,0 +1,8 @@ +//go:build !nats +// +build !nats + +package registry + +var ( + DefaultRegistry = NewRegistry() +) diff --git a/registry/mdns_registry.go b/registry/mdns_registry.go index a02ab009..3622be25 100644 --- a/registry/mdns_registry.go +++ b/registry/mdns_registry.go @@ -1,3 +1,6 @@ +//go:build !nats +// +build !nats + // Package mdns is a multicast dns registry package registry diff --git a/registry/mdns_test.go b/registry/mdns_test.go index de5a6bca..c425e8d9 100644 --- a/registry/mdns_test.go +++ b/registry/mdns_test.go @@ -1,3 +1,6 @@ +//go:build !nats +// +build !nats + package registry import ( diff --git a/registry/nats.go b/registry/nats.go new file mode 100644 index 00000000..e2c9684b --- /dev/null +++ b/registry/nats.go @@ -0,0 +1,419 @@ +//go:build nats +// +build nats + +// Package nats provides a NATS registry using broadcast queries +package registry + +import ( + "context" + "encoding/json" + "strings" + "sync" + "time" + + "github.com/nats-io/nats.go" +) + +type natsRegistry struct { + addrs []string + opts Options + nopts nats.Options + queryTopic string + watchTopic string + registerAction string + + sync.RWMutex + conn *nats.Conn + services map[string][]*Service + listeners map[string]chan bool +} + +var ( + defaultQueryTopic = "micro.nats.query" + defaultWatchTopic = "micro.nats.watch" + defaultRegisterAction = "create" +) + +func configure(n *natsRegistry, opts ...Option) error { + for _, o := range opts { + o(&n.opts) + } + + natsOptions := nats.GetDefaultOptions() + if n, ok := n.opts.Context.Value(optionsKey{}).(nats.Options); ok { + natsOptions = n + } + + queryTopic := defaultQueryTopic + if qt, ok := n.opts.Context.Value(queryTopicKey{}).(string); ok { + queryTopic = qt + } + + watchTopic := defaultWatchTopic + if wt, ok := n.opts.Context.Value(watchTopicKey{}).(string); ok { + watchTopic = wt + } + + registerAction := defaultRegisterAction + if ra, ok := n.opts.Context.Value(registerActionKey{}).(string); ok { + registerAction = ra + } + + // Options have higher priority than nats.Options + // only if Addrs, Secure or TLSConfig were not set through a Option + // we read them from nats.Option + if len(n.opts.Addrs) == 0 { + n.opts.Addrs = natsOptions.Servers + } + + if !n.opts.Secure { + n.opts.Secure = natsOptions.Secure + } + + if n.opts.TLSConfig == nil { + n.opts.TLSConfig = natsOptions.TLSConfig + } + + // check & add nats:// prefix (this makes also sure that the addresses + // stored in natsaddrs and n.opts.Addrs are identical) + n.opts.Addrs = setAddrs(n.opts.Addrs) + + n.addrs = n.opts.Addrs + n.nopts = natsOptions + n.queryTopic = queryTopic + n.watchTopic = watchTopic + n.registerAction = registerAction + + return nil +} + +func setAddrs(addrs []string) []string { + var cAddrs []string + for _, addr := range addrs { + if len(addr) == 0 { + continue + } + if !strings.HasPrefix(addr, "nats://") { + addr = "nats://" + addr + } + cAddrs = append(cAddrs, addr) + } + if len(cAddrs) == 0 { + cAddrs = []string{nats.DefaultURL} + } + return cAddrs +} + +func (n *natsRegistry) newConn() (*nats.Conn, error) { + opts := n.nopts + opts.Servers = n.addrs + opts.Secure = n.opts.Secure + opts.TLSConfig = n.opts.TLSConfig + + // secure might not be set + if opts.TLSConfig != nil { + opts.Secure = true + } + + return opts.Connect() +} + +func (n *natsRegistry) getConn() (*nats.Conn, error) { + n.Lock() + defer n.Unlock() + + if n.conn != nil { + return n.conn, nil + } + + c, err := n.newConn() + if err != nil { + return nil, err + } + n.conn = c + + return n.conn, nil +} + +func (n *natsRegistry) register(s *Service) error { + conn, err := n.getConn() + if err != nil { + return err + } + + n.Lock() + defer n.Unlock() + + // cache service + n.services[s.Name] = addServices(n.services[s.Name], cp([]*Service{s})) + + // create query listener + if n.listeners[s.Name] == nil { + listener := make(chan bool) + + // create a subscriber that responds to queries + sub, err := conn.Subscribe(n.queryTopic, func(m *nats.Msg) { + var result *Result + + if err := json.Unmarshal(m.Data, &result); err != nil { + return + } + + var services []*Service + + switch result.Action { + // is this a get query and we own the service? + case "get": + if result.Service.Name != s.Name { + return + } + n.RLock() + services = cp(n.services[s.Name]) + n.RUnlock() + // it's a list request, but we're still only a + // subscriber for this service... so just get this service + // totally suboptimal + case "list": + n.RLock() + services = cp(n.services[s.Name]) + n.RUnlock() + default: + // does not match + return + } + + // respond to query + for _, service := range services { + b, err := json.Marshal(service) + if err != nil { + continue + } + conn.Publish(m.Reply, b) + } + }) + if err != nil { + return err + } + + // Unsubscribe if we're told to do so + go func() { + <-listener + sub.Unsubscribe() + }() + + n.listeners[s.Name] = listener + } + + return nil +} + +func (n *natsRegistry) deregister(s *Service) error { + n.Lock() + defer n.Unlock() + + services := delServices(n.services[s.Name], cp([]*Service{s})) + if len(services) > 0 { + n.services[s.Name] = services + return nil + } + + // delete cached service + delete(n.services, s.Name) + + // delete query listener + if listener, lexists := n.listeners[s.Name]; lexists { + close(listener) + delete(n.listeners, s.Name) + } + + return nil +} + +func (n *natsRegistry) query(s string, quorum int) ([]*Service, error) { + conn, err := n.getConn() + if err != nil { + return nil, err + } + + var action string + var service *Service + + if len(s) > 0 { + action = "get" + service = &Service{Name: s} + } else { + action = "list" + } + + inbox := nats.NewInbox() + + response := make(chan *Service, 10) + + sub, err := conn.Subscribe(inbox, func(m *nats.Msg) { + var service *Service + if err := json.Unmarshal(m.Data, &service); err != nil { + return + } + select { + case response <- service: + case <-time.After(n.opts.Timeout): + } + }) + if err != nil { + return nil, err + } + defer sub.Unsubscribe() + + b, err := json.Marshal(&Result{Action: action, Service: service}) + if err != nil { + return nil, err + } + + if err := conn.PublishMsg(&nats.Msg{ + Subject: n.queryTopic, + Reply: inbox, + Data: b, + }); err != nil { + return nil, err + } + + timeoutChan := time.After(n.opts.Timeout) + + serviceMap := make(map[string]*Service) + +loop: + for { + select { + case service := <-response: + key := service.Name + "-" + service.Version + srv, ok := serviceMap[key] + if ok { + srv.Nodes = append(srv.Nodes, service.Nodes...) + serviceMap[key] = srv + } else { + serviceMap[key] = service + } + + if quorum > 0 && len(serviceMap[key].Nodes) >= quorum { + break loop + } + case <-timeoutChan: + break loop + } + } + + var services []*Service + for _, service := range serviceMap { + services = append(services, service) + } + return services, nil +} + +func (n *natsRegistry) Init(opts ...Option) error { + return configure(n, opts...) +} + +func (n *natsRegistry) Options() Options { + return n.opts +} + +func (n *natsRegistry) Register(s *Service, opts ...RegisterOption) error { + if err := n.register(s); err != nil { + return err + } + + conn, err := n.getConn() + if err != nil { + return err + } + + b, err := json.Marshal(&Result{Action: n.registerAction, Service: s}) + if err != nil { + return err + } + + return conn.Publish(n.watchTopic, b) +} + +func (n *natsRegistry) Deregister(s *Service, opts ...DeregisterOption) error { + if err := n.deregister(s); err != nil { + return err + } + + conn, err := n.getConn() + if err != nil { + return err + } + + b, err := json.Marshal(&Result{Action: "delete", Service: s}) + if err != nil { + return err + } + return conn.Publish(n.watchTopic, b) +} + +func (n *natsRegistry) GetService(s string, opts ...GetOption) ([]*Service, error) { + services, err := n.query(s, getQuorum(n.opts)) + if err != nil { + return nil, err + } + return services, nil +} + +func (n *natsRegistry) ListServices(opts ...ListOption) ([]*Service, error) { + s, err := n.query("", 0) + if err != nil { + return nil, err + } + + var services []*Service + serviceMap := make(map[string]*Service) + + for _, v := range s { + serviceMap[v.Name] = &Service{Name: v.Name, Version: v.Version} + } + + for _, v := range serviceMap { + services = append(services, v) + } + + return services, nil +} + +func (n *natsRegistry) Watch(opts ...WatchOption) (Watcher, error) { + conn, err := n.getConn() + if err != nil { + return nil, err + } + + sub, err := conn.SubscribeSync(n.watchTopic) + if err != nil { + return nil, err + } + + var wo WatchOptions + for _, o := range opts { + o(&wo) + } + + return &natsWatcher{sub, wo}, nil +} + +func (n *natsRegistry) String() string { + return "nats" +} + +func NewRegistry(opts ...Option) Registry { + options := Options{ + Timeout: time.Millisecond * 100, + Context: context.Background(), + } + + n := &natsRegistry{ + opts: options, + services: make(map[string][]*Service), + listeners: make(map[string]chan bool), + } + configure(n, opts...) + return n +} diff --git a/registry/nats_assert_test.go b/registry/nats_assert_test.go new file mode 100644 index 00000000..4bdb08a0 --- /dev/null +++ b/registry/nats_assert_test.go @@ -0,0 +1,21 @@ +//go:build nats +// +build nats + +package registry_test + +import ( + "reflect" + "testing" +) + +func assertNoError(tb testing.TB, actual error) { + if actual != nil { + tb.Errorf("expected no error, got %v", actual) + } +} + +func assertEqual(tb testing.TB, expected, actual interface{}) { + if !reflect.DeepEqual(expected, actual) { + tb.Errorf("expected %v, got %v", expected, actual) + } +} diff --git a/registry/nats_environment_test.go b/registry/nats_environment_test.go new file mode 100644 index 00000000..c744a8d2 --- /dev/null +++ b/registry/nats_environment_test.go @@ -0,0 +1,71 @@ +//go:build nats +// +build nats + +package registry_test + +import ( + "os" + "testing" + + log "go-micro.dev/v5/logger" + "go-micro.dev/v5/registry" +) + +type environment struct { + registryOne Registry + registryTwo Registry + registryThree Registry + + serviceOne Service + serviceTwo Service + + nodeOne Node + nodeTwo Node + nodeThree Node +} + +var e environment + +func TestMain(m *testing.M) { + natsURL := os.Getenv("NATS_URL") + if natsURL == "" { + log.Infof("NATS_URL is undefined - skipping tests") + return + } + + e.registryOne = registry.NewRegistry(Addrs(natsURL), registry.Quorum(1)) + e.registryTwo = registry.NewRegistry(Addrs(natsURL), registry.Quorum(1)) + e.registryThree = registry.NewRegistry(Addrs(natsURL), registry.Quorum(1)) + + e.serviceOne.Name = "one" + e.serviceOne.Version = "default" + e.serviceOne.Nodes = []*Node{&e.nodeOne} + + e.serviceTwo.Name = "two" + e.serviceTwo.Version = "default" + e.serviceTwo.Nodes = []*Node{&e.nodeOne, &e.nodeTwo} + + e.nodeOne.Id = "one" + e.nodeTwo.Id = "two" + e.nodeThree.Id = "three" + + if err := e.registryOne.Register(&e.serviceOne); err != nil { + log.Fatal(err) + } + + if err := e.registryOne.Register(&e.serviceTwo); err != nil { + log.Fatal(err) + } + + result := m.Run() + + if err := e.registryOne.Deregister(&e.serviceOne); err != nil { + log.Fatal(err) + } + + if err := e.registryOne.Deregister(&e.serviceTwo); err != nil { + log.Fatal(err) + } + + os.Exit(result) +} diff --git a/registry/nats_options.go b/registry/nats_options.go new file mode 100644 index 00000000..68fb8510 --- /dev/null +++ b/registry/nats_options.go @@ -0,0 +1,89 @@ +//go:build nats +// +build nats + +package registry + +import ( + "context" + + "github.com/nats-io/nats.go" +) + +type contextQuorumKey struct{} +type optionsKey struct{} +type watchTopicKey struct{} +type queryTopicKey struct{} +type registerActionKey struct{} + +var ( + DefaultQuorum = 0 +) + +func getQuorum(o Options) int { + if o.Context == nil { + return DefaultQuorum + } + + value := o.Context.Value(contextQuorumKey{}) + if v, ok := value.(int); ok { + return v + } else { + return DefaultQuorum + } +} + +func Quorum(n int) Option { + return func(o *Options) { + o.Context = context.WithValue(o.Context, contextQuorumKey{}, n) + } +} + +// Options allow to inject a nats.Options struct for configuring +// the nats connection. +func NatsOptions(nopts nats.Options) Option { + return func(o *Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, optionsKey{}, nopts) + } +} + +// QueryTopic allows to set a custom nats topic on which service registries +// query (survey) other services. All registries listen on this topic and +// then respond to the query message. +func QueryTopic(s string) Option { + return func(o *Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, queryTopicKey{}, s) + } +} + +// WatchTopic allows to set a custom nats topic on which registries broadcast +// changes (e.g. when services are added, updated or removed). Since we don't +// have a central registry service, each service typically broadcasts in a +// determined frequency on this topic. +func WatchTopic(s string) Option { + return func(o *Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, watchTopicKey{}, s) + } +} + +// RegisterAction allows to set the action to use when registering to nats. +// As of now there are three different options: +// - "create" (default) only registers if there is noone already registered under the same key. +// - "update" only updates the registration if it already exists. +// - "put" creates or updates a registration +func RegisterAction(s string) Option { + return func(o *Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, registerActionKey{}, s) + } +} diff --git a/registry/nats_registry.go b/registry/nats_registry.go new file mode 100644 index 00000000..367bf34d --- /dev/null +++ b/registry/nats_registry.go @@ -0,0 +1,8 @@ +//go:build nats +// +build nats + +package registry + +var ( + DefaultRegistry = NewRegistry() +) diff --git a/registry/nats_test.go b/registry/nats_test.go new file mode 100644 index 00000000..0bd17af0 --- /dev/null +++ b/registry/nats_test.go @@ -0,0 +1,96 @@ +//go:build nats +// +build nats + +package registry_test + +import ( + "testing" +) + +func TestRegister(t *testing.T) { + service := Service{Name: "test"} + assertNoError(t, e.registryOne.Register(&service)) + defer e.registryOne.Deregister(&service) + + services, err := e.registryOne.ListServices() + assertNoError(t, err) + assertEqual(t, 3, len(services)) + + services, err = e.registryTwo.ListServices() + assertNoError(t, err) + assertEqual(t, 3, len(services)) +} + +func TestDeregister(t *testing.T) { + service1 := Service{Name: "test-deregister", Version: "v1"} + service2 := Service{Name: "test-deregister", Version: "v2"} + + assertNoError(t, e.registryOne.Register(&service1)) + services, err := e.registryOne.GetService(service1.Name) + assertNoError(t, err) + assertEqual(t, 1, len(services)) + + assertNoError(t, e.registryOne.Register(&service2)) + services, err = e.registryOne.GetService(service2.Name) + assertNoError(t, err) + assertEqual(t, 2, len(services)) + + assertNoError(t, e.registryOne.Deregister(&service1)) + services, err = e.registryOne.GetService(service1.Name) + assertNoError(t, err) + assertEqual(t, 1, len(services)) + + assertNoError(t, e.registryOne.Deregister(&service2)) + services, err = e.registryOne.GetService(service1.Name) + assertNoError(t, err) + assertEqual(t, 0, len(services)) +} + +func TestGetService(t *testing.T) { + services, err := e.registryTwo.GetService("one") + assertNoError(t, err) + assertEqual(t, 1, len(services)) + assertEqual(t, "one", services[0].Name) + assertEqual(t, 1, len(services[0].Nodes)) +} + +func TestGetServiceWithNoNodes(t *testing.T) { + services, err := e.registryOne.GetService("missing") + assertNoError(t, err) + assertEqual(t, 0, len(services)) +} + +func TestGetServiceFromMultipleNodes(t *testing.T) { + services, err := e.registryOne.GetService("two") + assertNoError(t, err) + assertEqual(t, 1, len(services)) + assertEqual(t, "two", services[0].Name) + assertEqual(t, 2, len(services[0].Nodes)) +} + +func BenchmarkGetService(b *testing.B) { + for n := 0; n < b.N; n++ { + services, err := e.registryTwo.GetService("one") + assertNoError(b, err) + assertEqual(b, 1, len(services)) + assertEqual(b, "one", services[0].Name) + } +} + +func BenchmarkGetServiceWithNoNodes(b *testing.B) { + for n := 0; n < b.N; n++ { + services, err := e.registryOne.GetService("missing") + assertNoError(b, err) + assertEqual(b, 0, len(services)) + } +} + +func BenchmarkGetServiceFromMultipleNodes(b *testing.B) { + for n := 0; n < b.N; n++ { + services, err := e.registryTwo.GetService("two") + assertNoError(b, err) + assertEqual(b, 1, len(services)) + assertEqual(b, "two", services[0].Name) + assertEqual(b, 2, len(services[0].Nodes)) + } +} diff --git a/registry/nats_util.go b/registry/nats_util.go new file mode 100644 index 00000000..466a61aa --- /dev/null +++ b/registry/nats_util.go @@ -0,0 +1,108 @@ +//go:build nats +// +build nats + +package registry + +func cp(current []*Service) []*Service { + var services []*Service + + for _, service := range current { + // copy service + s := new(Service) + *s = *service + + // copy nodes + var nodes []*Node + for _, node := range service.Nodes { + n := new(Node) + *n = *node + nodes = append(nodes, n) + } + s.Nodes = nodes + + // copy endpoints + var eps []*Endpoint + for _, ep := range service.Endpoints { + e := new(Endpoint) + *e = *ep + eps = append(eps, e) + } + s.Endpoints = eps + + // append service + services = append(services, s) + } + + return services +} + +func addNodes(old, neu []*Node) []*Node { + for _, n := range neu { + var seen bool + for i, o := range old { + if o.Id == n.Id { + seen = true + old[i] = n + break + } + } + if !seen { + old = append(old, n) + } + } + return old +} + +func addServices(old, neu []*Service) []*Service { + for _, s := range neu { + var seen bool + for i, o := range old { + if o.Version == s.Version { + s.Nodes = addNodes(o.Nodes, s.Nodes) + seen = true + old[i] = s + break + } + } + if !seen { + old = append(old, s) + } + } + return old +} + +func delNodes(old, del []*Node) []*Node { + var nodes []*Node + for _, o := range old { + var rem bool + for _, n := range del { + if o.Id == n.Id { + rem = true + break + } + } + if !rem { + nodes = append(nodes, o) + } + } + return nodes +} + +func delServices(old, del []*Service) []*Service { + var services []*Service + for i, o := range old { + var rem bool + for _, s := range del { + if o.Version == s.Version { + old[i].Nodes = delNodes(o.Nodes, s.Nodes) + if len(old[i].Nodes) == 0 { + rem = true + } + } + } + if !rem { + services = append(services, o) + } + } + return services +} diff --git a/registry/nats_watcher.go b/registry/nats_watcher.go new file mode 100644 index 00000000..86b3ccae --- /dev/null +++ b/registry/nats_watcher.go @@ -0,0 +1,41 @@ +//go:build nats +// +build nats + +package registry + +import ( + "encoding/json" + "time" + + "github.com/nats-io/nats.go" +) + +type natsWatcher struct { + sub *nats.Subscription + wo WatchOptions +} + +func (n *natsWatcher) Next() (*Result, error) { + var result *Result + for { + m, err := n.sub.NextMsg(time.Minute) + if err != nil && err == nats.ErrTimeout { + continue + } else if err != nil { + return nil, err + } + if err := json.Unmarshal(m.Data, &result); err != nil { + return nil, err + } + if len(n.wo.Service) > 0 && result.Service.Name != n.wo.Service { + continue + } + break + } + + return result, nil +} + +func (n *natsWatcher) Stop() { + n.sub.Unsubscribe() +} diff --git a/registry/options_test.go b/registry/options_test.go new file mode 100644 index 00000000..aeb726c8 --- /dev/null +++ b/registry/options_test.go @@ -0,0 +1,166 @@ +//go:build nats +// +build nats + +package registry + +import ( + "fmt" + "log" + "os" + "sync" + "testing" + "time" + + "github.com/nats-io/nats.go" +) + +var addrTestCases = []struct { + name string + description string + addrs map[string]string // expected address : set address +}{ + { + "registryOption", + "set registry addresses through a registry.Option in constructor", + map[string]string{ + "nats://192.168.10.1:5222": "192.168.10.1:5222", + "nats://10.20.10.0:4222": "10.20.10.0:4222"}, + }, + { + "natsOption", + "set registry addresses through the nats.Option in constructor", + map[string]string{ + "nats://192.168.10.1:5222": "192.168.10.1:5222", + "nats://10.20.10.0:4222": "10.20.10.0:4222"}, + }, + { + "default", + "check if default Address is set correctly", + map[string]string{ + nats.DefaultURL: "", + }, + }, +} + +func TestInitAddrs(t *testing.T) { + for _, tc := range addrTestCases { + t.Run(fmt.Sprintf("%s: %s", tc.name, tc.description), func(t *testing.T) { + var reg Registry + var addrs []string + + for _, addr := range tc.addrs { + addrs = append(addrs, addr) + } + + switch tc.name { + case "registryOption": + // we know that there are just two addrs in the dict + reg = NewRegistry(Addrs(addrs[0], addrs[1])) + case "natsOption": + nopts := nats.GetDefaultOptions() + nopts.Servers = addrs + reg = NewRegistry(Options(nopts)) + case "default": + reg = NewRegistry() + } + + // if err := reg.Register(dummyService); err != nil { + // t.Fatal(err) + // } + + natsRegistry, ok := reg.(*natsRegistry) + if !ok { + t.Fatal("Expected registry to be of types *natsRegistry") + } + // check if the same amount of addrs we set has actually been set + if len(natsRegistry.addrs) != len(tc.addrs) { + t.Errorf("Expected Addr = %v, Actual Addr = %v", + natsRegistry.addrs, tc.addrs) + t.Errorf("Expected Addr count = %d, Actual Addr count = %d", + len(natsRegistry.addrs), len(tc.addrs)) + } + + for _, addr := range natsRegistry.addrs { + _, ok := tc.addrs[addr] + if !ok { + t.Errorf("Expected Addr = %v, Actual Addr = %v", + natsRegistry.addrs, tc.addrs) + t.Errorf("Expected '%s' has not been set", addr) + } + } + }) + } +} + +func TestWatchQueryTopic(t *testing.T) { + natsURL := os.Getenv("NATS_URL") + if natsURL == "" { + log.Println("NATS_URL is undefined - skipping tests") + return + } + + watchTopic := "custom.test.watch" + queryTopic := "custom.test.query" + wt := WatchTopic(watchTopic) + qt := QueryTopic(queryTopic) + + // connect to NATS and subscribe to the Watch & Query topics where the + // registry will publish a msg + nopts := nats.GetDefaultOptions() + nopts.Servers = setAddrs([]string{natsURL}) + conn, err := nopts.Connect() + if err != nil { + t.Fatal(err) + } + + wg := sync.WaitGroup{} + wg.Add(2) + + okCh := make(chan struct{}) + + // Wait until we have received something on both topics + go func() { + wg.Wait() + close(okCh) + }() + + // handler just calls wg.Done() + rcvdHdlr := func(m *nats.Msg) { + wg.Done() + } + + _, err = conn.Subscribe(queryTopic, rcvdHdlr) + if err != nil { + t.Fatal(err) + } + + _, err = conn.Subscribe(watchTopic, rcvdHdlr) + if err != nil { + t.Fatal(err) + } + + dummyService := &Service{ + Name: "TestInitAddr", + Version: "1.0.0", + } + + reg := NewRegistry(qt, wt, Addrs(natsURL)) + + // trigger registry to send out message on watchTopic + if err := reg.Register(dummyService); err != nil { + t.Fatal(err) + } + + // trigger registry to send out message on queryTopic + if _, err := reg.ListServices(); err != nil { + t.Fatal(err) + } + + // make sure that we received something on tc.topic + select { + case <-okCh: + // fine - we received on both topics a message from the registry + case <-time.After(time.Millisecond * 200): + t.Fatal("timeout - no data received on watch topic") + } +} diff --git a/registry/registry.go b/registry/registry.go index 2ba5d56d..5b7d3b47 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -6,8 +6,6 @@ import ( ) var ( - DefaultRegistry = NewRegistry() - // Not found error when GetService is called. ErrNotFound = errors.New("service not found") // Watcher stopped error when watcher is stopped.