1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-08-10 21:52:01 +02:00
This commit is contained in:
Brian Ketelsen
2025-05-07 13:55:29 -04:00
parent 46db7df218
commit 32942a3f63
16 changed files with 1110 additions and 34 deletions

10
cmd/nats_registry.go Normal file
View File

@@ -0,0 +1,10 @@
//go:build nats
// +build nats
package cmd
import "go-micro.dev/v5/registry"
func init() {
DefaultRegistries["nats"] = registry.NewRegistry
}

24
go.mod
View File

@@ -10,33 +10,37 @@ require (
github.com/fsnotify/fsnotify v1.6.0 github.com/fsnotify/fsnotify v1.6.0
github.com/golang/protobuf v1.5.4 github.com/golang/protobuf v1.5.4
github.com/google/uuid v1.3.0 github.com/google/uuid v1.3.0
github.com/imdario/mergo v0.3.12 github.com/imdario/mergo v0.3.13
github.com/kr/pretty v0.3.0 github.com/kr/pretty v0.3.0
github.com/miekg/dns v1.1.43 github.com/miekg/dns v1.1.50
github.com/nats-io/nats.go v1.42.0
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c
github.com/patrickmn/go-cache v2.1.0+incompatible github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.10.0 github.com/stretchr/testify v1.10.0
github.com/urfave/cli/v2 v2.25.7 github.com/urfave/cli/v2 v2.25.7
go.etcd.io/bbolt v1.4.0 go.etcd.io/bbolt v1.4.0
golang.org/x/crypto v0.21.0 golang.org/x/crypto v0.37.0
golang.org/x/net v0.23.0 golang.org/x/net v0.25.0
golang.org/x/sync v0.10.0 golang.org/x/sync v0.13.0
google.golang.org/protobuf v1.33.0 google.golang.org/protobuf v1.33.0
) )
require ( require (
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/google/go-cmp v0.5.9 // indirect github.com/klauspost/compress v1.18.0 // indirect
github.com/kr/text v0.2.0 // indirect github.com/kr/text v0.2.0 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.6.1 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
golang.org/x/sys v0.29.0 // indirect golang.org/x/mod v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect golang.org/x/sys v0.32.0 // indirect
golang.org/x/text v0.24.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )

75
go.sum
View File

@@ -12,12 +12,14 @@ github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
@@ -26,8 +28,14 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/miekg/dns v1.1.43 h1:JKfpVSCB84vrAmHzyrsxB5NAr5kLoMXZArPSw7Qlgyg= github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA=
github.com/miekg/dns v1.1.43/go.mod h1:+evo5L0630/F6ca/Z9+GAqzhjGyn8/c+TBaOyfEl0V4= github.com/miekg/dns v1.1.50/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7XnME=
github.com/nats-io/nats.go v1.42.0 h1:ynIMupIOvf/ZWH/b2qda6WGKGNSjwOUutTpWRvAmhaM=
github.com/nats-io/nats.go v1.42.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw= github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw=
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0= github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
@@ -36,8 +44,9 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -48,26 +57,50 @@ github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs=
github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ= github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.etcd.io/bbolt v1.4.0 h1:TU77id3TnN/zKr7CO/uk+fBCwF2jGcMuw2B/FMAzYIk= go.etcd.io/bbolt v1.4.0 h1:TU77id3TnN/zKr7CO/uk+fBCwF2jGcMuw2B/FMAzYIk=
go.etcd.io/bbolt v1.4.0/go.mod h1:AsD+OCi/qPN1giOX1aiLAha3o1U8rAz65bvN4j0sRuk= go.etcd.io/bbolt v1.4.0/go.mod h1:AsD+OCi/qPN1giOX1aiLAha3o1U8rAz65bvN4j0sRuk=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@@ -75,8 +108,6 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

8
registry/mdns.go Normal file
View File

@@ -0,0 +1,8 @@
//go:build !nats
// +build !nats
package registry
var (
DefaultRegistry = NewRegistry()
)

View File

@@ -1,3 +1,6 @@
//go:build !nats
// +build !nats
// Package mdns is a multicast dns registry // Package mdns is a multicast dns registry
package registry package registry

View File

@@ -1,3 +1,6 @@
//go:build !nats
// +build !nats
package registry package registry
import ( import (

419
registry/nats.go Normal file
View File

@@ -0,0 +1,419 @@
//go:build nats
// +build nats
// Package nats provides a NATS registry using broadcast queries
package registry
import (
"context"
"encoding/json"
"strings"
"sync"
"time"
"github.com/nats-io/nats.go"
)
type natsRegistry struct {
addrs []string
opts Options
nopts nats.Options
queryTopic string
watchTopic string
registerAction string
sync.RWMutex
conn *nats.Conn
services map[string][]*Service
listeners map[string]chan bool
}
var (
defaultQueryTopic = "micro.nats.query"
defaultWatchTopic = "micro.nats.watch"
defaultRegisterAction = "create"
)
func configure(n *natsRegistry, opts ...Option) error {
for _, o := range opts {
o(&n.opts)
}
natsOptions := nats.GetDefaultOptions()
if n, ok := n.opts.Context.Value(optionsKey{}).(nats.Options); ok {
natsOptions = n
}
queryTopic := defaultQueryTopic
if qt, ok := n.opts.Context.Value(queryTopicKey{}).(string); ok {
queryTopic = qt
}
watchTopic := defaultWatchTopic
if wt, ok := n.opts.Context.Value(watchTopicKey{}).(string); ok {
watchTopic = wt
}
registerAction := defaultRegisterAction
if ra, ok := n.opts.Context.Value(registerActionKey{}).(string); ok {
registerAction = ra
}
// Options have higher priority than nats.Options
// only if Addrs, Secure or TLSConfig were not set through a Option
// we read them from nats.Option
if len(n.opts.Addrs) == 0 {
n.opts.Addrs = natsOptions.Servers
}
if !n.opts.Secure {
n.opts.Secure = natsOptions.Secure
}
if n.opts.TLSConfig == nil {
n.opts.TLSConfig = natsOptions.TLSConfig
}
// check & add nats:// prefix (this makes also sure that the addresses
// stored in natsaddrs and n.opts.Addrs are identical)
n.opts.Addrs = setAddrs(n.opts.Addrs)
n.addrs = n.opts.Addrs
n.nopts = natsOptions
n.queryTopic = queryTopic
n.watchTopic = watchTopic
n.registerAction = registerAction
return nil
}
func setAddrs(addrs []string) []string {
var cAddrs []string
for _, addr := range addrs {
if len(addr) == 0 {
continue
}
if !strings.HasPrefix(addr, "nats://") {
addr = "nats://" + addr
}
cAddrs = append(cAddrs, addr)
}
if len(cAddrs) == 0 {
cAddrs = []string{nats.DefaultURL}
}
return cAddrs
}
func (n *natsRegistry) newConn() (*nats.Conn, error) {
opts := n.nopts
opts.Servers = n.addrs
opts.Secure = n.opts.Secure
opts.TLSConfig = n.opts.TLSConfig
// secure might not be set
if opts.TLSConfig != nil {
opts.Secure = true
}
return opts.Connect()
}
func (n *natsRegistry) getConn() (*nats.Conn, error) {
n.Lock()
defer n.Unlock()
if n.conn != nil {
return n.conn, nil
}
c, err := n.newConn()
if err != nil {
return nil, err
}
n.conn = c
return n.conn, nil
}
func (n *natsRegistry) register(s *Service) error {
conn, err := n.getConn()
if err != nil {
return err
}
n.Lock()
defer n.Unlock()
// cache service
n.services[s.Name] = addServices(n.services[s.Name], cp([]*Service{s}))
// create query listener
if n.listeners[s.Name] == nil {
listener := make(chan bool)
// create a subscriber that responds to queries
sub, err := conn.Subscribe(n.queryTopic, func(m *nats.Msg) {
var result *Result
if err := json.Unmarshal(m.Data, &result); err != nil {
return
}
var services []*Service
switch result.Action {
// is this a get query and we own the service?
case "get":
if result.Service.Name != s.Name {
return
}
n.RLock()
services = cp(n.services[s.Name])
n.RUnlock()
// it's a list request, but we're still only a
// subscriber for this service... so just get this service
// totally suboptimal
case "list":
n.RLock()
services = cp(n.services[s.Name])
n.RUnlock()
default:
// does not match
return
}
// respond to query
for _, service := range services {
b, err := json.Marshal(service)
if err != nil {
continue
}
conn.Publish(m.Reply, b)
}
})
if err != nil {
return err
}
// Unsubscribe if we're told to do so
go func() {
<-listener
sub.Unsubscribe()
}()
n.listeners[s.Name] = listener
}
return nil
}
func (n *natsRegistry) deregister(s *Service) error {
n.Lock()
defer n.Unlock()
services := delServices(n.services[s.Name], cp([]*Service{s}))
if len(services) > 0 {
n.services[s.Name] = services
return nil
}
// delete cached service
delete(n.services, s.Name)
// delete query listener
if listener, lexists := n.listeners[s.Name]; lexists {
close(listener)
delete(n.listeners, s.Name)
}
return nil
}
func (n *natsRegistry) query(s string, quorum int) ([]*Service, error) {
conn, err := n.getConn()
if err != nil {
return nil, err
}
var action string
var service *Service
if len(s) > 0 {
action = "get"
service = &Service{Name: s}
} else {
action = "list"
}
inbox := nats.NewInbox()
response := make(chan *Service, 10)
sub, err := conn.Subscribe(inbox, func(m *nats.Msg) {
var service *Service
if err := json.Unmarshal(m.Data, &service); err != nil {
return
}
select {
case response <- service:
case <-time.After(n.opts.Timeout):
}
})
if err != nil {
return nil, err
}
defer sub.Unsubscribe()
b, err := json.Marshal(&Result{Action: action, Service: service})
if err != nil {
return nil, err
}
if err := conn.PublishMsg(&nats.Msg{
Subject: n.queryTopic,
Reply: inbox,
Data: b,
}); err != nil {
return nil, err
}
timeoutChan := time.After(n.opts.Timeout)
serviceMap := make(map[string]*Service)
loop:
for {
select {
case service := <-response:
key := service.Name + "-" + service.Version
srv, ok := serviceMap[key]
if ok {
srv.Nodes = append(srv.Nodes, service.Nodes...)
serviceMap[key] = srv
} else {
serviceMap[key] = service
}
if quorum > 0 && len(serviceMap[key].Nodes) >= quorum {
break loop
}
case <-timeoutChan:
break loop
}
}
var services []*Service
for _, service := range serviceMap {
services = append(services, service)
}
return services, nil
}
func (n *natsRegistry) Init(opts ...Option) error {
return configure(n, opts...)
}
func (n *natsRegistry) Options() Options {
return n.opts
}
func (n *natsRegistry) Register(s *Service, opts ...RegisterOption) error {
if err := n.register(s); err != nil {
return err
}
conn, err := n.getConn()
if err != nil {
return err
}
b, err := json.Marshal(&Result{Action: n.registerAction, Service: s})
if err != nil {
return err
}
return conn.Publish(n.watchTopic, b)
}
func (n *natsRegistry) Deregister(s *Service, opts ...DeregisterOption) error {
if err := n.deregister(s); err != nil {
return err
}
conn, err := n.getConn()
if err != nil {
return err
}
b, err := json.Marshal(&Result{Action: "delete", Service: s})
if err != nil {
return err
}
return conn.Publish(n.watchTopic, b)
}
func (n *natsRegistry) GetService(s string, opts ...GetOption) ([]*Service, error) {
services, err := n.query(s, getQuorum(n.opts))
if err != nil {
return nil, err
}
return services, nil
}
func (n *natsRegistry) ListServices(opts ...ListOption) ([]*Service, error) {
s, err := n.query("", 0)
if err != nil {
return nil, err
}
var services []*Service
serviceMap := make(map[string]*Service)
for _, v := range s {
serviceMap[v.Name] = &Service{Name: v.Name, Version: v.Version}
}
for _, v := range serviceMap {
services = append(services, v)
}
return services, nil
}
func (n *natsRegistry) Watch(opts ...WatchOption) (Watcher, error) {
conn, err := n.getConn()
if err != nil {
return nil, err
}
sub, err := conn.SubscribeSync(n.watchTopic)
if err != nil {
return nil, err
}
var wo WatchOptions
for _, o := range opts {
o(&wo)
}
return &natsWatcher{sub, wo}, nil
}
func (n *natsRegistry) String() string {
return "nats"
}
func NewRegistry(opts ...Option) Registry {
options := Options{
Timeout: time.Millisecond * 100,
Context: context.Background(),
}
n := &natsRegistry{
opts: options,
services: make(map[string][]*Service),
listeners: make(map[string]chan bool),
}
configure(n, opts...)
return n
}

View File

@@ -0,0 +1,21 @@
//go:build nats
// +build nats
package registry_test
import (
"reflect"
"testing"
)
func assertNoError(tb testing.TB, actual error) {
if actual != nil {
tb.Errorf("expected no error, got %v", actual)
}
}
func assertEqual(tb testing.TB, expected, actual interface{}) {
if !reflect.DeepEqual(expected, actual) {
tb.Errorf("expected %v, got %v", expected, actual)
}
}

View File

@@ -0,0 +1,71 @@
//go:build nats
// +build nats
package registry_test
import (
"os"
"testing"
log "go-micro.dev/v5/logger"
"go-micro.dev/v5/registry"
)
type environment struct {
registryOne Registry
registryTwo Registry
registryThree Registry
serviceOne Service
serviceTwo Service
nodeOne Node
nodeTwo Node
nodeThree Node
}
var e environment
func TestMain(m *testing.M) {
natsURL := os.Getenv("NATS_URL")
if natsURL == "" {
log.Infof("NATS_URL is undefined - skipping tests")
return
}
e.registryOne = registry.NewRegistry(Addrs(natsURL), registry.Quorum(1))
e.registryTwo = registry.NewRegistry(Addrs(natsURL), registry.Quorum(1))
e.registryThree = registry.NewRegistry(Addrs(natsURL), registry.Quorum(1))
e.serviceOne.Name = "one"
e.serviceOne.Version = "default"
e.serviceOne.Nodes = []*Node{&e.nodeOne}
e.serviceTwo.Name = "two"
e.serviceTwo.Version = "default"
e.serviceTwo.Nodes = []*Node{&e.nodeOne, &e.nodeTwo}
e.nodeOne.Id = "one"
e.nodeTwo.Id = "two"
e.nodeThree.Id = "three"
if err := e.registryOne.Register(&e.serviceOne); err != nil {
log.Fatal(err)
}
if err := e.registryOne.Register(&e.serviceTwo); err != nil {
log.Fatal(err)
}
result := m.Run()
if err := e.registryOne.Deregister(&e.serviceOne); err != nil {
log.Fatal(err)
}
if err := e.registryOne.Deregister(&e.serviceTwo); err != nil {
log.Fatal(err)
}
os.Exit(result)
}

89
registry/nats_options.go Normal file
View File

@@ -0,0 +1,89 @@
//go:build nats
// +build nats
package registry
import (
"context"
"github.com/nats-io/nats.go"
)
type contextQuorumKey struct{}
type optionsKey struct{}
type watchTopicKey struct{}
type queryTopicKey struct{}
type registerActionKey struct{}
var (
DefaultQuorum = 0
)
func getQuorum(o Options) int {
if o.Context == nil {
return DefaultQuorum
}
value := o.Context.Value(contextQuorumKey{})
if v, ok := value.(int); ok {
return v
} else {
return DefaultQuorum
}
}
func Quorum(n int) Option {
return func(o *Options) {
o.Context = context.WithValue(o.Context, contextQuorumKey{}, n)
}
}
// Options allow to inject a nats.Options struct for configuring
// the nats connection.
func NatsOptions(nopts nats.Options) Option {
return func(o *Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, optionsKey{}, nopts)
}
}
// QueryTopic allows to set a custom nats topic on which service registries
// query (survey) other services. All registries listen on this topic and
// then respond to the query message.
func QueryTopic(s string) Option {
return func(o *Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, queryTopicKey{}, s)
}
}
// WatchTopic allows to set a custom nats topic on which registries broadcast
// changes (e.g. when services are added, updated or removed). Since we don't
// have a central registry service, each service typically broadcasts in a
// determined frequency on this topic.
func WatchTopic(s string) Option {
return func(o *Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, watchTopicKey{}, s)
}
}
// RegisterAction allows to set the action to use when registering to nats.
// As of now there are three different options:
// - "create" (default) only registers if there is noone already registered under the same key.
// - "update" only updates the registration if it already exists.
// - "put" creates or updates a registration
func RegisterAction(s string) Option {
return func(o *Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, registerActionKey{}, s)
}
}

View File

@@ -0,0 +1,8 @@
//go:build nats
// +build nats
package registry
var (
DefaultRegistry = NewRegistry()
)

96
registry/nats_test.go Normal file
View File

@@ -0,0 +1,96 @@
//go:build nats
// +build nats
package registry_test
import (
"testing"
)
func TestRegister(t *testing.T) {
service := Service{Name: "test"}
assertNoError(t, e.registryOne.Register(&service))
defer e.registryOne.Deregister(&service)
services, err := e.registryOne.ListServices()
assertNoError(t, err)
assertEqual(t, 3, len(services))
services, err = e.registryTwo.ListServices()
assertNoError(t, err)
assertEqual(t, 3, len(services))
}
func TestDeregister(t *testing.T) {
service1 := Service{Name: "test-deregister", Version: "v1"}
service2 := Service{Name: "test-deregister", Version: "v2"}
assertNoError(t, e.registryOne.Register(&service1))
services, err := e.registryOne.GetService(service1.Name)
assertNoError(t, err)
assertEqual(t, 1, len(services))
assertNoError(t, e.registryOne.Register(&service2))
services, err = e.registryOne.GetService(service2.Name)
assertNoError(t, err)
assertEqual(t, 2, len(services))
assertNoError(t, e.registryOne.Deregister(&service1))
services, err = e.registryOne.GetService(service1.Name)
assertNoError(t, err)
assertEqual(t, 1, len(services))
assertNoError(t, e.registryOne.Deregister(&service2))
services, err = e.registryOne.GetService(service1.Name)
assertNoError(t, err)
assertEqual(t, 0, len(services))
}
func TestGetService(t *testing.T) {
services, err := e.registryTwo.GetService("one")
assertNoError(t, err)
assertEqual(t, 1, len(services))
assertEqual(t, "one", services[0].Name)
assertEqual(t, 1, len(services[0].Nodes))
}
func TestGetServiceWithNoNodes(t *testing.T) {
services, err := e.registryOne.GetService("missing")
assertNoError(t, err)
assertEqual(t, 0, len(services))
}
func TestGetServiceFromMultipleNodes(t *testing.T) {
services, err := e.registryOne.GetService("two")
assertNoError(t, err)
assertEqual(t, 1, len(services))
assertEqual(t, "two", services[0].Name)
assertEqual(t, 2, len(services[0].Nodes))
}
func BenchmarkGetService(b *testing.B) {
for n := 0; n < b.N; n++ {
services, err := e.registryTwo.GetService("one")
assertNoError(b, err)
assertEqual(b, 1, len(services))
assertEqual(b, "one", services[0].Name)
}
}
func BenchmarkGetServiceWithNoNodes(b *testing.B) {
for n := 0; n < b.N; n++ {
services, err := e.registryOne.GetService("missing")
assertNoError(b, err)
assertEqual(b, 0, len(services))
}
}
func BenchmarkGetServiceFromMultipleNodes(b *testing.B) {
for n := 0; n < b.N; n++ {
services, err := e.registryTwo.GetService("two")
assertNoError(b, err)
assertEqual(b, 1, len(services))
assertEqual(b, "two", services[0].Name)
assertEqual(b, 2, len(services[0].Nodes))
}
}

108
registry/nats_util.go Normal file
View File

@@ -0,0 +1,108 @@
//go:build nats
// +build nats
package registry
func cp(current []*Service) []*Service {
var services []*Service
for _, service := range current {
// copy service
s := new(Service)
*s = *service
// copy nodes
var nodes []*Node
for _, node := range service.Nodes {
n := new(Node)
*n = *node
nodes = append(nodes, n)
}
s.Nodes = nodes
// copy endpoints
var eps []*Endpoint
for _, ep := range service.Endpoints {
e := new(Endpoint)
*e = *ep
eps = append(eps, e)
}
s.Endpoints = eps
// append service
services = append(services, s)
}
return services
}
func addNodes(old, neu []*Node) []*Node {
for _, n := range neu {
var seen bool
for i, o := range old {
if o.Id == n.Id {
seen = true
old[i] = n
break
}
}
if !seen {
old = append(old, n)
}
}
return old
}
func addServices(old, neu []*Service) []*Service {
for _, s := range neu {
var seen bool
for i, o := range old {
if o.Version == s.Version {
s.Nodes = addNodes(o.Nodes, s.Nodes)
seen = true
old[i] = s
break
}
}
if !seen {
old = append(old, s)
}
}
return old
}
func delNodes(old, del []*Node) []*Node {
var nodes []*Node
for _, o := range old {
var rem bool
for _, n := range del {
if o.Id == n.Id {
rem = true
break
}
}
if !rem {
nodes = append(nodes, o)
}
}
return nodes
}
func delServices(old, del []*Service) []*Service {
var services []*Service
for i, o := range old {
var rem bool
for _, s := range del {
if o.Version == s.Version {
old[i].Nodes = delNodes(o.Nodes, s.Nodes)
if len(old[i].Nodes) == 0 {
rem = true
}
}
}
if !rem {
services = append(services, o)
}
}
return services
}

41
registry/nats_watcher.go Normal file
View File

@@ -0,0 +1,41 @@
//go:build nats
// +build nats
package registry
import (
"encoding/json"
"time"
"github.com/nats-io/nats.go"
)
type natsWatcher struct {
sub *nats.Subscription
wo WatchOptions
}
func (n *natsWatcher) Next() (*Result, error) {
var result *Result
for {
m, err := n.sub.NextMsg(time.Minute)
if err != nil && err == nats.ErrTimeout {
continue
} else if err != nil {
return nil, err
}
if err := json.Unmarshal(m.Data, &result); err != nil {
return nil, err
}
if len(n.wo.Service) > 0 && result.Service.Name != n.wo.Service {
continue
}
break
}
return result, nil
}
func (n *natsWatcher) Stop() {
n.sub.Unsubscribe()
}

166
registry/options_test.go Normal file
View File

@@ -0,0 +1,166 @@
//go:build nats
// +build nats
package registry
import (
"fmt"
"log"
"os"
"sync"
"testing"
"time"
"github.com/nats-io/nats.go"
)
var addrTestCases = []struct {
name string
description string
addrs map[string]string // expected address : set address
}{
{
"registryOption",
"set registry addresses through a registry.Option in constructor",
map[string]string{
"nats://192.168.10.1:5222": "192.168.10.1:5222",
"nats://10.20.10.0:4222": "10.20.10.0:4222"},
},
{
"natsOption",
"set registry addresses through the nats.Option in constructor",
map[string]string{
"nats://192.168.10.1:5222": "192.168.10.1:5222",
"nats://10.20.10.0:4222": "10.20.10.0:4222"},
},
{
"default",
"check if default Address is set correctly",
map[string]string{
nats.DefaultURL: "",
},
},
}
func TestInitAddrs(t *testing.T) {
for _, tc := range addrTestCases {
t.Run(fmt.Sprintf("%s: %s", tc.name, tc.description), func(t *testing.T) {
var reg Registry
var addrs []string
for _, addr := range tc.addrs {
addrs = append(addrs, addr)
}
switch tc.name {
case "registryOption":
// we know that there are just two addrs in the dict
reg = NewRegistry(Addrs(addrs[0], addrs[1]))
case "natsOption":
nopts := nats.GetDefaultOptions()
nopts.Servers = addrs
reg = NewRegistry(Options(nopts))
case "default":
reg = NewRegistry()
}
// if err := reg.Register(dummyService); err != nil {
// t.Fatal(err)
// }
natsRegistry, ok := reg.(*natsRegistry)
if !ok {
t.Fatal("Expected registry to be of types *natsRegistry")
}
// check if the same amount of addrs we set has actually been set
if len(natsRegistry.addrs) != len(tc.addrs) {
t.Errorf("Expected Addr = %v, Actual Addr = %v",
natsRegistry.addrs, tc.addrs)
t.Errorf("Expected Addr count = %d, Actual Addr count = %d",
len(natsRegistry.addrs), len(tc.addrs))
}
for _, addr := range natsRegistry.addrs {
_, ok := tc.addrs[addr]
if !ok {
t.Errorf("Expected Addr = %v, Actual Addr = %v",
natsRegistry.addrs, tc.addrs)
t.Errorf("Expected '%s' has not been set", addr)
}
}
})
}
}
func TestWatchQueryTopic(t *testing.T) {
natsURL := os.Getenv("NATS_URL")
if natsURL == "" {
log.Println("NATS_URL is undefined - skipping tests")
return
}
watchTopic := "custom.test.watch"
queryTopic := "custom.test.query"
wt := WatchTopic(watchTopic)
qt := QueryTopic(queryTopic)
// connect to NATS and subscribe to the Watch & Query topics where the
// registry will publish a msg
nopts := nats.GetDefaultOptions()
nopts.Servers = setAddrs([]string{natsURL})
conn, err := nopts.Connect()
if err != nil {
t.Fatal(err)
}
wg := sync.WaitGroup{}
wg.Add(2)
okCh := make(chan struct{})
// Wait until we have received something on both topics
go func() {
wg.Wait()
close(okCh)
}()
// handler just calls wg.Done()
rcvdHdlr := func(m *nats.Msg) {
wg.Done()
}
_, err = conn.Subscribe(queryTopic, rcvdHdlr)
if err != nil {
t.Fatal(err)
}
_, err = conn.Subscribe(watchTopic, rcvdHdlr)
if err != nil {
t.Fatal(err)
}
dummyService := &Service{
Name: "TestInitAddr",
Version: "1.0.0",
}
reg := NewRegistry(qt, wt, Addrs(natsURL))
// trigger registry to send out message on watchTopic
if err := reg.Register(dummyService); err != nil {
t.Fatal(err)
}
// trigger registry to send out message on queryTopic
if _, err := reg.ListServices(); err != nil {
t.Fatal(err)
}
// make sure that we received something on tc.topic
select {
case <-okCh:
// fine - we received on both topics a message from the registry
case <-time.After(time.Millisecond * 200):
t.Fatal("timeout - no data received on watch topic")
}
}

View File

@@ -6,8 +6,6 @@ import (
) )
var ( var (
DefaultRegistry = NewRegistry()
// Not found error when GetService is called. // Not found error when GetService is called.
ErrNotFound = errors.New("service not found") ErrNotFound = errors.New("service not found")
// Watcher stopped error when watcher is stopped. // Watcher stopped error when watcher is stopped.