mirror of
https://github.com/go-micro/go-micro.git
synced 2025-01-05 10:20:53 +02:00
de34f259ba
fixing test failed issue change back error type change registry.ErrNotFound back to selector.ErrNotFound change back error type change registry.ErrNotFound back to selector.ErrNotFound remove the single node tunnel test Fix read yaml config from memory package main import ( "fmt" "github.com/micro/go-micro/config" "github.com/micro/go-micro/config/source/memory" ) var configData = []byte(` --- a: 1234 `) func main() { memorySource := memory.NewSource( memory.WithYAML(configData), ) // Create new config conf := config.NewConfig() // Load file source conf.Load(memorySource) fmt.Println(string(conf.Bytes())) }
110 lines
1.9 KiB
Go
110 lines
1.9 KiB
Go
package selector
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/micro/go-micro/registry"
|
|
"github.com/micro/go-micro/registry/cache"
|
|
)
|
|
|
|
type registrySelector struct {
|
|
so Options
|
|
rc cache.Cache
|
|
}
|
|
|
|
func (c *registrySelector) newCache() cache.Cache {
|
|
ropts := []cache.Option{}
|
|
if c.so.Context != nil {
|
|
if t, ok := c.so.Context.Value("selector_ttl").(time.Duration); ok {
|
|
ropts = append(ropts, cache.WithTTL(t))
|
|
}
|
|
}
|
|
return cache.New(c.so.Registry, ropts...)
|
|
}
|
|
|
|
func (c *registrySelector) Init(opts ...Option) error {
|
|
for _, o := range opts {
|
|
o(&c.so)
|
|
}
|
|
|
|
c.rc.Stop()
|
|
c.rc = c.newCache()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *registrySelector) Options() Options {
|
|
return c.so
|
|
}
|
|
|
|
func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, error) {
|
|
sopts := SelectOptions{
|
|
Strategy: c.so.Strategy,
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
opt(&sopts)
|
|
}
|
|
|
|
// get the service
|
|
// try the cache first
|
|
// if that fails go directly to the registry
|
|
services, err := c.rc.GetService(service)
|
|
if err != nil {
|
|
if err == registry.ErrNotFound {
|
|
return nil, ErrNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
// apply the filters
|
|
for _, filter := range sopts.Filters {
|
|
services = filter(services)
|
|
}
|
|
|
|
// if there's nothing left, return
|
|
if len(services) == 0 {
|
|
return nil, ErrNoneAvailable
|
|
}
|
|
|
|
return sopts.Strategy(services), nil
|
|
}
|
|
|
|
func (c *registrySelector) Mark(service string, node *registry.Node, err error) {
|
|
}
|
|
|
|
func (c *registrySelector) Reset(service string) {
|
|
}
|
|
|
|
// Close stops the watcher and destroys the cache
|
|
func (c *registrySelector) Close() error {
|
|
c.rc.Stop()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *registrySelector) String() string {
|
|
return "registry"
|
|
}
|
|
|
|
func NewSelector(opts ...Option) Selector {
|
|
sopts := Options{
|
|
Strategy: Random,
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
opt(&sopts)
|
|
}
|
|
|
|
if sopts.Registry == nil {
|
|
sopts.Registry = registry.DefaultRegistry
|
|
}
|
|
|
|
s := ®istrySelector{
|
|
so: sopts,
|
|
}
|
|
s.rc = s.newCache()
|
|
|
|
return s
|
|
}
|