mirror of
https://github.com/go-micro/go-micro.git
synced 2024-12-24 10:07:04 +02:00
fix bug in the tunnel which causes multicast connections to be closed
This commit is contained in:
parent
9ed257f151
commit
9678daeafa
@ -93,6 +93,9 @@ func (t *tun) getSession(channel, session string) (*session, bool) {
|
|||||||
// delSession deletes a session if it exists
|
// delSession deletes a session if it exists
|
||||||
func (t *tun) delSession(channel, session string) {
|
func (t *tun) delSession(channel, session string) {
|
||||||
t.Lock()
|
t.Lock()
|
||||||
|
if s, ok := t.sessions[channel+session]; ok {
|
||||||
|
s.Close()
|
||||||
|
}
|
||||||
delete(t.sessions, channel+session)
|
delete(t.sessions, channel+session)
|
||||||
t.Unlock()
|
t.Unlock()
|
||||||
}
|
}
|
||||||
@ -512,10 +515,10 @@ func (t *tun) listen(link *link) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// try get the dialing socket
|
// try get the dialing socket
|
||||||
s, exists := t.getSession(channel, sessionId)
|
_, exists := t.getSession(channel, sessionId)
|
||||||
if exists {
|
if exists {
|
||||||
// close and continue
|
// delete and continue
|
||||||
s.Close()
|
t.delSession(channel, sessionId)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// otherwise its a session mapping of sorts
|
// otherwise its a session mapping of sorts
|
||||||
|
@ -66,6 +66,7 @@ func (t *tunListener) process() {
|
|||||||
sess, ok := conns[m.session]
|
sess, ok := conns[m.session]
|
||||||
log.Debugf("Tunnel listener received channel %s session %s exists: %t", m.channel, m.session, ok)
|
log.Debugf("Tunnel listener received channel %s session %s exists: %t", m.channel, m.session, ok)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
// we only process open and session types
|
||||||
switch m.typ {
|
switch m.typ {
|
||||||
case "open", "session":
|
case "open", "session":
|
||||||
default:
|
default:
|
||||||
@ -87,7 +88,7 @@ func (t *tunListener) process() {
|
|||||||
// the link the message was received on
|
// the link the message was received on
|
||||||
link: m.link,
|
link: m.link,
|
||||||
// set the connection mode
|
// set the connection mode
|
||||||
mode: m.mode,
|
mode: t.session.mode,
|
||||||
// close chan
|
// close chan
|
||||||
closed: make(chan bool),
|
closed: make(chan bool),
|
||||||
// recv called by the acceptor
|
// recv called by the acceptor
|
||||||
|
@ -413,6 +413,11 @@ func (s *session) Close() error {
|
|||||||
default:
|
default:
|
||||||
close(s.closed)
|
close(s.closed)
|
||||||
|
|
||||||
|
// don't broadcast the close for multicast
|
||||||
|
if s.mode != Unicast {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// append to backlog
|
// append to backlog
|
||||||
msg := s.newMessage("close")
|
msg := s.newMessage("close")
|
||||||
// no error response on close
|
// no error response on close
|
||||||
|
Loading…
Reference in New Issue
Block a user