1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-12-24 10:07:04 +02:00
This commit is contained in:
Asim Aslam 2019-12-08 00:53:55 +00:00
parent 9bd0fb9125
commit 283c85d256
4 changed files with 30 additions and 11 deletions

View File

@ -379,9 +379,8 @@ func (n *network) processNetChan(listener tunnel.Listener) {
}
// update peer links
log.Tracef("Network updating peer link %s for peer: %s", m.session.Link(), pbNetConnect.Node.Address)
if err := n.updatePeerLinks(pbNetConnect.Node.Address, m.session.Link()); err != nil {
if err := n.updatePeerLinks(pbNetConnect.Node.Address, m); err != nil {
log.Debugf("Network failed updating peer links: %s", err)
}
@ -443,9 +442,8 @@ func (n *network) processNetChan(listener tunnel.Listener) {
}
// update peer links
log.Tracef("Network updating peer link %s for peer: %s", m.session.Link(), pbNetPeer.Node.Address)
if err := n.updatePeerLinks(pbNetPeer.Node.Address, m.session.Link()); err != nil {
if err := n.updatePeerLinks(pbNetPeer.Node.Address, m); err != nil {
log.Debugf("Network failed updating peer links: %s", err)
}
@ -680,10 +678,12 @@ func (n *network) sendMsg(method, channel string, msg proto.Message) error {
}
// updatePeerLinks updates link for a given peer
func (n *network) updatePeerLinks(peerAddr string, linkId string) error {
func (n *network) updatePeerLinks(peerAddr string, m *message) error {
n.Lock()
defer n.Unlock()
linkId := m.msg.Header["Micro-Link"]
log.Tracef("Network looking up link %s in the peer links", linkId)
// lookup the peer link

View File

@ -266,7 +266,7 @@ func (t *tun) manageLinks() {
for _, node := range connect {
wg.Add(1)
go func() {
go func(node string) {
defer wg.Done()
// create new link
@ -280,7 +280,7 @@ func (t *tun) manageLinks() {
t.Lock()
t.links[node] = link
t.Unlock()
}()
}(node)
}
// wait for all threads to finish
@ -801,6 +801,7 @@ func (t *tun) setupLink(node string) (*link, error) {
"Micro-Tunnel-Id": t.id,
},
}); err != nil {
link.Close()
return nil, err
}

View File

@ -65,12 +65,24 @@ func (t *tunListener) process() {
return
// receive a new message
case m := <-t.session.recv:
// session id
sessionId := m.session
var sessionId string
var linkId string
switch m.mode {
case Multicast:
sessionId = "multicast"
linkId = "multicast"
case Broadcast:
sessionId = "broadcast"
linkId = "broadcast"
default:
sessionId = m.session
linkId = m.link
}
// get a session
sess, ok := conns[sessionId]
log.Tracef("Tunnel listener received channel %s session %s type %s exists: %t", m.channel, sessionId, m.typ, ok)
log.Tracef("Tunnel listener received channel %s session %s type %s exists: %t", m.channel, m.session, m.typ, ok)
if !ok {
// we only process open and session types
switch m.typ {
@ -92,7 +104,7 @@ func (t *tunListener) process() {
// is loopback conn
loopback: m.loopback,
// the link the message was received on
link: m.link,
link: linkId,
// set the connection mode
mode: m.mode,
// close chan

View File

@ -400,6 +400,12 @@ func (s *session) Recv(m *transport.Message) error {
msg.data.Header[k] = string(val)
}
// set the link
// TODO: decruft, this is only for multicast
// since the session is now a single session
// likely provide as part of message.Link()
msg.data.Header["Micro-Link"] = msg.link
// set message
*m = *msg.data
// return nil