diff --git a/server/debug.go b/server/debug.go index 48df8b1c..3aaf5717 100644 --- a/server/debug.go +++ b/server/debug.go @@ -17,5 +17,5 @@ func (d *Debug) Health(ctx context.Context, req *proto.HealthRequest, rsp *proto } func registerDebugHandler(s Server) { - s.Handle(s.NewHandler(&Debug{s.Options().DebugHandler})) + s.Handle(s.NewHandler(&Debug{s.Options().DebugHandler}, InternalHandler(true))) } diff --git a/server/handler.go b/server/handler.go index 5cb13cf4..a7ec3dc9 100644 --- a/server/handler.go +++ b/server/handler.go @@ -20,6 +20,7 @@ type Handler interface { Name() string Handler() interface{} Endpoints() []*registry.Endpoint + Options() HandlerOptions } // Subscriber interface represents a subscription to a given topic using @@ -28,4 +29,30 @@ type Subscriber interface { Topic() string Subscriber() interface{} Endpoints() []*registry.Endpoint + Options() SubscriberOptions +} + +type HandlerOptions struct { + Internal bool +} + +type SubscriberOptions struct { + Internal bool +} + +// Internal Handler options specifies that a handler is not advertised +// to the discovery system. In the future this may also limit request +// to the internal network or authorised user. +func InternalHandler(b bool) HandlerOption { + return func(o *HandlerOptions) { + o.Internal = b + } +} + +// Internal Subscriber options specifies that a subscriber is not advertised +// to the discovery system. +func InternalSubscriber(b bool) SubscriberOption { + return func(o *SubscriberOptions) { + o.Internal = b + } } diff --git a/server/rpc_handler.go b/server/rpc_handler.go index bd0d301d..e01b4b25 100644 --- a/server/rpc_handler.go +++ b/server/rpc_handler.go @@ -10,9 +10,15 @@ type rpcHandler struct { name string handler interface{} endpoints []*registry.Endpoint + opts HandlerOptions } -func newRpcHandler(handler interface{}) Handler { +func newRpcHandler(handler interface{}, opts ...HandlerOption) Handler { + var options HandlerOptions + for _, o := range opts { + o(&options) + } + typ := reflect.TypeOf(handler) hdlr := reflect.ValueOf(handler) name := reflect.Indirect(hdlr).Type().Name() @@ -30,6 +36,7 @@ func newRpcHandler(handler interface{}) Handler { name: name, handler: handler, endpoints: endpoints, + opts: options, } } @@ -44,3 +51,7 @@ func (r *rpcHandler) Handler() interface{} { func (r *rpcHandler) Endpoints() []*registry.Endpoint { return r.endpoints } + +func (r *rpcHandler) Options() HandlerOptions { + return r.opts +} diff --git a/server/rpc_server.go b/server/rpc_server.go index e60581e3..939b4b55 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -114,8 +114,8 @@ func (s *rpcServer) Init(opts ...Option) error { return nil } -func (s *rpcServer) NewHandler(h interface{}) Handler { - return newRpcHandler(h) +func (s *rpcServer) NewHandler(h interface{}, opts ...HandlerOption) Handler { + return newRpcHandler(h, opts...) } func (s *rpcServer) Handle(h Handler) error { @@ -128,8 +128,8 @@ func (s *rpcServer) Handle(h Handler) error { return nil } -func (s *rpcServer) NewSubscriber(topic string, sb interface{}) Subscriber { - return newSubscriber(topic, sb) +func (s *rpcServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber { + return newSubscriber(topic, sb, opts...) } func (s *rpcServer) Subscribe(sb Subscriber) error { @@ -199,10 +199,16 @@ func (s *rpcServer) Register() error { s.RLock() var endpoints []*registry.Endpoint for _, e := range s.handlers { - endpoints = append(endpoints, e.Endpoints()...) + // Only advertise non internal handlers + if !e.Options().Internal { + endpoints = append(endpoints, e.Endpoints()...) + } } for e, _ := range s.subscribers { - endpoints = append(endpoints, e.Endpoints()...) + // Only advertise non internal subscribers + if !e.Options().Internal { + endpoints = append(endpoints, e.Endpoints()...) + } } s.RUnlock() diff --git a/server/server.go b/server/server.go index 0fff3afb..c74e392b 100644 --- a/server/server.go +++ b/server/server.go @@ -42,8 +42,8 @@ type Server interface { Options() Options Init(...Option) error Handle(Handler) error - NewHandler(interface{}) Handler - NewSubscriber(string, interface{}) Subscriber + NewHandler(interface{}, ...HandlerOption) Handler + NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber Subscribe(Subscriber) error Register() error Deregister() error @@ -82,6 +82,10 @@ type Streamer interface { type Option func(*Options) +type HandlerOption func(*HandlerOptions) + +type SubscriberOption func(*SubscriberOptions) + var ( DefaultAddress = ":0" DefaultName = "go-server" @@ -110,8 +114,8 @@ func NewServer(opt ...Option) Server { // Creates a new subscriber interface with the given topic // and handler using the default server -func NewSubscriber(topic string, h interface{}) Subscriber { - return DefaultServer.NewSubscriber(topic, h) +func NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber { + return DefaultServer.NewSubscriber(topic, h, opts...) } // Creates a new handler interface using the default server @@ -124,8 +128,8 @@ func NewSubscriber(topic string, h interface{}) Subscriber { // return nil // } // -func NewHandler(h interface{}) Handler { - return DefaultServer.NewHandler(h) +func NewHandler(h interface{}, opts ...HandlerOption) Handler { + return DefaultServer.NewHandler(h, opts...) } // Registers a handler interface with the default server to diff --git a/server/subscriber.go b/server/subscriber.go index 5a834497..04b5cb0b 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -29,9 +29,15 @@ type subscriber struct { subscriber interface{} handlers []*handler endpoints []*registry.Endpoint + opts SubscriberOptions } -func newSubscriber(topic string, sub interface{}) Subscriber { +func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber { + var options SubscriberOptions + for _, o := range opts { + o(&options) + } + var endpoints []*registry.Endpoint var handlers []*handler @@ -96,6 +102,7 @@ func newSubscriber(topic string, sub interface{}) Subscriber { subscriber: sub, handlers: handlers, endpoints: endpoints, + opts: options, } } @@ -241,3 +248,7 @@ func (s *subscriber) Subscriber() interface{} { func (s *subscriber) Endpoints() []*registry.Endpoint { return s.endpoints } + +func (s *subscriber) Options() SubscriberOptions { + return s.opts +}