diff --git a/plugins/config/source/nats/README.md b/plugins/config/source/nats/README.md new file mode 100644 index 00000000..b3d1c636 --- /dev/null +++ b/plugins/config/source/nats/README.md @@ -0,0 +1,56 @@ +# Nats Source + +The nats source reads config from nats key/values + +## Nats Format + +The nats source expects keys under the default bucket `default` default key `micro_config` + +Values are expected to be json + +``` +nats kv put default micro_config '{"nats": {"address": "10.0.0.1", "port": 8488}}' +``` + +``` +conf.Get("nats") +``` + +## New Source + +Specify source with data + +```go +natsSource := nats.NewSource( + nats.WithUrl("127.0.0.1:4222"), + nats.WithBucket("my_bucket"), + nats.WithKey("my_key"), +) +``` + +## Load Source + +Load the source into config + +```go +// Create new config +conf := config.NewConfig() + +// Load nats source +conf.Load(natsSource) +``` + +## Watch + +```go +wh, _ := natsSource.Watch() + +for { + v, err := watcher.Next() + if err != nil { + log.Fatalf("err %v", err) + } + + log.Infof("data %v", string(v.Data)) +} +``` diff --git a/plugins/config/source/nats/go.mod b/plugins/config/source/nats/go.mod new file mode 100644 index 00000000..c783072b --- /dev/null +++ b/plugins/config/source/nats/go.mod @@ -0,0 +1,29 @@ +module github.com/asim/go-micro/plugins/config/source/nats/v4 + +go 1.17 + +require ( + github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc + go-micro.dev/v4 v4.5.0 +) + +require ( + github.com/bitly/go-simplejson v0.5.0 // indirect + github.com/fsnotify/fsnotify v1.4.9 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/google/uuid v1.2.0 // indirect + github.com/imdario/mergo v0.3.12 // indirect + github.com/kr/pretty v0.3.0 // indirect + github.com/miekg/dns v1.1.43 // indirect + github.com/nats-io/nats-server/v2 v2.6.6 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect + github.com/patrickmn/go-cache v2.1.0+incompatible // indirect + golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect + golang.org/x/net v0.0.0-20210510120150-4163338589ed // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect + golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 // indirect + golang.org/x/text v0.3.6 // indirect + google.golang.org/protobuf v1.27.1 // indirect +) diff --git a/plugins/config/source/nats/go.sum b/plugins/config/source/nats/go.sum new file mode 100644 index 00000000..1caee8b2 --- /dev/null +++ b/plugins/config/source/nats/go.sum @@ -0,0 +1,102 @@ +github.com/bitly/go-simplejson v0.5.0 h1:6IH+V8/tVMab511d5bn4M7EwGXZf9Hj6i2xSwkNEM+Y= +github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= +github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= +github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s= +github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/miekg/dns v1.1.43 h1:JKfpVSCB84vrAmHzyrsxB5NAr5kLoMXZArPSw7Qlgyg= +github.com/miekg/dns v1.1.43/go.mod h1:+evo5L0630/F6ca/Z9+GAqzhjGyn8/c+TBaOyfEl0V4= +github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= +github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/nats-io/jwt/v2 v2.2.0 h1:Yg/4WFK6vsqMudRg91eBb7Dh6XeVcDMPHycDE8CfltE= +github.com/nats-io/jwt/v2 v2.2.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/nats-server/v2 v2.6.6 h1:t6LcqHuMXhylQ/j8078zDUSc7sE0FBMcN8jwObAriTc= +github.com/nats-io/nats-server/v2 v2.6.6/go.mod h1:9sdEkBhyZMQG1M9TevnlYUwMusRACn2vlgOeqoHKwVo= +github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc h1:SHr4MUUZJ/fAC0uSm2OzWOJYsHpapmR86mpw7q1qPXU= +github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw= +github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +go-micro.dev/v4 v4.5.0 h1:0MQxFupE1pxhiamf8FvOyGL0N+ezxM73czZbBy3S/Kg= +go-micro.dev/v4 v4.5.0/go.mod h1:hSBUne6gtYTfYmnNxGQmaNmRQ6z8LqGrAVNmL/ae0lY= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210510120150-4163338589ed h1:p9UgmWI9wKpfYmgaV/IZKGdXc5qEK45tDwwwDyjS26I= +golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= +golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= diff --git a/plugins/config/source/nats/nats.go b/plugins/config/source/nats/nats.go new file mode 100644 index 00000000..b7e4fedb --- /dev/null +++ b/plugins/config/source/nats/nats.go @@ -0,0 +1,134 @@ +package nats + +import ( + "fmt" + "net" + "strings" + "time" + + natsgo "github.com/nats-io/nats.go" + "go-micro.dev/v4/config/source" + log "go-micro.dev/v4/logger" +) + +type nats struct { + url string + bucket string + key string + kv natsgo.KeyValue + opts source.Options +} + +// DefaultBucket is the bucket that nats keys will be assumed to have if you +// haven't specified one +var ( + DefaultBucket = "default" + DefaultKey = "micro_config" +) + +func (n *nats) Read() (*source.ChangeSet, error) { + e, err := n.kv.Get(n.key) + if err != nil { + if err == natsgo.ErrKeyNotFound { + return nil, nil + } + return nil, err + } + + if e.Value() == nil || len(e.Value()) == 0 { + return nil, fmt.Errorf("source not found: %s", n.key) + } + + cs := &source.ChangeSet{ + Data: e.Value(), + Format: n.opts.Encoder.String(), + Source: n.String(), + Timestamp: time.Now(), + } + cs.Checksum = cs.Sum() + + return cs, nil +} + +func (n *nats) Write(cs *source.ChangeSet) error { + _, err := n.kv.Put(n.key, cs.Data) + if err != nil { + return err + } + + return nil +} + +func (n *nats) String() string { + return "nats" +} + +func (n *nats) Watch() (source.Watcher, error) { + return newWatcher(n.kv, n.bucket, n.key, n.String(), n.opts.Encoder) +} + +func NewSource(opts ...source.Option) source.Source { + options := source.NewOptions(opts...) + + config := natsgo.DefaultOptions + + urls, ok := options.Context.Value(urlKey{}).([]string) + endpoints := []string{} + if ok { + for _, u := range urls { + addr, port, err := net.SplitHostPort(u) + if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { + port = "4222" + addr = u + endpoints = append(endpoints, fmt.Sprintf("%s:%s", addr, port)) + } else if err == nil { + endpoints = append(endpoints, fmt.Sprintf("%s:%s", addr, port)) + } + } + } + if len(endpoints) == 0 { + endpoints = append(endpoints, "127.0.0.1:4222") + } + + bucket, ok := options.Context.Value(bucketKey{}).(string) + if !ok { + bucket = DefaultBucket + } + + key, ok := options.Context.Value(keyKey{}).(string) + if !ok { + key = DefaultKey + } + + config.Url = strings.Join(endpoints, ",") + + nc, err := natsgo.Connect(config.Url) + if err != nil { + log.Error(err) + } + + js, err := nc.JetStream(natsgo.MaxWait(10 * time.Second)) + if err != nil { + log.Error(err) + } + + kv, err := js.KeyValue(bucket) + if err == natsgo.ErrBucketNotFound || err == natsgo.ErrKeyNotFound { + kv, err = js.CreateKeyValue(&natsgo.KeyValueConfig{Bucket: bucket}) + if err != nil { + log.Error(err) + } + } + + if err != nil { + log.Error(err) + } + + return &nats{ + url: config.Url, + bucket: bucket, + key: key, + kv: kv, + opts: options, + } +} diff --git a/plugins/config/source/nats/options.go b/plugins/config/source/nats/options.go new file mode 100644 index 00000000..0027405f --- /dev/null +++ b/plugins/config/source/nats/options.go @@ -0,0 +1,54 @@ +package nats + +import ( + "context" + "time" + + natsgo "github.com/nats-io/nats.go" + "go-micro.dev/v4/config/source" +) + +type ( + urlKey struct{} + bucketKey struct{} + keyKey struct{} +) + +// WithUrl sets the nats url +func WithUrl(a string) source.Option { + return func(o *source.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, urlKey{}, a) + } +} + +// WithBucket sets the nats key +func WithBucket(a string) source.Option { + return func(o *source.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, bucketKey{}, a) + } +} + +// WithKey sets the nats key +func WithKey(a string) source.Option { + return func(o *source.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, keyKey{}, a) + } +} + +func Client(url string) (natsgo.JetStreamContext, error) { + nc, err := natsgo.Connect(url) + if err != nil { + return nil, err + } + + return nc.JetStream(natsgo.MaxWait(10 * time.Second)) +} diff --git a/plugins/config/source/nats/watcher.go b/plugins/config/source/nats/watcher.go new file mode 100644 index 00000000..722905dd --- /dev/null +++ b/plugins/config/source/nats/watcher.go @@ -0,0 +1,79 @@ +package nats + +import ( + "time" + + natsgo "github.com/nats-io/nats.go" + "go-micro.dev/v4/config/encoder" + "go-micro.dev/v4/config/source" +) + +type watcher struct { + e encoder.Encoder + name string + bucket string + key string + + ch chan *source.ChangeSet + exit chan bool +} + +func newWatcher(kv natsgo.KeyValue, bucket, key, name string, e encoder.Encoder) (source.Watcher, error) { + w := &watcher{ + e: e, + name: name, + bucket: bucket, + key: key, + ch: make(chan *source.ChangeSet), + exit: make(chan bool), + } + + wh, _ := kv.Watch(key) + + go func() { + for { + select { + case v := <-wh.Updates(): + if v != nil { + w.handle(v.Value()) + } + case <-w.exit: + _ = wh.Stop() + return + } + } + }() + return w, nil +} + +func (w *watcher) handle(data []byte) { + cs := &source.ChangeSet{ + Timestamp: time.Now(), + Format: w.e.String(), + Source: w.name, + Data: data, + } + cs.Checksum = cs.Sum() + + w.ch <- cs +} + +func (w *watcher) Next() (*source.ChangeSet, error) { + select { + case cs := <-w.ch: + return cs, nil + case <-w.exit: + return nil, source.ErrWatcherStopped + } +} + +func (w *watcher) Stop() error { + select { + case <-w.exit: + return nil + default: + close(w.exit) + } + + return nil +}