mirror of
https://github.com/go-kratos/kratos.git
synced 2025-03-17 21:07:54 +02:00
bc: apollo unable to get and watch to the properties file (#2269)
This commit is contained in:
parent
2d206076f8
commit
3f31b4d734
@ -7,7 +7,10 @@ import (
|
||||
"github.com/go-kratos/kratos/v2/log"
|
||||
|
||||
"github.com/apolloconfig/agollo/v4"
|
||||
"github.com/apolloconfig/agollo/v4/constant"
|
||||
apolloConfig "github.com/apolloconfig/agollo/v4/env/config"
|
||||
"github.com/apolloconfig/agollo/v4/extension"
|
||||
"github.com/go-kratos/kratos/v2/encoding"
|
||||
)
|
||||
|
||||
type apollo struct {
|
||||
@ -15,6 +18,13 @@ type apollo struct {
|
||||
opt *options
|
||||
}
|
||||
|
||||
const (
|
||||
yaml = "yaml"
|
||||
yml = "yml"
|
||||
json = "json"
|
||||
properties = "properties"
|
||||
)
|
||||
|
||||
// Option is apollo option
|
||||
type Option func(*options)
|
||||
|
||||
@ -26,6 +36,7 @@ type options struct {
|
||||
namespace string
|
||||
isBackupConfig bool
|
||||
backupPath string
|
||||
originConfig bool
|
||||
}
|
||||
|
||||
// WithAppID with apollo config app id
|
||||
@ -84,6 +95,16 @@ func WithBackupPath(backupPath string) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// WithOriginalConfig use the original configuration file without parse processing
|
||||
func WithOriginalConfig() Option {
|
||||
return func(o *options) {
|
||||
extension.AddFormatParser(constant.JSON, &jsonExtParser{})
|
||||
extension.AddFormatParser(constant.YAML, &yamlExtParser{})
|
||||
extension.AddFormatParser(constant.YML, &yamlExtParser{})
|
||||
o.originConfig = true
|
||||
}
|
||||
}
|
||||
|
||||
func NewSource(opts ...Option) config.Source {
|
||||
op := options{}
|
||||
for _, o := range opts {
|
||||
@ -108,31 +129,78 @@ func NewSource(opts ...Option) config.Source {
|
||||
|
||||
func format(ns string) string {
|
||||
arr := strings.Split(ns, ".")
|
||||
if len(arr) <= 1 {
|
||||
return "json"
|
||||
if len(arr) <= 1 || arr[len(arr)-1] == properties {
|
||||
return json
|
||||
}
|
||||
|
||||
return arr[len(arr)-1]
|
||||
}
|
||||
|
||||
func (e *apollo) load() []*config.KeyValue {
|
||||
kv := make([]*config.KeyValue, 0)
|
||||
kvs := make([]*config.KeyValue, 0)
|
||||
namespaces := strings.Split(e.opt.namespace, ",")
|
||||
|
||||
for _, ns := range namespaces {
|
||||
value, err := e.client.GetConfigCache(ns).Get("content")
|
||||
if err != nil {
|
||||
log.Warnw("apollo get config failed", "err", err)
|
||||
if !e.opt.originConfig {
|
||||
kv, err := e.getConfig(ns)
|
||||
if err != nil {
|
||||
log.Errorf("apollo get config failed,err:%v", err)
|
||||
continue
|
||||
}
|
||||
kvs = append(kvs, kv)
|
||||
continue
|
||||
}
|
||||
if strings.Contains(ns, ".") && !strings.Contains(ns, properties) &&
|
||||
(format(ns) == yaml || format(ns) == yml || format(ns) == json) {
|
||||
kv, err := e.getOriginConfig(ns)
|
||||
if err != nil {
|
||||
log.Errorf("apollo get config failed,err:%v", err)
|
||||
continue
|
||||
}
|
||||
kvs = append(kvs, kv)
|
||||
continue
|
||||
} else {
|
||||
kv, err := e.getConfig(ns)
|
||||
if err != nil {
|
||||
log.Errorf("apollo get config failed,err:%v", err)
|
||||
continue
|
||||
}
|
||||
kvs = append(kvs, kv)
|
||||
}
|
||||
// serialize the namespace content KeyValue into bytes.
|
||||
kv = append(kv, &config.KeyValue{
|
||||
Key: ns,
|
||||
Value: []byte(value.(string)),
|
||||
Format: format(ns),
|
||||
})
|
||||
}
|
||||
return kvs
|
||||
}
|
||||
|
||||
return kv
|
||||
func (e *apollo) getConfig(ns string) (*config.KeyValue, error) {
|
||||
next := map[string]interface{}{}
|
||||
e.client.GetConfigCache(ns).Range(func(key, value interface{}) bool {
|
||||
// all values are out properties format
|
||||
resolve(genKey(ns, key.(string)), value, next)
|
||||
return true
|
||||
})
|
||||
f := format(ns)
|
||||
codec := encoding.GetCodec(f)
|
||||
val, err := codec.Marshal(next)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &config.KeyValue{
|
||||
Key: ns,
|
||||
Value: val,
|
||||
Format: f,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (e apollo) getOriginConfig(ns string) (*config.KeyValue, error) {
|
||||
value, err := e.client.GetConfigCache(ns).Get("content")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// serialize the namespace content KeyValue into bytes.
|
||||
return &config.KeyValue{
|
||||
Key: ns,
|
||||
Value: []byte(value.(string)),
|
||||
Format: format(ns),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (e *apollo) Load() (kv []*config.KeyValue, err error) {
|
||||
@ -146,3 +214,54 @@ func (e *apollo) Watch() (config.Watcher, error) {
|
||||
}
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// resolve convert kv pair into one map[string]interface{} by split key into different
|
||||
// map level. such as: app.name = "application" => map[app][name] = "application"
|
||||
func resolve(key string, value interface{}, target map[string]interface{}) {
|
||||
// expand key "aaa.bbb" into map[aaa]map[bbb]interface{}
|
||||
keys := strings.Split(key, ".")
|
||||
last := len(keys) - 1
|
||||
cursor := target
|
||||
|
||||
for i, k := range keys {
|
||||
if i == last {
|
||||
cursor[k] = value
|
||||
break
|
||||
}
|
||||
|
||||
// not the last key, be deeper
|
||||
v, ok := cursor[k]
|
||||
if !ok {
|
||||
// create a new map
|
||||
deeper := make(map[string]interface{})
|
||||
cursor[k] = deeper
|
||||
cursor = deeper
|
||||
continue
|
||||
}
|
||||
|
||||
// current exists, then check existing value type, if it's not map
|
||||
// that means duplicate keys, and at least one is not map instance.
|
||||
if cursor, ok = v.(map[string]interface{}); !ok {
|
||||
log.Warnf("duplicate key: %v\n", strings.Join(keys[:i+1], "."))
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// genKey got the key of config.KeyValue pair.
|
||||
// eg: namespace.ext with subKey got namespace.subKey
|
||||
func genKey(ns, sub string) string {
|
||||
arr := strings.Split(ns, ".")
|
||||
if len(arr) < 1 {
|
||||
return sub
|
||||
}
|
||||
|
||||
if len(arr) == 1 {
|
||||
if ns == "" {
|
||||
return sub
|
||||
}
|
||||
return ns + "." + sub
|
||||
}
|
||||
|
||||
return strings.Join(arr[:len(arr)-1], ".") + "." + sub
|
||||
}
|
||||
|
@ -1,10 +1,5 @@
|
||||
package apollo
|
||||
|
||||
import (
|
||||
"github.com/apolloconfig/agollo/v4/constant"
|
||||
"github.com/apolloconfig/agollo/v4/extension"
|
||||
)
|
||||
|
||||
type jsonExtParser struct{}
|
||||
|
||||
func (parser jsonExtParser) Parse(configContent interface{}) (map[string]interface{}, error) {
|
||||
@ -13,13 +8,6 @@ func (parser jsonExtParser) Parse(configContent interface{}) (map[string]interfa
|
||||
|
||||
type yamlExtParser struct{}
|
||||
|
||||
func (parser yamlExtParser) Parse(configContent interface{}) (out map[string]interface{}, err error) {
|
||||
func (parser yamlExtParser) Parse(configContent interface{}) (map[string]interface{}, error) {
|
||||
return map[string]interface{}{"content": configContent}, nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
// add json/yaml/yml format
|
||||
extension.AddFormatParser(constant.JSON, &jsonExtParser{})
|
||||
extension.AddFormatParser(constant.YAML, &yamlExtParser{})
|
||||
extension.AddFormatParser(constant.YML, &yamlExtParser{})
|
||||
}
|
||||
|
@ -2,6 +2,9 @@ package apollo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/go-kratos/kratos/v2/encoding"
|
||||
|
||||
"github.com/go-kratos/kratos/v2/config"
|
||||
"github.com/go-kratos/kratos/v2/log"
|
||||
@ -23,14 +26,38 @@ type customChangeListener struct {
|
||||
|
||||
func (c *customChangeListener) onChange(namespace string, changes map[string]*storage.ConfigChange) []*config.KeyValue {
|
||||
kv := make([]*config.KeyValue, 0, 2)
|
||||
value, err := c.apollo.client.GetConfigCache(namespace).Get("content")
|
||||
if strings.Contains(namespace, ".") && !strings.Contains(namespace, properties) &&
|
||||
(format(namespace) == yaml || format(namespace) == yml || format(namespace) == json) {
|
||||
value, err := c.apollo.client.GetConfigCache(namespace).Get("content")
|
||||
if err != nil {
|
||||
log.Warnw("apollo get config failed", "err", err)
|
||||
}
|
||||
kv = append(kv, &config.KeyValue{
|
||||
Key: namespace,
|
||||
Value: []byte(value.(string)),
|
||||
Format: format(namespace),
|
||||
})
|
||||
|
||||
return kv
|
||||
}
|
||||
|
||||
next := make(map[string]interface{})
|
||||
|
||||
for key, change := range changes {
|
||||
resolve(genKey(namespace, key), change.NewValue, next)
|
||||
}
|
||||
|
||||
f := format(namespace)
|
||||
codec := encoding.GetCodec(f)
|
||||
val, err := codec.Marshal(next)
|
||||
if err != nil {
|
||||
log.Warnw("apollo get config failed", "err", err)
|
||||
log.Warnf("apollo could not handle namespace %s: %v", namespace, err)
|
||||
return nil
|
||||
}
|
||||
kv = append(kv, &config.KeyValue{
|
||||
Key: namespace,
|
||||
Value: []byte(value.(string)),
|
||||
Format: format(namespace),
|
||||
Value: val,
|
||||
Format: f,
|
||||
})
|
||||
|
||||
return kv
|
||||
|
Loading…
x
Reference in New Issue
Block a user