diff --git a/server/subscriber.go b/server/subscriber.go index 0af2457b..9f2100c5 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "reflect" + "strings" "github.com/micro/go-micro/broker" "github.com/micro/go-micro/codec" @@ -176,6 +177,8 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle delete(hdr, "Content-Type") ctx := metadata.NewContext(context.Background(), hdr) + results := make(chan error, len(sb.handlers)) + for i := 0; i < len(sb.handlers); i++ { handler := sb.handlers[i] @@ -229,13 +232,26 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle s.wg.Add(1) go func() { defer s.wg.Done() - fn(ctx, &rpcMessage{ + results <- fn(ctx, &rpcMessage{ topic: sb.topic, contentType: ct, payload: req.Interface(), }) }() } + + var errors []string + + for i := 0; i < len(sb.handlers); i++ { + if err := <-results; err != nil { + errors = append(errors, err.Error()) + } + } + + if len(errors) > 0 { + return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) + } + return nil } }