1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-11-23 21:44:41 +02:00

[feature] ability to use wildcard in topic name (factory-events.*.*) (#2804)

This commit is contained in:
Ak-Army
2025-09-09 13:51:39 +02:00
committed by GitHub
parent e5cd820c71
commit 9070b3befd
4 changed files with 79 additions and 61 deletions

View File

@@ -14,7 +14,8 @@ import (
// HandleEvent handles inbound messages to the service directly. // HandleEvent handles inbound messages to the service directly.
// These events are a result of registering to the topic with the service name. // These events are a result of registering to the topic with the service name.
// TODO: handle requests from an event. We won't send a response. // TODO: handle requests from an event. We won't send a response.
func (s *rpcServer) HandleEvent(e broker.Event) error { func (s *rpcServer) HandleEvent(subscriber string) func(e broker.Event) error {
return func(e broker.Event) error {
// formatting horrible cruft // formatting horrible cruft
msg := e.Message() msg := e.Message()
@@ -55,7 +56,10 @@ func (s *rpcServer) HandleEvent(e broker.Event) error {
r := Router(s.router) r := Router(s.router)
if s.opts.Router != nil { if s.opts.Router != nil {
// create a wrapped function // create a wrapped function
handler := s.opts.Router.ProcessMessage // create a wrapped function
handler := func(ctx context.Context, msg Message) error {
return s.opts.Router.ProcessMessage(ctx, subscriber, msg)
}
// execute the wrapper for it // execute the wrapper for it
for i := len(s.opts.SubWrappers); i > 0; i-- { for i := len(s.opts.SubWrappers); i > 0; i-- {
@@ -63,10 +67,13 @@ func (s *rpcServer) HandleEvent(e broker.Event) error {
} }
// set the router // set the router
r = rpcRouter{m: handler} r = rpcRouter{m: func(ctx context.Context, _ string, msg Message) error {
return handler(ctx, msg)
}}
} }
return r.ProcessMessage(ctx, rpcMsg) return r.ProcessMessage(ctx, subscriber, rpcMsg)
}
} }
func (s *rpcServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber { func (s *rpcServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber {
@@ -102,7 +109,7 @@ func (s *rpcServer) Subscribe(sb Subscriber) error {
// subscribeServer will subscribe the server to the topic with its own name. // subscribeServer will subscribe the server to the topic with its own name.
func (s *rpcServer) subscribeServer(config Options) error { func (s *rpcServer) subscribeServer(config Options) error {
if s.opts.Router != nil && s.subscriber == nil { if s.opts.Router != nil && s.subscriber == nil {
sub, err := s.opts.Broker.Subscribe(config.Name, s.HandleEvent) sub, err := s.opts.Broker.Subscribe(config.Name, s.HandleEvent(config.Name))
if err != nil { if err != nil {
return err return err
} }
@@ -134,7 +141,7 @@ func (s *rpcServer) reSubscribe(config Options) {
} }
config.Logger.Logf(log.InfoLevel, "Subscribing to topic: %s", sb.Topic()) config.Logger.Logf(log.InfoLevel, "Subscribing to topic: %s", sb.Topic())
sub, err := config.Broker.Subscribe(sb.Topic(), s.HandleEvent, opts...) sub, err := config.Broker.Subscribe(sb.Topic(), s.HandleEvent(sb.Topic()), opts...)
if err != nil { if err != nil {
config.Logger.Logf(log.WarnLevel, "Unable to subscribing to topic: %s, error: %s", sb.Topic(), err) config.Logger.Logf(log.WarnLevel, "Unable to subscribing to topic: %s, error: %s", sb.Topic(), err)
continue continue

View File

@@ -81,11 +81,11 @@ type router struct {
// rpcRouter encapsulates functions that become a Router. // rpcRouter encapsulates functions that become a Router.
type rpcRouter struct { type rpcRouter struct {
h func(context.Context, Request, interface{}) error h func(context.Context, Request, interface{}) error
m func(context.Context, Message) error m func(context.Context, string, Message) error
} }
func (r rpcRouter) ProcessMessage(ctx context.Context, msg Message) error { func (r rpcRouter) ProcessMessage(ctx context.Context, subscriber string, msg Message) error {
return r.m(ctx, msg) return r.m(ctx, subscriber, msg)
} }
func (r rpcRouter) ServeRequest(ctx context.Context, req Request, rsp Response) error { func (r rpcRouter) ServeRequest(ctx context.Context, req Request, rsp Response) error {
@@ -188,7 +188,11 @@ func prepareMethod(method reflect.Method, logger log.Logger) *methodType {
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream} return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}
} }
func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, last bool) error { func (router *router) sendResponse(sending sync.Locker,
req *request,
reply interface{},
cc codec.Writer,
last bool) error {
msg := new(codec.Message) msg := new(codec.Message)
msg.Type = codec.Response msg.Type = codec.Response
resp := router.getResponse() resp := router.getResponse()
@@ -205,7 +209,13 @@ func (router *router) sendResponse(sending sync.Locker, req *request, reply inte
return err return err
} }
func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) error { func (s *service) call(ctx context.Context,
router *router,
sending *sync.Mutex,
mtype *methodType,
req *request,
argv, replyv reflect.Value,
cc codec.Writer) error {
defer router.freeRequest(req) defer router.freeRequest(req)
function := mtype.method.Func function := mtype.method.Func
@@ -227,7 +237,8 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
if !mtype.stream { if !mtype.stream {
fn := func(ctx context.Context, req Request, rsp interface{}) error { fn := func(ctx context.Context, req Request, rsp interface{}) error {
returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)}) returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx),
reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)})
// The return value for the method is an error. // The return value for the method is an error.
if err := returnValues[0].Interface(); err != nil { if err := returnValues[0].Interface(); err != nil {
@@ -534,7 +545,7 @@ func (router *router) Subscribe(s Subscriber) error {
return nil return nil
} }
func (router *router) ProcessMessage(ctx context.Context, msg Message) (err error) { func (router *router) ProcessMessage(ctx context.Context, subscriber string, msg Message) (err error) {
defer func() { defer func() {
// recover any panics // recover any panics
if r := recover(); r != nil { if r := recover(); r != nil {
@@ -546,7 +557,7 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) (err erro
// get the subscribers by topic // get the subscribers by topic
router.su.RLock() router.su.RLock()
subs, ok := router.subscribers[msg.Topic()] subs, ok := router.subscribers[subscriber]
router.su.RUnlock() router.su.RUnlock()
if !ok { if !ok {
log.Warnf("Subscriber not found for topic %s", msg.Topic()) log.Warnf("Subscriber not found for topic %s", msg.Topic())

View File

@@ -167,7 +167,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// Process the event // Process the event
ev := newEvent(msg) ev := newEvent(msg)
if err := s.HandleEvent(ev); err != nil { if err := s.HandleEvent(ev.Topic())(ev); err != nil {
msg.Header[headers.Error] = err.Error() msg.Header[headers.Error] = err.Error()
logger.Logf(log.ErrorLevel, "failed to handle event: %v", err) logger.Logf(log.ErrorLevel, "failed to handle event: %v", err)
} }

View File

@@ -40,7 +40,7 @@ type Server interface {
// Router handle serving messages. // Router handle serving messages.
type Router interface { type Router interface {
// ProcessMessage processes a message // ProcessMessage processes a message
ProcessMessage(context.Context, Message) error ProcessMessage(context.Context, string, Message) error
// ServeRequest processes a request to completion // ServeRequest processes a request to completion
ServeRequest(context.Context, Request, Response) error ServeRequest(context.Context, Request, Response) error
} }