From eefb9c53d4af08ec662e6dd3b5ff6eb0a89b4143 Mon Sep 17 00:00:00 2001 From: Asim Date: Wed, 9 Dec 2015 19:23:16 +0000 Subject: [PATCH] Move to a selector package --- client/options.go | 9 +- client/rpc_client.go | 13 +-- examples/client/dc_filter/dc_filter.go | 8 +- examples/client/dc_selector/dc_selector.go | 16 ++-- examples/client/selector/selector.go | 17 ++-- registry/mock/mock.go | 70 ++++++++++++++ registry/mock_registry.go | 62 ------------- registry/registry.go | 7 -- registry/selector.go | 44 --------- .../blacklist}/black_list_selector.go | 31 ++++--- .../blacklist}/black_list_selector_test.go | 13 ++- selector/options.go | 34 +++++++ selector/random/random.go | 7 ++ {registry => selector}/random_selector.go | 20 ++-- .../random_selector_test.go | 12 ++- .../roundrobin}/round_robin_selector.go | 27 +++--- .../roundrobin}/round_robin_selector_test.go | 9 +- selector/selector.go | 93 +++++++++++++++++++ 18 files changed, 304 insertions(+), 188 deletions(-) create mode 100644 registry/mock/mock.go delete mode 100644 registry/mock_registry.go delete mode 100644 registry/selector.go rename {registry => selector/blacklist}/black_list_selector.go (75%) rename {registry => selector/blacklist}/black_list_selector_test.go (82%) create mode 100644 selector/options.go create mode 100644 selector/random/random.go rename {registry => selector}/random_selector.go (76%) rename {registry => selector}/random_selector_test.go (69%) rename {registry => selector/roundrobin}/round_robin_selector.go (58%) rename {registry => selector/roundrobin}/round_robin_selector_test.go (73%) create mode 100644 selector/selector.go diff --git a/client/options.go b/client/options.go index f529d0a5..ca0f6fc5 100644 --- a/client/options.go +++ b/client/options.go @@ -4,6 +4,7 @@ import ( "github.com/micro/go-micro/broker" "github.com/micro/go-micro/codec" "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/selector" "github.com/micro/go-micro/transport" ) @@ -12,13 +13,13 @@ type options struct { broker broker.Broker codecs map[string]codec.NewCodec registry registry.Registry - selector registry.Selector + selector selector.Selector transport transport.Transport wrappers []Wrapper } type callOptions struct { - selectOptions []registry.SelectOption + selectOptions []selector.SelectOption } type publishOptions struct{} @@ -59,7 +60,7 @@ func Transport(t transport.Transport) Option { } // Select is used to select a node to route a request to -func Selector(s registry.Selector) Option { +func Selector(s selector.Selector) Option { return func(o *options) { o.selector = s } @@ -74,7 +75,7 @@ func Wrap(w Wrapper) Option { // Call Options -func WithSelectOption(so registry.SelectOption) CallOption { +func WithSelectOption(so selector.SelectOption) CallOption { return func(o *callOptions) { o.selectOptions = append(o.selectOptions, so) } diff --git a/client/rpc_client.go b/client/rpc_client.go index e40bee02..e8e0bbeb 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -10,6 +10,7 @@ import ( c "github.com/micro/go-micro/context" "github.com/micro/go-micro/errors" "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/selector" "github.com/micro/go-micro/transport" "golang.org/x/net/context" @@ -44,8 +45,8 @@ func newRpcClient(opt ...Option) Client { } if opts.selector == nil { - opts.selector = registry.NewRandomSelector( - registry.SelectorRegistry(opts.registry), + opts.selector = selector.NewSelector( + selector.Registry(opts.registry), ) } @@ -156,14 +157,14 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac } next, err := r.opts.selector.Select(request.Service(), copts.selectOptions...) - if err != nil && err == registry.ErrNotFound { + if err != nil && err == selector.ErrNotFound { return errors.NotFound("go.micro.client", err.Error()) } else if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } node, err := next() - if err != nil && err == registry.ErrNotFound { + if err != nil && err == selector.ErrNotFound { return errors.NotFound("go.micro.client", err.Error()) } else if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) @@ -190,14 +191,14 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan in } next, err := r.opts.selector.Select(request.Service(), copts.selectOptions...) - if err != nil && err == registry.ErrNotFound { + if err != nil && err == selector.ErrNotFound { return nil, errors.NotFound("go.micro.client", err.Error()) } else if err != nil { return nil, errors.InternalServerError("go.micro.client", err.Error()) } node, err := next() - if err != nil && err == registry.ErrNotFound { + if err != nil && err == selector.ErrNotFound { return nil, errors.NotFound("go.micro.client", err.Error()) } else if err != nil { return nil, errors.InternalServerError("go.micro.client", err.Error()) diff --git a/examples/client/dc_filter/dc_filter.go b/examples/client/dc_filter/dc_filter.go index 1795edc5..cd2c7cec 100644 --- a/examples/client/dc_filter/dc_filter.go +++ b/examples/client/dc_filter/dc_filter.go @@ -8,9 +8,11 @@ import ( "github.com/micro/go-micro/client" "github.com/micro/go-micro/cmd" c "github.com/micro/go-micro/context" - example "github.com/micro/go-micro/examples/server/proto/example" "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/selector" "golang.org/x/net/context" + + example "github.com/micro/go-micro/examples/server/proto/example" ) func init() { @@ -38,7 +40,9 @@ func (dc *dcWrapper) Call(ctx context.Context, req client.Request, rsp interface return services } - callOptions := append(opts, client.WithSelectOption(registry.SelectFilter(filter))) + callOptions := append(opts, client.WithSelectOption( + selector.Filter(filter), + )) fmt.Printf("[DC Wrapper] filtering for datacenter %s\n", md["datacenter"]) return dc.Client.Call(ctx, req, rsp, callOptions...) diff --git a/examples/client/dc_selector/dc_selector.go b/examples/client/dc_selector/dc_selector.go index a4d6c3f6..ecc4a3f7 100644 --- a/examples/client/dc_selector/dc_selector.go +++ b/examples/client/dc_selector/dc_selector.go @@ -8,14 +8,16 @@ import ( "github.com/micro/go-micro/client" "github.com/micro/go-micro/cmd" - example "github.com/micro/go-micro/examples/server/proto/example" "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/selector" "golang.org/x/net/context" + + example "github.com/micro/go-micro/examples/server/proto/example" ) // Built in random hashed node selector type dcSelector struct { - opts registry.SelectorOptions + opts selector.Options } var ( @@ -26,14 +28,14 @@ func init() { rand.Seed(time.Now().Unix()) } -func (n *dcSelector) Select(service string, opts ...registry.SelectOption) (registry.SelectNext, error) { +func (n *dcSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) { services, err := n.opts.Registry.GetService(service) if err != nil { return nil, err } if len(services) == 0 { - return nil, registry.ErrNotFound + return nil, selector.ErrNotFound } var nodes []*registry.Node @@ -48,7 +50,7 @@ func (n *dcSelector) Select(service string, opts ...registry.SelectOption) (regi } if len(nodes) == 0 { - return nil, registry.ErrNotFound + return nil, selector.ErrNotFound } var i int @@ -75,8 +77,8 @@ func (n *dcSelector) Close() error { } // Return a new first node selector -func DCSelector(opts ...registry.SelectorOption) registry.Selector { - var sopts registry.SelectorOptions +func DCSelector(opts ...selector.Option) selector.Selector { + var sopts selector.Options for _, opt := range opts { opt(&sopts) } diff --git a/examples/client/selector/selector.go b/examples/client/selector/selector.go index 7bb0e220..a2ef1c20 100644 --- a/examples/client/selector/selector.go +++ b/examples/client/selector/selector.go @@ -9,6 +9,7 @@ import ( "github.com/micro/go-micro/cmd" example "github.com/micro/go-micro/examples/server/proto/example" "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/selector" "golang.org/x/net/context" ) @@ -18,20 +19,20 @@ func init() { // Built in random hashed node selector type firstNodeSelector struct { - opts registry.SelectorOptions + opts selector.Options } -func (n *firstNodeSelector) Select(service string, opts ...registry.SelectOption) (registry.SelectNext, error) { +func (n *firstNodeSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) { services, err := n.opts.Registry.GetService(service) if err != nil { return nil, err } if len(services) == 0 { - return nil, registry.ErrNotFound + return nil, selector.ErrNotFound } - var sopts registry.SelectOptions + var sopts selector.SelectOptions for _, opt := range opts { opt(&sopts) } @@ -41,11 +42,11 @@ func (n *firstNodeSelector) Select(service string, opts ...registry.SelectOption } if len(services) == 0 { - return nil, registry.ErrNotFound + return nil, selector.ErrNotFound } if len(services[0].Nodes) == 0 { - return nil, registry.ErrNotFound + return nil, selector.ErrNotFound } return func() (*registry.Node, error) { @@ -66,8 +67,8 @@ func (n *firstNodeSelector) Close() error { } // Return a new first node selector -func FirstNodeSelector(opts ...registry.SelectorOption) registry.Selector { - var sopts registry.SelectorOptions +func FirstNodeSelector(opts ...selector.Option) selector.Selector { + var sopts selector.Options for _, opt := range opts { opt(&sopts) } diff --git a/registry/mock/mock.go b/registry/mock/mock.go new file mode 100644 index 00000000..30ddae9b --- /dev/null +++ b/registry/mock/mock.go @@ -0,0 +1,70 @@ +package mock + +import ( + "github.com/micro/go-micro/registry" +) + +type MockRegistry struct{} + +func (m *MockRegistry) GetService(service string) ([]*registry.Service, error) { + return []*registry.Service{ + { + Name: "foo", + Version: "1.0.0", + Nodes: []*registry.Node{ + { + Id: "foo-1.0.0-123", + Address: "localhost", + Port: 9999, + }, + { + Id: "foo-1.0.0-321", + Address: "localhost", + Port: 9999, + }, + }, + }, + { + Name: "foo", + Version: "1.0.1", + Nodes: []*registry.Node{ + { + Id: "foo-1.0.1-321", + Address: "localhost", + Port: 6666, + }, + }, + }, + { + Name: "foo", + Version: "1.0.3", + Nodes: []*registry.Node{ + { + Id: "foo-1.0.3-345", + Address: "localhost", + Port: 8888, + }, + }, + }, + }, nil +} + +func (m *MockRegistry) ListServices() ([]*registry.Service, error) { + return []*registry.Service{}, nil +} + +func (m *MockRegistry) Register(s *registry.Service) error { + return nil +} + +func (m *MockRegistry) Deregister(s *registry.Service) error { + return nil +} + +func (m *MockRegistry) Watch() (registry.Watcher, error) { + return nil, nil +} + +func NewRegistry() *MockRegistry { + return &MockRegistry{} +} diff --git a/registry/mock_registry.go b/registry/mock_registry.go deleted file mode 100644 index 36d427cc..00000000 --- a/registry/mock_registry.go +++ /dev/null @@ -1,62 +0,0 @@ -package registry - -type mockRegistry struct{} - -func (m *mockRegistry) GetService(service string) ([]*Service, error) { - return []*Service{ - { - Name: "foo", - Version: "1.0.0", - Nodes: []*Node{ - { - Id: "foo-1.0.0-123", - Address: "localhost", - Port: 9999, - }, - { - Id: "foo-1.0.0-321", - Address: "localhost", - Port: 9999, - }, - }, - }, - { - Name: "foo", - Version: "1.0.1", - Nodes: []*Node{ - { - Id: "foo-1.0.1-321", - Address: "localhost", - Port: 6666, - }, - }, - }, - { - Name: "foo", - Version: "1.0.3", - Nodes: []*Node{ - { - Id: "foo-1.0.3-345", - Address: "localhost", - Port: 8888, - }, - }, - }, - }, nil -} - -func (m *mockRegistry) ListServices() ([]*Service, error) { - return []*Service{}, nil -} - -func (m *mockRegistry) Register(s *Service) error { - return nil -} - -func (m *mockRegistry) Deregister(s *Service) error { - return nil -} - -func (m *mockRegistry) Watch() (Watcher, error) { - return nil, nil -} diff --git a/registry/registry.go b/registry/registry.go index ea7895dc..4f211d6e 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -1,9 +1,5 @@ package registry -import ( - "errors" -) - type Registry interface { Register(*Service) error Deregister(*Service) error @@ -18,9 +14,6 @@ type Option func(*options) var ( DefaultRegistry = newConsulRegistry([]string{}) - - ErrNotFound = errors.New("not found") - ErrNoneAvailable = errors.New("none available") ) func NewRegistry(addrs []string, opt ...Option) Registry { diff --git a/registry/selector.go b/registry/selector.go deleted file mode 100644 index 45a651a4..00000000 --- a/registry/selector.go +++ /dev/null @@ -1,44 +0,0 @@ -package registry - -// Selector builds on the registry as a mechanism to pick nodes -// and mark their status. This allows host pools and other things -// to be built using various algorithms. -type Selector interface { - Select(service string, opts ...SelectOption) (SelectNext, error) - Mark(service string, node *Node, err error) - Reset(service string) - Close() error -} - -// SelectNext is a function that returns the next node -// based on the selector's algorithm -type SelectNext func() (*Node, error) - -type SelectorOptions struct { - Registry Registry -} - -type SelectOptions struct { - Filters []func([]*Service) []*Service -} - -// Option used to initialise the selector -type SelectorOption func(*SelectorOptions) - -// Option used when making a select call -type SelectOption func(*SelectOptions) - -// SelectorRegistry sets the registry used by the selector -func SelectorRegistry(r Registry) SelectorOption { - return func(o *SelectorOptions) { - o.Registry = r - } -} - -// SelectFilter adds a filter function to the list of filters -// used during the Select call. -func SelectFilter(fn func([]*Service) []*Service) SelectOption { - return func(o *SelectOptions) { - o.Filters = append(o.Filters, fn) - } -} diff --git a/registry/black_list_selector.go b/selector/blacklist/black_list_selector.go similarity index 75% rename from registry/black_list_selector.go rename to selector/blacklist/black_list_selector.go index 902b9924..b094ef97 100644 --- a/registry/black_list_selector.go +++ b/selector/blacklist/black_list_selector.go @@ -1,9 +1,12 @@ -package registry +package blacklist import ( "math/rand" "sync" "time" + + "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/selector" ) type blackListNode struct { @@ -13,7 +16,7 @@ type blackListNode struct { } type blackListSelector struct { - so SelectorOptions + so selector.Options ttl int64 exit chan bool once sync.Once @@ -51,8 +54,8 @@ func (r *blackListSelector) run() { } } -func (r *blackListSelector) Select(service string, opts ...SelectOption) (SelectNext, error) { - var sopts SelectOptions +func (r *blackListSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) { + var sopts selector.SelectOptions for _, opt := range opts { opt(&sopts) } @@ -70,10 +73,10 @@ func (r *blackListSelector) Select(service string, opts ...SelectOption) (Select // if there's nothing left, return if len(services) == 0 { - return nil, ErrNotFound + return nil, selector.ErrNotFound } - var nodes []*Node + var nodes []*registry.Node for _, service := range services { for _, node := range service.Nodes { @@ -82,11 +85,11 @@ func (r *blackListSelector) Select(service string, opts ...SelectOption) (Select } if len(nodes) == 0 { - return nil, ErrNotFound + return nil, selector.ErrNotFound } - return func() (*Node, error) { - var viable []*Node + return func() (*registry.Node, error) { + var viable []*registry.Node r.RLock() for _, node := range nodes { @@ -97,14 +100,14 @@ func (r *blackListSelector) Select(service string, opts ...SelectOption) (Select r.RUnlock() if len(viable) == 0 { - return nil, ErrNoneAvailable + return nil, selector.ErrNoneAvailable } return viable[rand.Int()%len(viable)], nil }, nil } -func (r *blackListSelector) Mark(service string, node *Node, err error) { +func (r *blackListSelector) Mark(service string, node *registry.Node, err error) { r.Lock() defer r.Unlock() if err == nil { @@ -138,15 +141,15 @@ func (r *blackListSelector) Close() error { return nil } -func NewBlackListSelector(opts ...SelectorOption) Selector { - var sopts SelectorOptions +func NewSelector(opts ...selector.Option) selector.Selector { + var sopts selector.Options for _, opt := range opts { opt(&sopts) } if sopts.Registry == nil { - sopts.Registry = DefaultRegistry + sopts.Registry = registry.DefaultRegistry } var once sync.Once diff --git a/registry/black_list_selector_test.go b/selector/blacklist/black_list_selector_test.go similarity index 82% rename from registry/black_list_selector_test.go rename to selector/blacklist/black_list_selector_test.go index a68b03da..a9fefc75 100644 --- a/registry/black_list_selector_test.go +++ b/selector/blacklist/black_list_selector_test.go @@ -1,17 +1,20 @@ -package registry +package blacklist import ( "errors" "testing" "time" + + "github.com/micro/go-micro/registry/mock" + "github.com/micro/go-micro/selector" ) func TestBlackListSelector(t *testing.T) { counts := map[string]int{} bl := &blackListSelector{ - so: SelectorOptions{ - Registry: &mockRegistry{}, + so: selector.Options{ + Registry: mock.NewRegistry(), }, ttl: 2, bl: make(map[string]blackListNode), @@ -44,7 +47,7 @@ func TestBlackListSelector(t *testing.T) { } bl.Mark("foo", node, errors.New("blacklist")) } - if node, err := next(); err != ErrNoneAvailable { + if node, err := next(); err != selector.ErrNoneAvailable { t.Errorf("Expected none available err, got node %v err %v", node, err) } time.Sleep(time.Second * time.Duration(bl.ttl) * 2) @@ -60,7 +63,7 @@ func TestBlackListSelector(t *testing.T) { } bl.Mark("foo", node, errors.New("blacklist")) } - if node, err := next(); err != ErrNoneAvailable { + if node, err := next(); err != selector.ErrNoneAvailable { t.Errorf("Expected none available err, got node %v err %v", node, err) } bl.Reset("foo") diff --git a/selector/options.go b/selector/options.go new file mode 100644 index 00000000..59738edf --- /dev/null +++ b/selector/options.go @@ -0,0 +1,34 @@ +package selector + +import ( + "github.com/micro/go-micro/registry" +) + +type Options struct { + Registry registry.Registry +} + +type SelectOptions struct { + Filters []SelectFilter +} + +// Option used to initialise the selector +type Option func(*Options) + +// SelectOption used when making a select call +type SelectOption func(*SelectOptions) + +// Registry sets the registry used by the selector +func Registry(r registry.Registry) Option { + return func(o *Options) { + o.Registry = r + } +} + +// Filter adds a filter function to the list of filters +// used during the Select call. +func Filter(fn SelectFilter) SelectOption { + return func(o *SelectOptions) { + o.Filters = append(o.Filters, fn) + } +} diff --git a/selector/random/random.go b/selector/random/random.go new file mode 100644 index 00000000..c3ca6202 --- /dev/null +++ b/selector/random/random.go @@ -0,0 +1,7 @@ +package random + +import "github.com/micro/go-micro/selector" + +func NewSelector(opts ...selector.Option) selector.Selector { + return selector.NewSelector(opts...) +} diff --git a/registry/random_selector.go b/selector/random_selector.go similarity index 76% rename from registry/random_selector.go rename to selector/random_selector.go index 7f4d60b1..03ae39cc 100644 --- a/registry/random_selector.go +++ b/selector/random_selector.go @@ -1,19 +1,21 @@ -package registry +package selector import ( "math/rand" "time" + + "github.com/micro/go-micro/registry" ) type randomSelector struct { - so SelectorOptions + so Options } func init() { rand.Seed(time.Now().Unix()) } -func (r *randomSelector) Select(service string, opts ...SelectOption) (SelectNext, error) { +func (r *randomSelector) Select(service string, opts ...SelectOption) (Next, error) { var sopts SelectOptions for _, opt := range opts { opt(&sopts) @@ -35,7 +37,7 @@ func (r *randomSelector) Select(service string, opts ...SelectOption) (SelectNex return nil, ErrNotFound } - var nodes []*Node + var nodes []*registry.Node for _, service := range services { for _, node := range service.Nodes { @@ -47,7 +49,7 @@ func (r *randomSelector) Select(service string, opts ...SelectOption) (SelectNex return nil, ErrNotFound } - return func() (*Node, error) { + return func() (*registry.Node, error) { i := rand.Int() j := i % len(services) @@ -60,7 +62,7 @@ func (r *randomSelector) Select(service string, opts ...SelectOption) (SelectNex }, nil } -func (r *randomSelector) Mark(service string, node *Node, err error) { +func (r *randomSelector) Mark(service string, node *registry.Node, err error) { return } @@ -72,15 +74,15 @@ func (r *randomSelector) Close() error { return nil } -func NewRandomSelector(opts ...SelectorOption) Selector { - var sopts SelectorOptions +func newRandomSelector(opts ...Option) Selector { + var sopts Options for _, opt := range opts { opt(&sopts) } if sopts.Registry == nil { - sopts.Registry = DefaultRegistry + sopts.Registry = registry.DefaultRegistry } return &randomSelector{sopts} diff --git a/registry/random_selector_test.go b/selector/random_selector_test.go similarity index 69% rename from registry/random_selector_test.go rename to selector/random_selector_test.go index 5d4ef1a7..479ea750 100644 --- a/registry/random_selector_test.go +++ b/selector/random_selector_test.go @@ -1,19 +1,21 @@ -package registry +package selector import ( "testing" + + "github.com/micro/go-micro/registry/mock" ) func TestRandomSelector(t *testing.T) { counts := map[string]int{} - rr := &randomSelector{ - so: SelectorOptions{ - Registry: &mockRegistry{}, + bl := &randomSelector{ + so: Options{ + Registry: mock.NewRegistry(), }, } - next, err := rr.Select("foo") + next, err := bl.Select("foo") if err != nil { t.Errorf("Unexpected error calling random select: %v", err) } diff --git a/registry/round_robin_selector.go b/selector/roundrobin/round_robin_selector.go similarity index 58% rename from registry/round_robin_selector.go rename to selector/roundrobin/round_robin_selector.go index 28624a06..0bbed58c 100644 --- a/registry/round_robin_selector.go +++ b/selector/roundrobin/round_robin_selector.go @@ -1,15 +1,18 @@ -package registry +package roundrobin import ( "sync" + + "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/selector" ) type roundRobinSelector struct { - so SelectorOptions + so selector.Options } -func (r *roundRobinSelector) Select(service string, opts ...SelectOption) (SelectNext, error) { - var sopts SelectOptions +func (r *roundRobinSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) { + var sopts selector.SelectOptions for _, opt := range opts { opt(&sopts) } @@ -27,10 +30,10 @@ func (r *roundRobinSelector) Select(service string, opts ...SelectOption) (Selec // if there's nothing left, return if len(services) == 0 { - return nil, ErrNotFound + return nil, selector.ErrNotFound } - var nodes []*Node + var nodes []*registry.Node for _, service := range services { for _, node := range service.Nodes { @@ -39,13 +42,13 @@ func (r *roundRobinSelector) Select(service string, opts ...SelectOption) (Selec } if len(nodes) == 0 { - return nil, ErrNotFound + return nil, selector.ErrNotFound } var i int var mtx sync.Mutex - return func() (*Node, error) { + return func() (*registry.Node, error) { mtx.Lock() defer mtx.Unlock() i++ @@ -53,7 +56,7 @@ func (r *roundRobinSelector) Select(service string, opts ...SelectOption) (Selec }, nil } -func (r *roundRobinSelector) Mark(service string, node *Node, err error) { +func (r *roundRobinSelector) Mark(service string, node *registry.Node, err error) { return } @@ -65,15 +68,15 @@ func (r *roundRobinSelector) Close() error { return nil } -func NewRoundRobinSelector(opts ...SelectorOption) Selector { - var sopts SelectorOptions +func NewRoundRobinSelector(opts ...selector.Option) selector.Selector { + var sopts selector.Options for _, opt := range opts { opt(&sopts) } if sopts.Registry == nil { - sopts.Registry = DefaultRegistry + sopts.Registry = registry.DefaultRegistry } return &roundRobinSelector{sopts} diff --git a/registry/round_robin_selector_test.go b/selector/roundrobin/round_robin_selector_test.go similarity index 73% rename from registry/round_robin_selector_test.go rename to selector/roundrobin/round_robin_selector_test.go index 4e1d58ac..f02babe7 100644 --- a/registry/round_robin_selector_test.go +++ b/selector/roundrobin/round_robin_selector_test.go @@ -1,15 +1,18 @@ -package registry +package roundrobin import ( "testing" + + "github.com/micro/go-micro/registry/mock" + "github.com/micro/go-micro/selector" ) func TestRoundRobinSelector(t *testing.T) { counts := map[string]int{} rr := &roundRobinSelector{ - so: SelectorOptions{ - Registry: &mockRegistry{}, + so: selector.Options{ + Registry: mock.NewRegistry(), }, } diff --git a/selector/selector.go b/selector/selector.go new file mode 100644 index 00000000..409549f5 --- /dev/null +++ b/selector/selector.go @@ -0,0 +1,93 @@ +/* +The Selector package provides a way to algorithmically filter and return +nodes required by the client or any other system. Selector's implemented +by Micro build on the registry but it's of optional use. One could +provide a static Selector that has a fixed pool. + + func (r *randomSelector) Select(service string, opts ...SelectOption) (Next, error) { + var sopts SelectOptions + for _, opt := range opts { + opt(&sopts) + } + + // get the service + services, err := r.so.Registry.GetService(service) + if err != nil { + return nil, err + } + + // apply the filters + for _, filter := range sopts.Filters { + services = filter(services) + } + + // if there's nothing left, return + if len(services) == 0 { + return nil, ErrNotFound + } + + var nodes []*registry.Node + + for _, service := range services { + for _, node := range service.Nodes { + nodes = append(nodes, node) + } + } + + if len(nodes) == 0 { + return nil, ErrNotFound + } + + return func() (*registry.Node, error) { + i := rand.Int() + j := i % len(services) + + if len(services[j].Nodes) == 0 { + return nil, ErrNotFound + } + + k := i % len(services[j].Nodes) + return services[j].Nodes[k], nil + }, nil + } + + +*/ +package selector + +import ( + "errors" + "github.com/micro/go-micro/registry" +) + +// Selector builds on the registry as a mechanism to pick nodes +// and mark their status. This allows host pools and other things +// to be built using various algorithms. +type Selector interface { + // Select returns a function which should return the next node + Select(service string, opts ...SelectOption) (Next, error) + // Mark sets the success/error against a node + Mark(service string, node *registry.Node, err error) + // Reset returns state back to zero for a service + Reset(service string) + // Close renders the selector unusable + Close() error +} + +// Next is a function that returns the next node +// based on the selector's algorithm +type Next func() (*registry.Node, error) + +// SelectFilter is used to filter a service during the selection process +type SelectFilter func([]*registry.Service) []*registry.Service + +var ( + DefaultSelector = newRandomSelector() + + ErrNotFound = errors.New("not found") + ErrNoneAvailable = errors.New("none available") +) + +func NewSelector(opts ...Option) Selector { + return newRandomSelector(opts...) +}