diff --git a/plugins/broker/rabbitmq/connection.go b/plugins/broker/rabbitmq/connection.go index 5b1771fc..4ab0e828 100644 --- a/plugins/broker/rabbitmq/connection.go +++ b/plugins/broker/rabbitmq/connection.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/asim/go-micro/logger" "github.com/streadway/amqp" ) @@ -125,10 +126,37 @@ func (r *rabbitMQConn) reconnect(secure bool, config *amqp.Config) { connect = true notifyClose := make(chan *amqp.Error) r.Connection.NotifyClose(notifyClose) + chanNotifyClose := make(chan *amqp.Error) + channel := r.ExchangeChannel.channel + channel.NotifyClose(chanNotifyClose) + channelNotifyReturn := make(chan amqp.Return) + channel.NotifyReturn(channelNotifyReturn) // block until closed select { - case <-notifyClose: + case result, ok := <-channelNotifyReturn: + if !ok { + // Channel closed, probably also the channel or connection. + return + } + // Do what you need with messageFailing. + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("notify error reason: %s, description: %s", result.ReplyText, result.Exchange) + } + case err := <-chanNotifyClose: + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(err) + } + // block all resubscribe attempt - they are useless because there is no connection to rabbitmq + // create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks) + r.Lock() + r.connected = false + r.waitConnection = make(chan struct{}) + r.Unlock() + case err := <-notifyClose: + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(err) + } // block all resubscribe attempt - they are useless because there is no connection to rabbitmq // create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks) r.Lock()