mirror of
https://github.com/go-micro/go-micro.git
synced 2025-11-29 21:47:44 +02:00
move selector
This commit is contained in:
47
selector/common_test.go
Normal file
47
selector/common_test.go
Normal file
@@ -0,0 +1,47 @@
|
||||
package selector
|
||||
|
||||
import (
|
||||
"github.com/micro/go-micro/v2/registry"
|
||||
)
|
||||
|
||||
var (
|
||||
// mock data
|
||||
testData = map[string][]*registry.Service{
|
||||
"foo": {
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.0",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "foo-1.0.0-123",
|
||||
Address: "localhost:9999",
|
||||
},
|
||||
{
|
||||
Id: "foo-1.0.0-321",
|
||||
Address: "localhost:9999",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.1",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "foo-1.0.1-321",
|
||||
Address: "localhost:6666",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.3",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "foo-1.0.3-345",
|
||||
Address: "localhost:8888",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
109
selector/default.go
Normal file
109
selector/default.go
Normal file
@@ -0,0 +1,109 @@
|
||||
package selector
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/v2/registry"
|
||||
"github.com/micro/go-micro/v2/registry/cache"
|
||||
)
|
||||
|
||||
type registrySelector struct {
|
||||
so Options
|
||||
rc cache.Cache
|
||||
}
|
||||
|
||||
func (c *registrySelector) newCache() cache.Cache {
|
||||
ropts := []cache.Option{}
|
||||
if c.so.Context != nil {
|
||||
if t, ok := c.so.Context.Value("selector_ttl").(time.Duration); ok {
|
||||
ropts = append(ropts, cache.WithTTL(t))
|
||||
}
|
||||
}
|
||||
return cache.New(c.so.Registry, ropts...)
|
||||
}
|
||||
|
||||
func (c *registrySelector) Init(opts ...Option) error {
|
||||
for _, o := range opts {
|
||||
o(&c.so)
|
||||
}
|
||||
|
||||
c.rc.Stop()
|
||||
c.rc = c.newCache()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *registrySelector) Options() Options {
|
||||
return c.so
|
||||
}
|
||||
|
||||
func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, error) {
|
||||
sopts := SelectOptions{
|
||||
Strategy: c.so.Strategy,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(&sopts)
|
||||
}
|
||||
|
||||
// get the service
|
||||
// try the cache first
|
||||
// if that fails go directly to the registry
|
||||
services, err := c.rc.GetService(service)
|
||||
if err != nil {
|
||||
if err == registry.ErrNotFound {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// apply the filters
|
||||
for _, filter := range sopts.Filters {
|
||||
services = filter(services)
|
||||
}
|
||||
|
||||
// if there's nothing left, return
|
||||
if len(services) == 0 {
|
||||
return nil, ErrNoneAvailable
|
||||
}
|
||||
|
||||
return sopts.Strategy(services), nil
|
||||
}
|
||||
|
||||
func (c *registrySelector) Mark(service string, node *registry.Node, err error) {
|
||||
}
|
||||
|
||||
func (c *registrySelector) Reset(service string) {
|
||||
}
|
||||
|
||||
// Close stops the watcher and destroys the cache
|
||||
func (c *registrySelector) Close() error {
|
||||
c.rc.Stop()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *registrySelector) String() string {
|
||||
return "registry"
|
||||
}
|
||||
|
||||
func NewSelector(opts ...Option) Selector {
|
||||
sopts := Options{
|
||||
Strategy: Random,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(&sopts)
|
||||
}
|
||||
|
||||
if sopts.Registry == nil {
|
||||
sopts.Registry = registry.DefaultRegistry
|
||||
}
|
||||
|
||||
s := ®istrySelector{
|
||||
so: sopts,
|
||||
}
|
||||
s.rc = s.newCache()
|
||||
|
||||
return s
|
||||
}
|
||||
32
selector/default_test.go
Normal file
32
selector/default_test.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package selector
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/v2/registry/memory"
|
||||
)
|
||||
|
||||
func TestRegistrySelector(t *testing.T) {
|
||||
counts := map[string]int{}
|
||||
|
||||
r := memory.NewRegistry(memory.Services(testData))
|
||||
cache := NewSelector(Registry(r))
|
||||
|
||||
next, err := cache.Select("foo")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error calling cache select: %v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
node, err := next()
|
||||
if err != nil {
|
||||
t.Errorf("Expected node err, got err: %v", err)
|
||||
}
|
||||
counts[node.Id]++
|
||||
}
|
||||
|
||||
if len(os.Getenv("IN_TRAVIS_CI")) == 0 {
|
||||
t.Logf("Selector Counts %v", counts)
|
||||
}
|
||||
}
|
||||
124
selector/dns/dns.go
Normal file
124
selector/dns/dns.go
Normal file
@@ -0,0 +1,124 @@
|
||||
// Package dns provides a dns SRV selector
|
||||
package dns
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
|
||||
"github.com/micro/go-micro/v2/registry"
|
||||
"github.com/micro/go-micro/v2/selector"
|
||||
)
|
||||
|
||||
type dnsSelector struct {
|
||||
options selector.Options
|
||||
domain string
|
||||
}
|
||||
|
||||
var (
|
||||
DefaultDomain = "local"
|
||||
)
|
||||
|
||||
func (d *dnsSelector) Init(opts ...selector.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&d.options)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *dnsSelector) Options() selector.Options {
|
||||
return d.options
|
||||
}
|
||||
|
||||
func (d *dnsSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
|
||||
var srv []*net.SRV
|
||||
|
||||
// check if its host:port
|
||||
host, port, err := net.SplitHostPort(service)
|
||||
// not host:port
|
||||
if err != nil {
|
||||
// lookup the SRV record
|
||||
_, srvs, err := net.LookupSRV(service, "tcp", d.domain)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// set SRV records
|
||||
srv = srvs
|
||||
// got host:port
|
||||
} else {
|
||||
p, _ := strconv.Atoi(port)
|
||||
|
||||
// lookup the A record
|
||||
ips, err := net.LookupHost(host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// create SRV records
|
||||
for _, ip := range ips {
|
||||
srv = append(srv, &net.SRV{
|
||||
Target: ip,
|
||||
Port: uint16(p),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
nodes := make([]*registry.Node, 0, len(srv))
|
||||
for _, node := range srv {
|
||||
nodes = append(nodes, ®istry.Node{
|
||||
Id: node.Target,
|
||||
Address: fmt.Sprintf("%s:%d", node.Target, node.Port),
|
||||
})
|
||||
}
|
||||
|
||||
services := []*registry.Service{
|
||||
{
|
||||
Name: service,
|
||||
Nodes: nodes,
|
||||
},
|
||||
}
|
||||
|
||||
sopts := selector.SelectOptions{
|
||||
Strategy: d.options.Strategy,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(&sopts)
|
||||
}
|
||||
|
||||
// apply the filters
|
||||
for _, filter := range sopts.Filters {
|
||||
services = filter(services)
|
||||
}
|
||||
|
||||
// if there's nothing left, return
|
||||
if len(services) == 0 {
|
||||
return nil, selector.ErrNoneAvailable
|
||||
}
|
||||
|
||||
return sopts.Strategy(services), nil
|
||||
}
|
||||
|
||||
func (d *dnsSelector) Mark(service string, node *registry.Node, err error) {}
|
||||
|
||||
func (d *dnsSelector) Reset(service string) {}
|
||||
|
||||
func (d *dnsSelector) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *dnsSelector) String() string {
|
||||
return "dns"
|
||||
}
|
||||
|
||||
func NewSelector(opts ...selector.Option) selector.Selector {
|
||||
options := selector.Options{
|
||||
Strategy: selector.Random,
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
return &dnsSelector{options: options, domain: DefaultDomain}
|
||||
}
|
||||
73
selector/filter.go
Normal file
73
selector/filter.go
Normal file
@@ -0,0 +1,73 @@
|
||||
package selector
|
||||
|
||||
import (
|
||||
"github.com/micro/go-micro/v2/registry"
|
||||
)
|
||||
|
||||
// FilterEndpoint is an endpoint based Select Filter which will
|
||||
// only return services with the endpoint specified.
|
||||
func FilterEndpoint(name string) Filter {
|
||||
return func(old []*registry.Service) []*registry.Service {
|
||||
var services []*registry.Service
|
||||
|
||||
for _, service := range old {
|
||||
for _, ep := range service.Endpoints {
|
||||
if ep.Name == name {
|
||||
services = append(services, service)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return services
|
||||
}
|
||||
}
|
||||
|
||||
// FilterLabel is a label based Select Filter which will
|
||||
// only return services with the label specified.
|
||||
func FilterLabel(key, val string) Filter {
|
||||
return func(old []*registry.Service) []*registry.Service {
|
||||
var services []*registry.Service
|
||||
|
||||
for _, service := range old {
|
||||
serv := new(registry.Service)
|
||||
var nodes []*registry.Node
|
||||
|
||||
for _, node := range service.Nodes {
|
||||
if node.Metadata == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if node.Metadata[key] == val {
|
||||
nodes = append(nodes, node)
|
||||
}
|
||||
}
|
||||
|
||||
// only add service if there's some nodes
|
||||
if len(nodes) > 0 {
|
||||
// copy
|
||||
*serv = *service
|
||||
serv.Nodes = nodes
|
||||
services = append(services, serv)
|
||||
}
|
||||
}
|
||||
|
||||
return services
|
||||
}
|
||||
}
|
||||
|
||||
// FilterVersion is a version based Select Filter which will
|
||||
// only return services with the version specified.
|
||||
func FilterVersion(version string) Filter {
|
||||
return func(old []*registry.Service) []*registry.Service {
|
||||
var services []*registry.Service
|
||||
|
||||
for _, service := range old {
|
||||
if service.Version == version {
|
||||
services = append(services, service)
|
||||
}
|
||||
}
|
||||
|
||||
return services
|
||||
}
|
||||
}
|
||||
239
selector/filter_test.go
Normal file
239
selector/filter_test.go
Normal file
@@ -0,0 +1,239 @@
|
||||
package selector
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/v2/registry"
|
||||
)
|
||||
|
||||
func TestFilterEndpoint(t *testing.T) {
|
||||
testData := []struct {
|
||||
services []*registry.Service
|
||||
endpoint string
|
||||
count int
|
||||
}{
|
||||
{
|
||||
services: []*registry.Service{
|
||||
{
|
||||
Name: "test",
|
||||
Version: "1.0.0",
|
||||
Endpoints: []*registry.Endpoint{
|
||||
{
|
||||
Name: "Foo.Bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "test",
|
||||
Version: "1.1.0",
|
||||
Endpoints: []*registry.Endpoint{
|
||||
{
|
||||
Name: "Baz.Bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
endpoint: "Foo.Bar",
|
||||
count: 1,
|
||||
},
|
||||
{
|
||||
services: []*registry.Service{
|
||||
{
|
||||
Name: "test",
|
||||
Version: "1.0.0",
|
||||
Endpoints: []*registry.Endpoint{
|
||||
{
|
||||
Name: "Foo.Bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "test",
|
||||
Version: "1.1.0",
|
||||
Endpoints: []*registry.Endpoint{
|
||||
{
|
||||
Name: "Foo.Bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
endpoint: "Bar.Baz",
|
||||
count: 0,
|
||||
},
|
||||
}
|
||||
|
||||
for _, data := range testData {
|
||||
filter := FilterEndpoint(data.endpoint)
|
||||
services := filter(data.services)
|
||||
|
||||
if len(services) != data.count {
|
||||
t.Fatalf("Expected %d services, got %d", data.count, len(services))
|
||||
}
|
||||
|
||||
for _, service := range services {
|
||||
var seen bool
|
||||
|
||||
for _, ep := range service.Endpoints {
|
||||
if ep.Name == data.endpoint {
|
||||
seen = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !seen && data.count > 0 {
|
||||
t.Fatalf("Expected %d services but seen is %t; result %+v", data.count, seen, services)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilterLabel(t *testing.T) {
|
||||
testData := []struct {
|
||||
services []*registry.Service
|
||||
label [2]string
|
||||
count int
|
||||
}{
|
||||
{
|
||||
services: []*registry.Service{
|
||||
{
|
||||
Name: "test",
|
||||
Version: "1.0.0",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "test-1",
|
||||
Address: "localhost",
|
||||
Metadata: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "test",
|
||||
Version: "1.1.0",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "test-2",
|
||||
Address: "localhost",
|
||||
Metadata: map[string]string{
|
||||
"foo": "baz",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
label: [2]string{"foo", "bar"},
|
||||
count: 1,
|
||||
},
|
||||
{
|
||||
services: []*registry.Service{
|
||||
{
|
||||
Name: "test",
|
||||
Version: "1.0.0",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "test-1",
|
||||
Address: "localhost",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "test",
|
||||
Version: "1.1.0",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "test-2",
|
||||
Address: "localhost",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
label: [2]string{"foo", "bar"},
|
||||
count: 0,
|
||||
},
|
||||
}
|
||||
|
||||
for _, data := range testData {
|
||||
filter := FilterLabel(data.label[0], data.label[1])
|
||||
services := filter(data.services)
|
||||
|
||||
if len(services) != data.count {
|
||||
t.Fatalf("Expected %d services, got %d", data.count, len(services))
|
||||
}
|
||||
|
||||
for _, service := range services {
|
||||
var seen bool
|
||||
|
||||
for _, node := range service.Nodes {
|
||||
if node.Metadata[data.label[0]] != data.label[1] {
|
||||
t.Fatalf("Expected %s=%s but got %s=%s for service %+v node %+v",
|
||||
data.label[0], data.label[1], data.label[0], node.Metadata[data.label[0]], service, node)
|
||||
}
|
||||
seen = true
|
||||
}
|
||||
|
||||
if !seen {
|
||||
t.Fatalf("Expected node for %s=%s but saw none; results %+v", data.label[0], data.label[1], service)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilterVersion(t *testing.T) {
|
||||
testData := []struct {
|
||||
services []*registry.Service
|
||||
version string
|
||||
count int
|
||||
}{
|
||||
{
|
||||
services: []*registry.Service{
|
||||
{
|
||||
Name: "test",
|
||||
Version: "1.0.0",
|
||||
},
|
||||
{
|
||||
Name: "test",
|
||||
Version: "1.1.0",
|
||||
},
|
||||
},
|
||||
version: "1.0.0",
|
||||
count: 1,
|
||||
},
|
||||
{
|
||||
services: []*registry.Service{
|
||||
{
|
||||
Name: "test",
|
||||
Version: "1.0.0",
|
||||
},
|
||||
{
|
||||
Name: "test",
|
||||
Version: "1.1.0",
|
||||
},
|
||||
},
|
||||
version: "2.0.0",
|
||||
count: 0,
|
||||
},
|
||||
}
|
||||
|
||||
for _, data := range testData {
|
||||
filter := FilterVersion(data.version)
|
||||
services := filter(data.services)
|
||||
|
||||
if len(services) != data.count {
|
||||
t.Fatalf("Expected %d services, got %d", data.count, len(services))
|
||||
}
|
||||
|
||||
var seen bool
|
||||
|
||||
for _, service := range services {
|
||||
if service.Version != data.version {
|
||||
t.Fatalf("Expected version %s, got %s", data.version, service.Version)
|
||||
}
|
||||
seen = true
|
||||
}
|
||||
|
||||
if !seen && data.count > 0 {
|
||||
t.Fatalf("Expected %d services but seen is %t; result %+v", data.count, seen, services)
|
||||
}
|
||||
}
|
||||
}
|
||||
60
selector/options.go
Normal file
60
selector/options.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package selector
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/micro/go-micro/v2/registry"
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
Registry registry.Registry
|
||||
Strategy Strategy
|
||||
|
||||
// Other options for implementations of the interface
|
||||
// can be stored in a context
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
type SelectOptions struct {
|
||||
Filters []Filter
|
||||
Strategy Strategy
|
||||
|
||||
// Other options for implementations of the interface
|
||||
// can be stored in a context
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
// Option used to initialise the selector
|
||||
type Option func(*Options)
|
||||
|
||||
// SelectOption used when making a select call
|
||||
type SelectOption func(*SelectOptions)
|
||||
|
||||
// Registry sets the registry used by the selector
|
||||
func Registry(r registry.Registry) Option {
|
||||
return func(o *Options) {
|
||||
o.Registry = r
|
||||
}
|
||||
}
|
||||
|
||||
// SetStrategy sets the default strategy for the selector
|
||||
func SetStrategy(fn Strategy) Option {
|
||||
return func(o *Options) {
|
||||
o.Strategy = fn
|
||||
}
|
||||
}
|
||||
|
||||
// WithFilter adds a filter function to the list of filters
|
||||
// used during the Select call.
|
||||
func WithFilter(fn ...Filter) SelectOption {
|
||||
return func(o *SelectOptions) {
|
||||
o.Filters = append(o.Filters, fn...)
|
||||
}
|
||||
}
|
||||
|
||||
// Strategy sets the selector strategy
|
||||
func WithStrategy(fn Strategy) SelectOption {
|
||||
return func(o *SelectOptions) {
|
||||
o.Strategy = fn
|
||||
}
|
||||
}
|
||||
18
selector/registry/options.go
Normal file
18
selector/registry/options.go
Normal file
@@ -0,0 +1,18 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/v2/selector"
|
||||
)
|
||||
|
||||
// Set the registry cache ttl
|
||||
func TTL(t time.Duration) selector.Option {
|
||||
return func(o *selector.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, "selector_ttl", t)
|
||||
}
|
||||
}
|
||||
11
selector/registry/registry.go
Normal file
11
selector/registry/registry.go
Normal file
@@ -0,0 +1,11 @@
|
||||
// Package registry uses the go-micro registry for selection
|
||||
package registry
|
||||
|
||||
import (
|
||||
"github.com/micro/go-micro/v2/selector"
|
||||
)
|
||||
|
||||
// NewSelector returns a new registry selector
|
||||
func NewSelector(opts ...selector.Option) selector.Selector {
|
||||
return selector.NewSelector(opts...)
|
||||
}
|
||||
139
selector/router/router.go
Normal file
139
selector/router/router.go
Normal file
@@ -0,0 +1,139 @@
|
||||
// Package router is a network/router selector
|
||||
package router
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/micro/go-micro/v2/network/router"
|
||||
"github.com/micro/go-micro/v2/registry"
|
||||
"github.com/micro/go-micro/v2/selector"
|
||||
)
|
||||
|
||||
type routerSelector struct {
|
||||
opts selector.Options
|
||||
|
||||
// the router
|
||||
r router.Router
|
||||
}
|
||||
|
||||
type routerKey struct{}
|
||||
|
||||
func (r *routerSelector) Init(opts ...selector.Option) error {
|
||||
// no op
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *routerSelector) Options() selector.Options {
|
||||
return r.opts
|
||||
}
|
||||
|
||||
func (r *routerSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
|
||||
// TODO: pull routes asynchronously and cache
|
||||
routes, err := r.r.Lookup(
|
||||
router.QueryService(service),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// no routes return not found error
|
||||
if len(routes) == 0 {
|
||||
return nil, selector.ErrNotFound
|
||||
}
|
||||
|
||||
// TODO: apply filters by pseudo constructing service
|
||||
|
||||
// sort the routes based on metric
|
||||
sort.Slice(routes, func(i, j int) bool {
|
||||
return routes[i].Metric < routes[j].Metric
|
||||
})
|
||||
|
||||
// roundrobin assuming routes are in metric preference order
|
||||
var i int
|
||||
var mtx sync.Mutex
|
||||
|
||||
return func() (*registry.Node, error) {
|
||||
// get index and increment counter with every call to next
|
||||
mtx.Lock()
|
||||
idx := i
|
||||
i++
|
||||
mtx.Unlock()
|
||||
|
||||
// get route based on idx
|
||||
route := routes[idx%len(routes)]
|
||||
|
||||
// defaults to gateway and no port
|
||||
address := route.Address
|
||||
if len(route.Gateway) > 0 {
|
||||
address = route.Gateway
|
||||
}
|
||||
|
||||
// return as a node
|
||||
return ®istry.Node{
|
||||
// TODO: add id and metadata if we can
|
||||
Address: address,
|
||||
}, nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *routerSelector) Mark(service string, node *registry.Node, err error) {
|
||||
// TODO: pass back metrics or information to the router
|
||||
}
|
||||
|
||||
func (r *routerSelector) Reset(service string) {
|
||||
// TODO: reset the metrics or information at the router
|
||||
}
|
||||
|
||||
func (r *routerSelector) Close() error {
|
||||
// stop the router advertisements
|
||||
return r.r.Stop()
|
||||
}
|
||||
|
||||
func (r *routerSelector) String() string {
|
||||
return "router"
|
||||
}
|
||||
|
||||
// NewSelector returns a new router based selector
|
||||
func NewSelector(opts ...selector.Option) selector.Selector {
|
||||
options := selector.Options{
|
||||
Context: context.Background(),
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
// set default registry if not set
|
||||
if options.Registry == nil {
|
||||
options.Registry = registry.DefaultRegistry
|
||||
}
|
||||
|
||||
// try get router from the context
|
||||
r, ok := options.Context.Value(routerKey{}).(router.Router)
|
||||
if !ok {
|
||||
// TODO: Use router.DefaultRouter?
|
||||
r = router.NewRouter(
|
||||
router.Registry(options.Registry),
|
||||
)
|
||||
}
|
||||
|
||||
go r.Advertise()
|
||||
|
||||
return &routerSelector{
|
||||
opts: options,
|
||||
// set the internal router
|
||||
r: r,
|
||||
}
|
||||
}
|
||||
|
||||
// WithRouter sets the router as an option
|
||||
func WithRouter(r router.Router) selector.Option {
|
||||
return func(o *selector.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, routerKey{}, r)
|
||||
}
|
||||
}
|
||||
43
selector/selector.go
Normal file
43
selector/selector.go
Normal file
@@ -0,0 +1,43 @@
|
||||
// Package selector is a way to pick a list of service nodes
|
||||
package selector
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/micro/go-micro/v2/registry"
|
||||
)
|
||||
|
||||
// Selector builds on the registry as a mechanism to pick nodes
|
||||
// and mark their status. This allows host pools and other things
|
||||
// to be built using various algorithms.
|
||||
type Selector interface {
|
||||
Init(opts ...Option) error
|
||||
Options() Options
|
||||
// Select returns a function which should return the next node
|
||||
Select(service string, opts ...SelectOption) (Next, error)
|
||||
// Mark sets the success/error against a node
|
||||
Mark(service string, node *registry.Node, err error)
|
||||
// Reset returns state back to zero for a service
|
||||
Reset(service string)
|
||||
// Close renders the selector unusable
|
||||
Close() error
|
||||
// Name of the selector
|
||||
String() string
|
||||
}
|
||||
|
||||
// Next is a function that returns the next node
|
||||
// based on the selector's strategy
|
||||
type Next func() (*registry.Node, error)
|
||||
|
||||
// Filter is used to filter a service during the selection process
|
||||
type Filter func([]*registry.Service) []*registry.Service
|
||||
|
||||
// Strategy is a selection strategy e.g random, round robin
|
||||
type Strategy func([]*registry.Service) Next
|
||||
|
||||
var (
|
||||
DefaultSelector = NewSelector()
|
||||
|
||||
ErrNotFound = errors.New("not found")
|
||||
ErrNoneAvailable = errors.New("none available")
|
||||
)
|
||||
58
selector/static/static.go
Normal file
58
selector/static/static.go
Normal file
@@ -0,0 +1,58 @@
|
||||
// Package static provides a static resolver which returns the name/ip passed in without any change
|
||||
package static
|
||||
|
||||
import (
|
||||
"github.com/micro/go-micro/v2/registry"
|
||||
"github.com/micro/go-micro/v2/selector"
|
||||
)
|
||||
|
||||
// staticSelector is a static selector
|
||||
type staticSelector struct {
|
||||
opts selector.Options
|
||||
}
|
||||
|
||||
func (s *staticSelector) Init(opts ...selector.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&s.opts)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *staticSelector) Options() selector.Options {
|
||||
return s.opts
|
||||
}
|
||||
|
||||
func (s *staticSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
|
||||
return func() (*registry.Node, error) {
|
||||
return ®istry.Node{
|
||||
Id: service,
|
||||
Address: service,
|
||||
}, nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *staticSelector) Mark(service string, node *registry.Node, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (s *staticSelector) Reset(service string) {
|
||||
return
|
||||
}
|
||||
|
||||
func (s *staticSelector) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *staticSelector) String() string {
|
||||
return "static"
|
||||
}
|
||||
|
||||
func NewSelector(opts ...selector.Option) selector.Selector {
|
||||
var options selector.Options
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return &staticSelector{
|
||||
opts: options,
|
||||
}
|
||||
}
|
||||
56
selector/strategy.go
Normal file
56
selector/strategy.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package selector
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/v2/registry"
|
||||
)
|
||||
|
||||
func init() {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
}
|
||||
|
||||
// Random is a random strategy algorithm for node selection
|
||||
func Random(services []*registry.Service) Next {
|
||||
nodes := make([]*registry.Node, 0, len(services))
|
||||
|
||||
for _, service := range services {
|
||||
nodes = append(nodes, service.Nodes...)
|
||||
}
|
||||
|
||||
return func() (*registry.Node, error) {
|
||||
if len(nodes) == 0 {
|
||||
return nil, ErrNoneAvailable
|
||||
}
|
||||
|
||||
i := rand.Int() % len(nodes)
|
||||
return nodes[i], nil
|
||||
}
|
||||
}
|
||||
|
||||
// RoundRobin is a roundrobin strategy algorithm for node selection
|
||||
func RoundRobin(services []*registry.Service) Next {
|
||||
nodes := make([]*registry.Node, 0, len(services))
|
||||
|
||||
for _, service := range services {
|
||||
nodes = append(nodes, service.Nodes...)
|
||||
}
|
||||
|
||||
var i = rand.Int()
|
||||
var mtx sync.Mutex
|
||||
|
||||
return func() (*registry.Node, error) {
|
||||
if len(nodes) == 0 {
|
||||
return nil, ErrNoneAvailable
|
||||
}
|
||||
|
||||
mtx.Lock()
|
||||
node := nodes[i%len(nodes)]
|
||||
i++
|
||||
mtx.Unlock()
|
||||
|
||||
return node, nil
|
||||
}
|
||||
}
|
||||
58
selector/strategy_test.go
Normal file
58
selector/strategy_test.go
Normal file
@@ -0,0 +1,58 @@
|
||||
package selector
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/v2/registry"
|
||||
)
|
||||
|
||||
func TestStrategies(t *testing.T) {
|
||||
testData := []*registry.Service{
|
||||
{
|
||||
Name: "test1",
|
||||
Version: "latest",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "test1-1",
|
||||
Address: "10.0.0.1:1001",
|
||||
},
|
||||
{
|
||||
Id: "test1-2",
|
||||
Address: "10.0.0.2:1002",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "test1",
|
||||
Version: "default",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "test1-3",
|
||||
Address: "10.0.0.3:1003",
|
||||
},
|
||||
{
|
||||
Id: "test1-4",
|
||||
Address: "10.0.0.4:1004",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, strategy := range map[string]Strategy{"random": Random, "roundrobin": RoundRobin} {
|
||||
next := strategy(testData)
|
||||
counts := make(map[string]int)
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
node, err := next()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
counts[node.Id]++
|
||||
}
|
||||
|
||||
if len(os.Getenv("IN_TRAVIS_CI")) == 0 {
|
||||
t.Logf("%s: %+v\n", name, counts)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user