1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-05 10:20:53 +02:00

Merge pull request #733 from milosgajdos83/freeze-graph

Freeze network graph when building full network topology
This commit is contained in:
Asim Aslam 2019-09-05 07:21:53 +01:00 committed by GitHub
commit d0761e0a1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 46 additions and 32 deletions

View File

@ -300,7 +300,7 @@ func (n *network) processNetChan(l tunnel.Listener) {
continue
}
n.Lock()
log.Debugf("Network connect message received from: %s", pbNetConnect.Node.Id)
log.Debugf("Network received connect message from: %s", pbNetConnect.Node.Id)
// if the entry already exists skip adding it
if neighbour, ok := n.neighbours[pbNetConnect.Node.Id]; ok {
// update lastSeen timestamp
@ -332,8 +332,8 @@ func (n *network) processNetChan(l tunnel.Listener) {
continue
}
n.Lock()
log.Debugf("Network neighbour message received from: %s", pbNetNeighbour.Node.Id)
// only add the neighbour if it's not already in the neighbourhood
log.Debugf("Network received neighbour message from: %s", pbNetNeighbour.Node.Id)
// only add the neighbour if it is NOT already in node's list of neighbours
if _, ok := n.neighbours[pbNetNeighbour.Node.Id]; !ok {
n.neighbours[pbNetNeighbour.Node.Id] = &node{
id: pbNetNeighbour.Node.Id,
@ -347,10 +347,14 @@ func (n *network) processNetChan(l tunnel.Listener) {
n.neighbours[pbNetNeighbour.Node.Id].lastSeen = now
}
// update/store the neighbour node neighbours
// NOTE: * we dont update lastSeen time for the neighbours of the neighbour
// * even though we are NOT interested in neighbours of neighbours here
// we still allocate the map of neighbours for each of them
for _, pbNeighbour := range pbNetNeighbour.Neighbours {
neighbourNode := &node{
id: pbNeighbour.Id,
address: pbNeighbour.Address,
id: pbNeighbour.Id,
address: pbNeighbour.Address,
neighbours: make(map[string]*node),
}
n.neighbours[pbNetNeighbour.Node.Id].neighbours[neighbourNode.id] = neighbourNode
}
@ -366,6 +370,7 @@ func (n *network) processNetChan(l tunnel.Listener) {
continue
}
n.Lock()
log.Debugf("Network received close message from: %s", pbNetClose.Node.Id)
if err := n.pruneNode(pbNetClose.Node.Id); err != nil {
log.Debugf("Network failed to prune the node %s: %v", pbNetClose.Node.Id, err)
continue
@ -423,6 +428,7 @@ func (n *network) announce(client transport.Client) {
Body: body,
}
log.Debugf("Network sending neighbour message from: %s", node.Id)
if err := client.Send(&m); err != nil {
log.Debugf("Network failed to send neighbour messsage: %v", err)
continue
@ -704,6 +710,7 @@ func (n *network) advertise(client transport.Client, advertChan <-chan *router.A
Body: body,
}
log.Debugf("Network sending advert message from: %s", pbRtrAdvert.Id)
if err := client.Send(&m); err != nil {
log.Debugf("Network failed to send advert %s: %v", pbRtrAdvert.Id, err)
continue
@ -839,6 +846,12 @@ func (n *network) Nodes() []Node {
visited := make(map[string]*node)
// queue of the nodes to visit
queue := list.New()
// we need to freeze the network graph here
// otherwise we might get invalid results
n.RLock()
defer n.RUnlock()
// push network node to the back of queue
queue.PushBack(n.node)
// mark the node as visited
@ -898,39 +911,39 @@ func (n *network) Close() error {
case <-n.closed:
return nil
default:
// send close message only if we managed to connect to NetworkChannel
if netClient, ok := n.tunClient[NetworkChannel]; ok {
// send connect message to NetworkChannel
node := &pbNet.Node{
Id: n.options.Id,
Address: n.options.Address,
}
pbNetClose := &pbNet.Close{
Node: node,
}
// only proceed with sending to NetworkChannel if marshal succeeds
if body, err := proto.Marshal(pbNetClose); err == nil {
// create transport message and chuck it down the pipe
m := transport.Message{
Header: map[string]string{
"Micro-Method": "close",
},
Body: body,
}
log.Debugf("Network sending close message from: %s", node.Id)
if err := netClient.Send(&m); err != nil {
log.Debugf("Network failed to send close messsage: %v", err)
}
}
}
// TODO: send close message to the network channel
close(n.closed)
// set connected to false
n.connected = false
}
// send close message only if we managed to connect to NetworkChannel
if netClient, ok := n.tunClient[NetworkChannel]; ok {
// send connect message to NetworkChannel
node := &pbNet.Node{
Id: n.options.Id,
Address: n.options.Address,
}
pbNetClose := &pbNet.Close{
Node: node,
}
// only proceed with sending to NetworkChannel if marshal succeeds
if body, err := proto.Marshal(pbNetClose); err == nil {
// create transport message and chuck it down the pipe
m := transport.Message{
Header: map[string]string{
"Micro-Method": "close",
},
Body: body,
}
if err := netClient.Send(&m); err != nil {
log.Debugf("Network failed to send close messsage: %v", err)
}
}
}
return n.close()
}

View File

@ -1,3 +1,4 @@
// Package handler implements network RPC handler
package handler
import (