1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-17 17:44:30 +02:00
go-micro/config/default.go

303 lines
4.4 KiB
Go
Raw Normal View History

2019-05-30 23:11:13 +01:00
package config
import (
"bytes"
"sync"
"time"
2021-10-12 12:55:53 +01:00
"go-micro.dev/v4/config/loader"
"go-micro.dev/v4/config/loader/memory"
"go-micro.dev/v4/config/reader"
"go-micro.dev/v4/config/reader/json"
"go-micro.dev/v4/config/source"
2019-05-30 23:11:13 +01:00
)
type config struct {
exit chan bool
opts Options
sync.RWMutex
// the current snapshot
snap *loader.Snapshot
// the current values
vals reader.Values
}
type watcher struct {
lw loader.Watcher
rd reader.Reader
path []string
value reader.Value
}
2020-01-19 16:31:02 +08:00
func newConfig(opts ...Option) (Config, error) {
var c config
err := c.Init(opts...)
if err != nil {
return nil, err
}
go c.run()
return &c, nil
}
func (c *config) Init(opts ...Option) error {
c.opts = Options{
2019-05-30 23:11:13 +01:00
Reader: json.NewReader(),
}
c.exit = make(chan bool)
2019-05-30 23:11:13 +01:00
for _, o := range opts {
o(&c.opts)
2019-05-30 23:11:13 +01:00
}
// default loader uses the configured reader
if c.opts.Loader == nil {
c.opts.Loader = memory.NewLoader(memory.WithReader(c.opts.Reader))
}
err := c.opts.Loader.Load(c.opts.Source...)
2020-01-19 16:42:05 +08:00
if err != nil {
return err
2020-01-19 16:42:05 +08:00
}
c.snap, err = c.opts.Loader.Snapshot()
2020-01-19 16:42:05 +08:00
if err != nil {
return err
2020-01-19 16:42:05 +08:00
}
2019-05-30 23:11:13 +01:00
c.vals, err = c.opts.Reader.Values(c.snap.ChangeSet)
if err != nil {
return err
2019-05-30 23:11:13 +01:00
}
return nil
2019-05-30 23:11:13 +01:00
}
2020-03-31 17:13:21 +01:00
func (c *config) Options() Options {
return c.opts
}
2019-05-30 23:11:13 +01:00
func (c *config) run() {
watch := func(w loader.Watcher) error {
for {
// get changeset
snap, err := w.Next()
if err != nil {
return err
}
c.Lock()
if c.snap.Version >= snap.Version {
c.Unlock()
continue
}
2019-05-30 23:11:13 +01:00
// save
c.snap = snap
// set values
c.vals, _ = c.opts.Reader.Values(snap.ChangeSet)
c.Unlock()
}
}
for {
w, err := c.opts.Loader.Watch()
if err != nil {
time.Sleep(time.Second)
continue
}
done := make(chan bool)
// the stop watch func
go func() {
select {
case <-done:
case <-c.exit:
}
w.Stop()
}()
// block watch
if err := watch(w); err != nil {
// do something better
time.Sleep(time.Second)
}
// close done chan
close(done)
// if the config is closed exit
select {
case <-c.exit:
return
default:
}
}
}
func (c *config) Map() map[string]interface{} {
c.RLock()
defer c.RUnlock()
return c.vals.Map()
}
func (c *config) Scan(v interface{}) error {
c.RLock()
defer c.RUnlock()
return c.vals.Scan(v)
}
// sync loads all the sources, calls the parser and updates the config
func (c *config) Sync() error {
if err := c.opts.Loader.Sync(); err != nil {
return err
}
snap, err := c.opts.Loader.Snapshot()
if err != nil {
return err
}
c.Lock()
defer c.Unlock()
c.snap = snap
vals, err := c.opts.Reader.Values(snap.ChangeSet)
if err != nil {
return err
}
c.vals = vals
return nil
}
func (c *config) Close() error {
select {
case <-c.exit:
return nil
default:
close(c.exit)
}
return nil
}
func (c *config) Get(path ...string) reader.Value {
c.RLock()
defer c.RUnlock()
// did sync actually work?
if c.vals != nil {
return c.vals.Get(path...)
}
// no value
return newValue()
}
func (c *config) Set(val interface{}, path ...string) {
c.Lock()
defer c.Unlock()
if c.vals != nil {
c.vals.Set(val, path...)
}
return
}
func (c *config) Del(path ...string) {
c.Lock()
defer c.Unlock()
if c.vals != nil {
c.vals.Del(path...)
}
return
}
2019-05-30 23:11:13 +01:00
func (c *config) Bytes() []byte {
c.RLock()
defer c.RUnlock()
if c.vals == nil {
return []byte{}
}
return c.vals.Bytes()
}
func (c *config) Load(sources ...source.Source) error {
if err := c.opts.Loader.Load(sources...); err != nil {
return err
}
snap, err := c.opts.Loader.Snapshot()
if err != nil {
return err
}
c.Lock()
defer c.Unlock()
c.snap = snap
vals, err := c.opts.Reader.Values(snap.ChangeSet)
if err != nil {
return err
}
c.vals = vals
return nil
}
func (c *config) Watch(path ...string) (Watcher, error) {
value := c.Get(path...)
w, err := c.opts.Loader.Watch(path...)
if err != nil {
return nil, err
}
return &watcher{
lw: w,
rd: c.opts.Reader,
path: path,
value: value,
}, nil
}
func (c *config) String() string {
return "config"
}
func (w *watcher) Next() (reader.Value, error) {
for {
s, err := w.lw.Next()
if err != nil {
return nil, err
}
// only process changes
if bytes.Equal(w.value.Bytes(), s.ChangeSet.Data) {
continue
}
v, err := w.rd.Values(s.ChangeSet)
if err != nil {
return nil, err
}
w.value = v.Get()
return w.value, nil
}
}
func (w *watcher) Stop() error {
return w.lw.Stop()
}