mirror of
https://github.com/go-micro/go-micro.git
synced 2025-01-23 17:53:05 +02:00
A variety of fixes to try combat the multicast issue
This commit is contained in:
parent
6f1c30aef5
commit
c840cee404
@ -939,17 +939,17 @@ func (n *network) Connect() error {
|
|||||||
log.Debugf("Network failed to resolve nodes: %v", err)
|
log.Debugf("Network failed to resolve nodes: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// initialize the tunnel to resolved nodes
|
|
||||||
n.tunnel.Init(
|
|
||||||
tunnel.Nodes(nodes...),
|
|
||||||
)
|
|
||||||
|
|
||||||
// connect network tunnel
|
// connect network tunnel
|
||||||
if err := n.tunnel.Connect(); err != nil {
|
if err := n.tunnel.Connect(); err != nil {
|
||||||
n.Unlock()
|
n.Unlock()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// initialize the tunnel to resolved nodes
|
||||||
|
n.tunnel.Init(
|
||||||
|
tunnel.Nodes(nodes...),
|
||||||
|
)
|
||||||
|
|
||||||
// return if already connected
|
// return if already connected
|
||||||
if n.connected {
|
if n.connected {
|
||||||
// unlock first
|
// unlock first
|
||||||
|
@ -502,22 +502,26 @@ func (t *tun) listen(link *link) {
|
|||||||
// TODO: handle the close message
|
// TODO: handle the close message
|
||||||
// maybe report io.EOF or kill the link
|
// maybe report io.EOF or kill the link
|
||||||
|
|
||||||
// close the link entirely
|
// if there is no channel then we close the link
|
||||||
|
// as its a signal from the other side to close the connection
|
||||||
if len(channel) == 0 {
|
if len(channel) == 0 {
|
||||||
log.Debugf("Tunnel link %s received close message", link.Remote())
|
log.Debugf("Tunnel link %s received close message", link.Remote())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// the entire listener was closed so remove it from the mapping
|
// the entire listener was closed by the remote side so we need to
|
||||||
|
// remove the channel mapping for it. should we also close sessions?
|
||||||
if sessionId == "listener" {
|
if sessionId == "listener" {
|
||||||
link.delChannel(channel)
|
link.delChannel(channel)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// assuming there's a channel and session
|
||||||
// try get the dialing socket
|
// try get the dialing socket
|
||||||
_, exists := t.getSession(channel, sessionId)
|
s, exists := t.getSession(channel, sessionId)
|
||||||
if exists {
|
if exists && s.mode == Unicast && !loopback {
|
||||||
// delete and continue
|
// only delete this if its unicast
|
||||||
|
// but not if its a loopback conn
|
||||||
t.delSession(channel, sessionId)
|
t.delSession(channel, sessionId)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -673,6 +677,7 @@ func (t *tun) listen(link *link) {
|
|||||||
typ: mtype,
|
typ: mtype,
|
||||||
channel: channel,
|
channel: channel,
|
||||||
session: sessionId,
|
session: sessionId,
|
||||||
|
mode: s.mode,
|
||||||
data: tmsg,
|
data: tmsg,
|
||||||
link: link.id,
|
link: link.id,
|
||||||
loopback: loopback,
|
loopback: loopback,
|
||||||
@ -703,7 +708,7 @@ func (t *tun) discover(link *link) {
|
|||||||
"Micro-Tunnel-Id": t.id,
|
"Micro-Tunnel-Id": t.id,
|
||||||
},
|
},
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
log.Debugf("Tunnel failed to send discover to link %s: %v", link.id, err)
|
log.Debugf("Tunnel failed to send discover to link %s: %v", link.Remote(), err)
|
||||||
}
|
}
|
||||||
case <-link.closed:
|
case <-link.closed:
|
||||||
return
|
return
|
||||||
|
@ -88,7 +88,7 @@ func (t *tunListener) process() {
|
|||||||
// the link the message was received on
|
// the link the message was received on
|
||||||
link: m.link,
|
link: m.link,
|
||||||
// set the connection mode
|
// set the connection mode
|
||||||
mode: t.session.mode,
|
mode: m.mode,
|
||||||
// close chan
|
// close chan
|
||||||
closed: make(chan bool),
|
closed: make(chan bool),
|
||||||
// recv called by the acceptor
|
// recv called by the acceptor
|
||||||
|
@ -413,11 +413,6 @@ func (s *session) Close() error {
|
|||||||
default:
|
default:
|
||||||
close(s.closed)
|
close(s.closed)
|
||||||
|
|
||||||
// don't broadcast the close for multicast
|
|
||||||
if s.mode != Unicast {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// append to backlog
|
// append to backlog
|
||||||
msg := s.newMessage("close")
|
msg := s.newMessage("close")
|
||||||
// no error response on close
|
// no error response on close
|
||||||
|
Loading…
x
Reference in New Issue
Block a user