1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-12-12 08:23:58 +02:00
go-micro/plugins/config/source/nacos/watcher.go
2021-10-12 12:55:53 +01:00

73 lines
1.4 KiB
Go

package nacos
import (
"time"
"go-micro.dev/v4/config/encoder"
"go-micro.dev/v4/config/source"
"github.com/nacos-group/nacos-sdk-go/v2/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
)
type watcher struct {
configClient config_client.IConfigClient
e encoder.Encoder
name string
group, dataId string
ch chan *source.ChangeSet
exit chan bool
}
// newConfigWatcher creates a watcher for the Nacos config entry identified by
// group and dataId and registers the watcher's callback as the OnChange
// listener on cc.
//
// e supplies the format string and name the source name that are stamped on
// every ChangeSet the watcher emits.
//
// If ListenConfig fails, its error is returned together with a nil Watcher.
func newConfigWatcher(cc config_client.IConfigClient, e encoder.Encoder, name, group, dataId string) (source.Watcher, error) {
	w := &watcher{
		e:            e,
		name:         name,
		configClient: cc,
		group:        group,
		dataId:       dataId,
		ch:           make(chan *source.ChangeSet),
		exit:         make(chan bool),
	}

	err := cc.ListenConfig(vo.ConfigParam{
		DataId:   dataId,
		Group:    group,
		OnChange: w.callback,
	})
	if err != nil {
		// The listener was not registered, so this watcher would never
		// receive a change; do not hand it out alongside the error.
		return nil, err
	}

	return w, nil
}
// callback is the OnChange listener registered with the Nacos client. It wraps
// the new config content in a ChangeSet (timestamped now, with the encoder's
// format, the source name and a checksum) and hands it to Next.
//
// namespace, group and dataId are part of the listener signature but are not
// used here; only data, the new config content, is read.
//
// The hand-off blocks until Next receives the change set or the watcher is
// stopped. Once exit is closed the change is dropped, so a stopped watcher
// cannot hang the goroutine that invoked the callback.
func (w *watcher) callback(namespace, group, dataId, data string) {
	cs := &source.ChangeSet{
		Timestamp: time.Now(),
		Format:    w.e.String(),
		Source:    w.name,
		Data:      []byte(data),
	}
	cs.Checksum = cs.Sum()

	select {
	case w.ch <- cs:
	case <-w.exit:
		// Watcher stopped: nobody will read from ch any more.
	}
}
// Next blocks until either a change set is delivered on ch, which is then
// returned, or the exit channel is closed, in which case it returns
// source.ErrWatcherStopped.
func (w *watcher) Next() (*source.ChangeSet, error) {
	select {
	case <-w.exit:
		return nil, source.ErrWatcherStopped
	case change := <-w.ch:
		return change, nil
	}
}
// Stop shuts the watcher down. The first call closes exit, which makes Next
// return source.ErrWatcherStopped, and then unregisters the Nacos listener for
// this group/dataId so the client stops invoking the callback. Any error from
// CancelListenConfig is returned.
//
// Calling Stop again after it has completed is a no-op that returns nil.
//
// NOTE(review): the check-then-close on exit is not atomic, so Stop should not
// be called concurrently from multiple goroutines.
func (w *watcher) Stop() error {
	select {
	case <-w.exit:
		// Already stopped.
		return nil
	default:
	}

	close(w.exit)

	// Without this the Nacos client keeps the listener registered and keeps
	// calling back into a watcher that nobody reads from.
	return w.configClient.CancelListenConfig(vo.ConfigParam{
		DataId: w.dataId,
		Group:  w.group,
	})
}