mirror of
https://github.com/go-micro/go-micro.git
synced 2025-04-11 11:02:02 +02:00
fix: consume and publish blocked after rabbitmq reconnecting (#2492)
* Support direct generation of grpc method when package and service names of proto files are different. * fix req.Interface() return nil. * Get rid of dependence on 'Micro-Topic' * Revert "Get rid of dependence on 'Micro-Topic'" This reverts commit 3ff69443364d39f5fda3a32fc2249826e2d207dd. * Revert "fix req.Interface() return nil." This reverts commit 90a1b34195e07772fa6f2074e1cf22237ac2a87f. * Revert "Revert "fix req.Interface() return nil."" This reverts commit e64737b7da8d1767c4456881f6730f1c196cea60. * Revert "Revert "Get rid of dependence on 'Micro-Topic'"" This reverts commit 141bb0a557c81cb6d1c651b085b3e65483d5e681. * fix: consume and publish blocked after reconnecting Co-authored-by: maxinglun <maxinglun@zhijiaxing.net>
This commit is contained in:
parent
367771923c
commit
8293988499
@ -11,8 +11,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go-micro.dev/v4/logger"
|
||||
"github.com/streadway/amqp"
|
||||
"go-micro.dev/v4/logger"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -129,42 +129,36 @@ func (r *rabbitMQConn) reconnect(secure bool, config *amqp.Config) {
|
||||
chanNotifyClose := make(chan *amqp.Error)
|
||||
channel := r.ExchangeChannel.channel
|
||||
channel.NotifyClose(chanNotifyClose)
|
||||
channelNotifyReturn := make(chan amqp.Return)
|
||||
channel.NotifyReturn(channelNotifyReturn)
|
||||
|
||||
// block until closed
|
||||
select {
|
||||
case result, ok := <-channelNotifyReturn:
|
||||
if !ok {
|
||||
// Channel closed, probably also the channel or connection.
|
||||
// To avoid deadlocks it is necessary to consume the messages from all channels.
|
||||
for notifyClose != nil || chanNotifyClose != nil {
|
||||
// block until closed
|
||||
select {
|
||||
case err := <-chanNotifyClose:
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Error(err)
|
||||
}
|
||||
// block all resubscribe attempt - they are useless because there is no connection to rabbitmq
|
||||
// create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks)
|
||||
r.Lock()
|
||||
r.connected = false
|
||||
r.waitConnection = make(chan struct{})
|
||||
r.Unlock()
|
||||
chanNotifyClose = nil
|
||||
case err := <-notifyClose:
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Error(err)
|
||||
}
|
||||
// block all resubscribe attempt - they are useless because there is no connection to rabbitmq
|
||||
// create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks)
|
||||
r.Lock()
|
||||
r.connected = false
|
||||
r.waitConnection = make(chan struct{})
|
||||
r.Unlock()
|
||||
notifyClose = nil
|
||||
case <-r.close:
|
||||
return
|
||||
}
|
||||
// Do what you need with messageFailing.
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Errorf("notify error reason: %s, description: %s", result.ReplyText, result.Exchange)
|
||||
}
|
||||
case err := <-chanNotifyClose:
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Error(err)
|
||||
}
|
||||
// block all resubscribe attempt - they are useless because there is no connection to rabbitmq
|
||||
// create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks)
|
||||
r.Lock()
|
||||
r.connected = false
|
||||
r.waitConnection = make(chan struct{})
|
||||
r.Unlock()
|
||||
case err := <-notifyClose:
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Error(err)
|
||||
}
|
||||
// block all resubscribe attempt - they are useless because there is no connection to rabbitmq
|
||||
// create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks)
|
||||
r.Lock()
|
||||
r.connected = false
|
||||
r.waitConnection = make(chan struct{})
|
||||
r.Unlock()
|
||||
case <-r.close:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -102,6 +102,10 @@ func (s *subscriber) resubscribe() {
|
||||
return
|
||||
//wait until we reconect to rabbit
|
||||
case <-s.r.conn.waitConnection:
|
||||
// When the connection is disconnected, the waitConnection will be re-assigned, so '<-s.r.conn.waitConnection' maybe blocked.
|
||||
// Here, it returns once a second, and then the latest waitconnection will be used
|
||||
case <-time.After(time.Second):
|
||||
continue
|
||||
}
|
||||
|
||||
// it may crash (panic) in case of Consume without connection, so recheck it
|
||||
|
Loading…
x
Reference in New Issue
Block a user