mirror of
https://github.com/go-kratos/kratos.git
synced 2025-01-22 03:38:41 +02:00
f50b731aa3
* fix proto codec
145 lines
2.8 KiB
Go
145 lines
2.8 KiB
Go
package config
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"reflect"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/go-kratos/kratos/v2/encoding"
|
|
"github.com/go-kratos/kratos/v2/log"
|
|
)
|
|
|
|
var (
|
|
// ErrNotFound is key not found.
|
|
ErrNotFound = errors.New("key not found")
|
|
// ErrTypeAssert is type assert error.
|
|
ErrTypeAssert = errors.New("type assert error")
|
|
|
|
_ Config = (*config)(nil)
|
|
|
|
codec = encoding.GetCodec("json")
|
|
)
|
|
|
|
// Observer is config observer.
|
|
type Observer func(string, Value)
|
|
|
|
// Config is a config interface.
|
|
type Config interface {
|
|
Load() error
|
|
Scan(v interface{}) error
|
|
Value(key string) Value
|
|
Watch(key string, o Observer) error
|
|
Close() error
|
|
}
|
|
|
|
type config struct {
|
|
opts options
|
|
reader Reader
|
|
cached sync.Map
|
|
observers sync.Map
|
|
watchers []Watcher
|
|
log *log.Helper
|
|
}
|
|
|
|
// New new a config with options.
|
|
func New(opts ...Option) Config {
|
|
options := options{
|
|
logger: log.DefaultLogger,
|
|
decoder: func(kv *KeyValue, v map[string]interface{}) error {
|
|
return json.Unmarshal(kv.Value, &v)
|
|
},
|
|
}
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
return &config{
|
|
opts: options,
|
|
reader: newReader(options),
|
|
log: log.NewHelper("config", options.logger),
|
|
}
|
|
}
|
|
|
|
func (c *config) watch(w Watcher) {
|
|
for {
|
|
kvs, err := w.Next()
|
|
if err != nil {
|
|
time.Sleep(time.Second)
|
|
c.log.Errorf("Failed to watch next config: %v", err)
|
|
continue
|
|
}
|
|
if err := c.reader.Merge(kvs...); err != nil {
|
|
c.log.Errorf("Failed to merge next config: %v", err)
|
|
continue
|
|
}
|
|
c.cached.Range(func(key, value interface{}) bool {
|
|
k := key.(string)
|
|
v := value.(Value)
|
|
if n, ok := c.reader.Value(k); ok && !reflect.DeepEqual(n.Load(), v.Load()) {
|
|
v.Store(n.Load())
|
|
if o, ok := c.observers.Load(k); ok {
|
|
o.(Observer)(k, v)
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
}
|
|
|
|
func (c *config) Load() error {
|
|
for _, src := range c.opts.sources {
|
|
kvs, err := src.Load()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := c.reader.Merge(kvs...); err != nil {
|
|
c.log.Errorf("Failed to merge config source: %v", err)
|
|
return err
|
|
}
|
|
w, err := src.Watch()
|
|
if err != nil {
|
|
c.log.Errorf("Failed to watch config source: %v", err)
|
|
return err
|
|
}
|
|
go c.watch(w)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *config) Value(key string) Value {
|
|
if v, ok := c.cached.Load(key); ok {
|
|
return v.(Value)
|
|
}
|
|
if v, ok := c.reader.Value(key); ok {
|
|
c.cached.Store(key, v)
|
|
return v
|
|
}
|
|
return &errValue{err: ErrNotFound}
|
|
}
|
|
|
|
func (c *config) Scan(v interface{}) error {
|
|
data, err := c.reader.Source()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return codec.Unmarshal(data, v)
|
|
}
|
|
|
|
func (c *config) Watch(key string, o Observer) error {
|
|
if v := c.Value(key); v.Load() == nil {
|
|
return ErrNotFound
|
|
}
|
|
c.observers.Store(key, o)
|
|
return nil
|
|
}
|
|
|
|
func (c *config) Close() error {
|
|
for _, w := range c.watchers {
|
|
if err := w.Close(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|