1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-11 17:18:28 +02:00

broker/eats: broker disconnect fix (#1186)

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Vasiliy Tolstov 2020-02-11 18:46:50 +03:00 committed by GitHub
parent d1d6eada98
commit 2764de9a1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 2 deletions

View File

@ -295,6 +295,10 @@ func (n *natsBroker) Connect() error {
return nil return nil
default: // DISCONNECTED or CLOSED or DRAINING default: // DISCONNECTED or CLOSED or DRAINING
opts := n.nopts opts := n.nopts
opts.DrainTimeout = 1 * time.Second
opts.AsyncErrorCB = n.onAsyncError
opts.DisconnectedErrCB = n.onDisconnectedError
opts.ClosedCB = n.onClose
opts.Servers = n.servers opts.Servers = n.servers
opts.Secure = n.opts.Secure opts.Secure = n.opts.Secure
opts.TLSConfig = n.opts.TLSConfig opts.TLSConfig = n.opts.TLSConfig
@ -324,7 +328,7 @@ func (n *natsBroker) Disconnect() error {
// drain the connection if specified // drain the connection if specified
if n.drain { if n.drain {
n.conn.Drain() n.conn.Drain()
return <-n.closeCh n.closeCh <- nil
} }
// close the client connection // close the client connection
@ -434,6 +438,10 @@ func (n *natsBroker) onClose(conn *nats.Conn) {
n.closeCh <- nil n.closeCh <- nil
} }
func (n *natsBroker) onDisconnectedError(conn *nats.Conn, err error) {
n.closeCh <- nil
}
func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err error) { func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err error) {
// There are kinds of different async error nats might callback, but we are interested // There are kinds of different async error nats might callback, but we are interested
// in ErrDrainTimeout only here. // in ErrDrainTimeout only here.

View File

@ -318,7 +318,7 @@ func (n *natsBroker) Disconnect() error {
// drain the connection if specified // drain the connection if specified
if n.drain { if n.drain {
n.conn.Drain() n.conn.Drain()
return <-n.closeCh n.closeCh <- nil
} }
// close the client connection // close the client connection
@ -440,6 +440,7 @@ func (n *natsBroker) setOption(opts ...broker.Option) {
n.closeCh = make(chan error) n.closeCh = make(chan error)
n.nopts.ClosedCB = n.onClose n.nopts.ClosedCB = n.onClose
n.nopts.AsyncErrorCB = n.onAsyncError n.nopts.AsyncErrorCB = n.onAsyncError
n.nopts.DisconnectedErrCB = n.onDisconnectedError
} }
} }
@ -455,6 +456,10 @@ func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err e
} }
} }
func (n *natsBroker) onDisconnectedError(conn *nats.Conn, err error) {
n.closeCh <- nil
}
func NewBroker(opts ...broker.Option) broker.Broker { func NewBroker(opts ...broker.Option) broker.Broker {
options := broker.Options{ options := broker.Options{
// Default codec // Default codec