mirror of
https://github.com/go-micro/go-micro.git
synced 2025-01-11 17:18:28 +02:00
Small refactoring; Split horizon loop break.
This commit is contained in:
parent
d72e91fb38
commit
323a72be34
@ -264,7 +264,7 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
|
||||
}
|
||||
if ok := n.node.AddPeer(peer); !ok {
|
||||
log.Debugf("Network peer exists, refreshing: %s", peer.id)
|
||||
// update lastSeen time for the peer
|
||||
// update lastSeen time for the existing node
|
||||
if ok := n.RefreshPeer(peer.id, now); !ok {
|
||||
log.Debugf("Network failed refreshing peer: %s", peer.id)
|
||||
}
|
||||
@ -314,7 +314,7 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
|
||||
if ok := n.RefreshPeer(pbNetPeer.Node.Id, now); !ok {
|
||||
log.Debugf("Network failed refreshing peer: %s", pbNetPeer.Node.Id)
|
||||
}
|
||||
// NOTE: we don't uunpack MaxDepth toplogy
|
||||
// NOTE: we don't unpack MaxDepth toplogy
|
||||
peer = UnpackPeerTopology(pbNetPeer, now, MaxDepth-1)
|
||||
log.Debugf("Network updating topology of node: %s", n.node.id)
|
||||
if ok := n.node.UpdatePeer(peer); !ok {
|
||||
@ -539,34 +539,23 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste
|
||||
if pbRtrAdvert.Id == n.options.Id {
|
||||
continue
|
||||
}
|
||||
log.Debugf("Network received advert message from: %s", pbRtrAdvert.Id)
|
||||
// loookup advertising node in our peers
|
||||
advertNode := n.node.GetPeer(pbRtrAdvert.Id)
|
||||
// if we dont recognize the node as our peer we skip processing its adverts
|
||||
log.Debugf("Network received advert message with %d events from: %s", len(pbRtrAdvert.Events), pbRtrAdvert.Id)
|
||||
// loookup advertising node in our peer topology
|
||||
advertNode := n.node.GetPeerNode(pbRtrAdvert.Id)
|
||||
if advertNode == nil {
|
||||
// if we can't find the node in our topology (MaxDepth) we skipp prcessing adverts
|
||||
log.Debugf("Network skipping advert message from unknown peer: %s", pbRtrAdvert.Id)
|
||||
continue
|
||||
}
|
||||
|
||||
var events []*router.Event
|
||||
for _, event := range pbRtrAdvert.Events {
|
||||
// set the address of the advertising node
|
||||
// we know Route.Gateway is the address of advertNode
|
||||
// NOTE: this is true only when advertNode had not been registered
|
||||
// as our peer when we received the advert from it
|
||||
if advertNode.address == "" {
|
||||
advertNode.address = event.Route.Gateway
|
||||
if ok := n.node.UpdatePeer(advertNode); !ok {
|
||||
log.Debugf("Network failed to update peer: %s", advertNode.id)
|
||||
continue
|
||||
}
|
||||
}
|
||||
// if advertising node id is not the same as Route.Router
|
||||
// we know the advertising node is not the origin of the route
|
||||
if advertNode.id != event.Route.Router {
|
||||
if pbRtrAdvert.Id != event.Route.Router {
|
||||
// if the origin router is not the advertising node peer
|
||||
// we can't rule out potential routing loops so we bail here
|
||||
if peer := advertNode.GetPeer(event.Route.Router); peer == nil {
|
||||
if !advertNode.HasPeer(event.Route.Router) {
|
||||
log.Debugf("Network skipping advert message from peer: %s", pbRtrAdvert.Id)
|
||||
continue
|
||||
}
|
||||
}
|
||||
@ -676,14 +665,13 @@ func (n *network) advertise(client transport.Client, advertChan <-chan *router.A
|
||||
|
||||
// Connect connects the network
|
||||
func (n *network) Connect() error {
|
||||
n.RLock()
|
||||
n.Lock()
|
||||
// return if already connected
|
||||
if n.connected {
|
||||
n.RUnlock()
|
||||
n.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
n.Lock()
|
||||
// try to resolve network nodes
|
||||
nodes, err := n.resolveNodes()
|
||||
if err != nil {
|
||||
@ -831,8 +819,6 @@ func (n *network) Close() error {
|
||||
// unlock the lock otherwise we'll deadlock sending the close
|
||||
n.Unlock()
|
||||
|
||||
// send close message only if we managed to connect to NetworkChannel
|
||||
log.Debugf("Sending close message from: %s", n.options.Id)
|
||||
msg := &pbNet.Close{
|
||||
Node: &pbNet.Node{
|
||||
Id: n.options.Id,
|
||||
|
114
network/node.go
114
network/node.go
@ -43,7 +43,7 @@ func (n *node) Network() Network {
|
||||
return n.network
|
||||
}
|
||||
|
||||
// AddPeer adds a new peer to node
|
||||
// AddPeer adds a new peer to node topology
|
||||
// It returns false if the peer already exists
|
||||
func (n *node) AddPeer(peer *node) bool {
|
||||
n.Lock()
|
||||
@ -57,33 +57,6 @@ func (n *node) AddPeer(peer *node) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// GetPeer returns a peer if it exists
|
||||
// It returns nil if the peer was not found
|
||||
func (n *node) GetPeer(id string) *node {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
|
||||
p, ok := n.peers[id]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
peer := &node{
|
||||
id: p.id,
|
||||
address: p.address,
|
||||
peers: make(map[string]*node),
|
||||
network: p.network,
|
||||
lastSeen: p.lastSeen,
|
||||
}
|
||||
|
||||
// TODO: recursively retrieve all of its peers
|
||||
for id, pop := range p.peers {
|
||||
peer.peers[id] = pop
|
||||
}
|
||||
|
||||
return peer
|
||||
}
|
||||
|
||||
// UpdatePeer updates a peer if it exists
|
||||
// It returns false if the peer does not exist
|
||||
func (n *node) UpdatePeer(peer *node) bool {
|
||||
@ -106,7 +79,17 @@ func (n *node) DeletePeer(id string) {
|
||||
delete(n.peers, id)
|
||||
}
|
||||
|
||||
// Refresh updates node timestamp
|
||||
// HasPeer returns true if node has peer with given id
|
||||
func (n *node) HasPeer(id string) bool {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
|
||||
_, ok := n.peers[id]
|
||||
return ok
|
||||
}
|
||||
|
||||
// RefreshPeer updates node timestamp
|
||||
// It returns false if the peer has not been found.
|
||||
func (n *node) RefreshPeer(id string, now time.Time) bool {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
@ -123,13 +106,7 @@ func (n *node) RefreshPeer(id string, now time.Time) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Nodes returns a slice if all nodes in node topology
|
||||
func (n *node) Nodes() []Node {
|
||||
// we need to freeze the network graph here
|
||||
// otherwise we might get inconsisten results
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
|
||||
func (n *node) walk(until func(peer *node) bool) map[string]*node {
|
||||
// track the visited nodes
|
||||
visited := make(map[string]*node)
|
||||
// queue of the nodes to visit
|
||||
@ -151,11 +128,31 @@ func (n *node) Nodes() []Node {
|
||||
visited[id] = node
|
||||
queue.PushBack(node)
|
||||
}
|
||||
if until(node) {
|
||||
return visited
|
||||
}
|
||||
}
|
||||
// remove the node from the queue
|
||||
queue.Remove(qnode)
|
||||
}
|
||||
|
||||
return visited
|
||||
}
|
||||
|
||||
// Nodes returns a slice if all nodes in node topology
|
||||
func (n *node) Nodes() []Node {
|
||||
// we need to freeze the network graph here
|
||||
// otherwise we might get inconsisten results
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
|
||||
// NOTE: this should never be true
|
||||
untilNoMorePeers := func(n *node) bool {
|
||||
return n == nil
|
||||
}
|
||||
|
||||
visited := n.walk(untilNoMorePeers)
|
||||
|
||||
var nodes []Node
|
||||
// collect all the nodes and return them
|
||||
for _, node := range visited {
|
||||
@ -165,8 +162,34 @@ func (n *node) Nodes() []Node {
|
||||
return nodes
|
||||
}
|
||||
|
||||
// topology returns node topology down to given depth
|
||||
func (n *node) topology(depth uint) *node {
|
||||
// GetPeerNode returns a peer from node topology i.e. up to MaxDepth
|
||||
// It returns nil if the peer was not found in the node topology
|
||||
func (n *node) GetPeerNode(id string) *node {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
|
||||
// get node topology up to MaxDepth
|
||||
top := n.Topology(MaxDepth)
|
||||
|
||||
untilFoundPeer := func(n *node) bool {
|
||||
return n.id == id
|
||||
}
|
||||
|
||||
visited := top.walk(untilFoundPeer)
|
||||
|
||||
peerNode, ok := visited[id]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
return peerNode
|
||||
}
|
||||
|
||||
// Topology returns a copy of th node topology down to given depth
|
||||
func (n *node) Topology(depth uint) *node {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
|
||||
// make a copy of yourself
|
||||
node := &node{
|
||||
id: n.id,
|
||||
@ -186,7 +209,7 @@ func (n *node) topology(depth uint) *node {
|
||||
|
||||
// iterate through our peers and update the node peers
|
||||
for _, peer := range n.peers {
|
||||
nodePeer := peer.topology(depth)
|
||||
nodePeer := peer.Topology(depth)
|
||||
if _, ok := node.peers[nodePeer.id]; !ok {
|
||||
node.peers[nodePeer.id] = nodePeer
|
||||
}
|
||||
@ -195,15 +218,16 @@ func (n *node) topology(depth uint) *node {
|
||||
return node
|
||||
}
|
||||
|
||||
// Peers returns node peers
|
||||
// Peers returns node peers up to MaxDepth
|
||||
func (n *node) Peers() []Node {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
|
||||
var peers []Node
|
||||
for _, nodePeer := range n.peers {
|
||||
peer := nodePeer.topology(MaxDepth)
|
||||
peer := nodePeer.Topology(MaxDepth)
|
||||
peers = append(peers, peer)
|
||||
}
|
||||
n.RUnlock()
|
||||
|
||||
return peers
|
||||
}
|
||||
@ -236,7 +260,7 @@ func UnpackPeerTopology(pbPeer *pb.Peer, lastSeen time.Time, depth uint) *node {
|
||||
return peerNode
|
||||
}
|
||||
|
||||
func peerTopology(peer Node, depth uint) *pb.Peer {
|
||||
func peerProtoTopology(peer Node, depth uint) *pb.Peer {
|
||||
node := &pb.Node{
|
||||
Id: peer.Id(),
|
||||
Address: peer.Address(),
|
||||
@ -257,7 +281,7 @@ func peerTopology(peer Node, depth uint) *pb.Peer {
|
||||
|
||||
// iterate through peers of peers aka pops
|
||||
for _, pop := range peer.Peers() {
|
||||
peer := peerTopology(pop, depth)
|
||||
peer := peerProtoTopology(pop, depth)
|
||||
pbPeers.Peers = append(pbPeers.Peers, peer)
|
||||
}
|
||||
|
||||
@ -278,7 +302,7 @@ func PeersToProto(node Node, depth uint) *pb.Peer {
|
||||
}
|
||||
|
||||
for _, peer := range node.Peers() {
|
||||
pbPeer := peerTopology(peer, depth)
|
||||
pbPeer := peerProtoTopology(peer, depth)
|
||||
pbPeers.Peers = append(pbPeers.Peers, pbPeer)
|
||||
}
|
||||
|
||||
|
@ -117,6 +117,12 @@ func TestNodes(t *testing.T) {
|
||||
t.Errorf("Expected to find %s node", node.Id())
|
||||
}
|
||||
}
|
||||
|
||||
// this is a leaf node
|
||||
id := "peer11"
|
||||
if nodePeer := node.GetPeerNode(id); nodePeer == nil {
|
||||
t.Errorf("Expected to find %s node", id)
|
||||
}
|
||||
}
|
||||
|
||||
func collectPeerIds(peer Node, ids map[string]bool) map[string]bool {
|
||||
|
@ -337,6 +337,7 @@ func (r *router) advertiseTable() error {
|
||||
|
||||
// advertise all routes as Update events to subscribers
|
||||
if len(events) > 0 {
|
||||
log.Debugf("Network router flushing table with %d events: %s", len(events), r.options.Id)
|
||||
r.advertWg.Add(1)
|
||||
go r.publishAdvert(RouteUpdate, events)
|
||||
}
|
||||
@ -668,11 +669,13 @@ func (r *router) Process(a *Advert) error {
|
||||
for _, event := range events {
|
||||
// skip if the router is the origin of this route
|
||||
if event.Route.Router == r.options.Id {
|
||||
log.Debugf("Network router skipping processing its own route: %s", r.options.Id)
|
||||
continue
|
||||
}
|
||||
// create a copy of the route
|
||||
route := event.Route
|
||||
action := event.Type
|
||||
log.Debugf("Network router processing route action %s: %s", action, r.options.Id)
|
||||
if err := r.manageRoute(route, fmt.Sprintf("%s", action)); err != nil {
|
||||
return fmt.Errorf("failed applying action %s to routing table: %s", action, err)
|
||||
}
|
||||
|
@ -110,8 +110,12 @@ func (t *table) Update(r Route) error {
|
||||
if _, ok := t.routes[service][sum]; !ok {
|
||||
t.routes[service][sum] = r
|
||||
go t.sendEvent(&Event{Type: Update, Timestamp: time.Now(), Route: r})
|
||||
return nil
|
||||
}
|
||||
|
||||
t.routes[service][sum] = r
|
||||
go t.sendEvent(&Event{Type: Update, Timestamp: time.Now(), Route: r})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user