mirror of
https://github.com/go-micro/go-micro.git
synced 2024-11-24 08:02:32 +02:00
Prune stale nodes in the whole topology.
This commit is contained in:
parent
2599ee8591
commit
a72a2f717d
@ -262,11 +262,11 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
|
||||
peers: make(map[string]*node),
|
||||
lastSeen: now,
|
||||
}
|
||||
if ok := n.node.AddPeer(peer); !ok {
|
||||
if err := n.node.AddPeer(peer); err == ErrPeerExists {
|
||||
log.Debugf("Network peer exists, refreshing: %s", peer.id)
|
||||
// update lastSeen time for the existing node
|
||||
if ok := n.RefreshPeer(peer.id, now); !ok {
|
||||
log.Debugf("Network failed refreshing peer: %s", peer.id)
|
||||
if err := n.RefreshPeer(peer.id, now); err != nil {
|
||||
log.Debugf("Network failed refreshing peer %s: %v", peer.id, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
@ -299,7 +299,7 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
|
||||
peers: make(map[string]*node),
|
||||
lastSeen: now,
|
||||
}
|
||||
if ok := n.node.AddPeer(peer); ok {
|
||||
if err := n.node.AddPeer(peer); err == ErrPeerExists {
|
||||
// send a solicit message when discovering new peer
|
||||
msg := &pbRtr.Solicit{
|
||||
Id: n.options.Id,
|
||||
@ -311,14 +311,14 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
|
||||
}
|
||||
log.Debugf("Network peer exists, refreshing: %s", pbNetPeer.Node.Id)
|
||||
// update lastSeen time for the peer
|
||||
if ok := n.RefreshPeer(pbNetPeer.Node.Id, now); !ok {
|
||||
log.Debugf("Network failed refreshing peer: %s", pbNetPeer.Node.Id)
|
||||
if err := n.RefreshPeer(pbNetPeer.Node.Id, now); err != nil {
|
||||
log.Debugf("Network failed refreshing peer %s: %v", pbNetPeer.Node.Id, err)
|
||||
}
|
||||
// NOTE: we don't unpack MaxDepth toplogy
|
||||
peer = UnpackPeerTopology(pbNetPeer, now, MaxDepth-1)
|
||||
log.Debugf("Network updating topology of node: %s", n.node.id)
|
||||
if ok := n.node.UpdatePeer(peer); !ok {
|
||||
log.Debugf("Network failed to update peers")
|
||||
if err := n.node.UpdatePeer(peer); err != nil {
|
||||
log.Debugf("Network failed to update peers: %v", err)
|
||||
}
|
||||
case "close":
|
||||
pbNetClose := &pbNet.Close{}
|
||||
@ -331,15 +331,16 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
|
||||
continue
|
||||
}
|
||||
log.Debugf("Network received close message from: %s", pbNetClose.Node.Id)
|
||||
n.node.Lock()
|
||||
peer := &node{
|
||||
id: pbNetClose.Node.Id,
|
||||
address: pbNetClose.Node.Address,
|
||||
}
|
||||
if err := n.prunePeer(peer); err != nil {
|
||||
log.Debugf("Network failed to prune node %s routes: %v", peer.id, err)
|
||||
if err := n.DeletePeerNode(peer.id); err != nil {
|
||||
log.Debugf("Network failed to delete node %s routes: %v", peer.id, err)
|
||||
}
|
||||
if err := n.prunePeerRoutes(peer); err != nil {
|
||||
log.Debugf("Network failed pruning peer %s routes: %v", peer.id, err)
|
||||
}
|
||||
n.node.Unlock()
|
||||
}
|
||||
case <-n.closed:
|
||||
return
|
||||
@ -421,7 +422,6 @@ func (n *network) prunePeerRoutes(peer *node) error {
|
||||
router.QueryRouter(peer.id),
|
||||
)
|
||||
if err := n.pruneRoutes(q); err != nil {
|
||||
log.Debugf("Network failed deleting routes originated by %s: %s", peer.id, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -430,23 +430,12 @@ func (n *network) prunePeerRoutes(peer *node) error {
|
||||
router.QueryGateway(peer.address),
|
||||
)
|
||||
if err := n.pruneRoutes(q); err != nil {
|
||||
log.Debugf("Network failed deleting routes routable via gateway %s: %s", peer.address, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// prunePeer prune peer from network node as well as all all the routes associated with it
|
||||
func (n *network) prunePeer(peer *node) error {
|
||||
delete(n.node.peers, peer.id)
|
||||
if err := n.prunePeerRoutes(peer); err != nil {
|
||||
log.Debugf("Network failed to prune %s routes: %v", peer.id, err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// prune deltes node peers that have not been seen for longer than PruneTime seconds
|
||||
// prune also removes all the routes either originated by or routable by the stale nodes
|
||||
func (n *network) prune() {
|
||||
@ -458,19 +447,13 @@ func (n *network) prune() {
|
||||
case <-n.closed:
|
||||
return
|
||||
case <-prune.C:
|
||||
n.node.Lock()
|
||||
for id, peer := range n.peers {
|
||||
if id == n.options.Id {
|
||||
continue
|
||||
}
|
||||
if time.Since(peer.lastSeen) > PruneTime {
|
||||
log.Debugf("Network peer exceeded prune time: %s", id)
|
||||
if err := n.prunePeer(peer); err != nil {
|
||||
log.Debugf("Network failed to prune %s: %s", id, err)
|
||||
}
|
||||
pruned := n.PruneStalePeerNodes(PruneTime)
|
||||
for id, peer := range pruned {
|
||||
log.Debugf("Network peer exceeded prune time: %s", id)
|
||||
if err := n.prunePeerRoutes(peer); err != nil {
|
||||
log.Debugf("Network failed pruning peer %s routes: %v", id, err)
|
||||
}
|
||||
}
|
||||
n.node.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
207
network/node.go
207
network/node.go
@ -2,6 +2,7 @@ package network
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -13,6 +14,13 @@ var (
|
||||
MaxDepth uint = 3
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrPeerExists is returned when adding a peer which already exists
|
||||
ErrPeerExists = errors.New("peer already exists")
|
||||
// ErrPeerNotFound is returned when a peer could not be found in node topology
|
||||
ErrPeerNotFound = errors.New("peer not found")
|
||||
)
|
||||
|
||||
// node is network node
|
||||
type node struct {
|
||||
sync.RWMutex
|
||||
@ -22,6 +30,8 @@ type node struct {
|
||||
address string
|
||||
// peers are nodes with direct link to this node
|
||||
peers map[string]*node
|
||||
// edges store the node edges
|
||||
edges map[string]map[string]*node
|
||||
// network returns the node network
|
||||
network Network
|
||||
// lastSeen keeps track of node lifetime and updates
|
||||
@ -43,74 +53,8 @@ func (n *node) Network() Network {
|
||||
return n.network
|
||||
}
|
||||
|
||||
// AddPeer adds a new peer to node topology
|
||||
// It returns false if the peer already exists
|
||||
func (n *node) AddPeer(peer *node) bool {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
if _, ok := n.peers[peer.id]; !ok {
|
||||
n.peers[peer.id] = peer
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// UpdatePeer updates a peer if it exists
|
||||
// It returns false if the peer does not exist
|
||||
func (n *node) UpdatePeer(peer *node) bool {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
if _, ok := n.peers[peer.id]; ok {
|
||||
n.peers[peer.id] = peer
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// DeletePeer deletes a peer from node peers
|
||||
// It returns true if the peers has been deleted
|
||||
func (n *node) DeletePeer(id string) bool {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
delete(n.peers, id)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// HasPeer returns true if node has peer with given id
|
||||
func (n *node) HasPeer(id string) bool {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
|
||||
_, ok := n.peers[id]
|
||||
return ok
|
||||
}
|
||||
|
||||
// RefreshPeer updates node timestamp
|
||||
// It returns false if the peer has not been found.
|
||||
func (n *node) RefreshPeer(id string, now time.Time) bool {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
peer, ok := n.peers[id]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
if peer.lastSeen.Before(now) {
|
||||
peer.lastSeen = now
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// walk walks the node graph until some condition is met
|
||||
func (n *node) walk(until func(peer *node) bool) map[string]*node {
|
||||
func (n *node) walk(until func(peer *node) bool, action func(parent, peer *node)) map[string]*node {
|
||||
// track the visited nodes
|
||||
visited := make(map[string]*node)
|
||||
// queue of the nodes to visit
|
||||
@ -125,15 +69,16 @@ func (n *node) walk(until func(peer *node) bool) map[string]*node {
|
||||
for queue.Len() > 0 {
|
||||
// pop the node from the front of the queue
|
||||
qnode := queue.Front()
|
||||
if until(qnode.Value.(*node)) {
|
||||
return visited
|
||||
}
|
||||
// iterate through all of the node peers
|
||||
// mark the visited nodes; enqueue the non-visted
|
||||
for id, node := range qnode.Value.(*node).peers {
|
||||
for id, peer := range qnode.Value.(*node).peers {
|
||||
if _, ok := visited[id]; !ok {
|
||||
visited[id] = node
|
||||
queue.PushBack(node)
|
||||
}
|
||||
if until(node) {
|
||||
return visited
|
||||
visited[id] = peer
|
||||
action(qnode.Value.(*node), peer)
|
||||
queue.PushBack(peer)
|
||||
}
|
||||
}
|
||||
// remove the node from the queue
|
||||
@ -143,6 +88,63 @@ func (n *node) walk(until func(peer *node) bool) map[string]*node {
|
||||
return visited
|
||||
}
|
||||
|
||||
// AddPeer adds a new peer to node topology
|
||||
// It returns false if the peer already exists
|
||||
func (n *node) AddPeer(peer *node) error {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
if _, ok := n.peers[peer.id]; !ok {
|
||||
n.peers[peer.id] = peer
|
||||
return nil
|
||||
}
|
||||
|
||||
return ErrPeerExists
|
||||
}
|
||||
|
||||
// DeletePeer deletes a peer from node peers
|
||||
// It returns true if the peer has been deleted
|
||||
func (n *node) DeletePeer(id string) bool {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
delete(n.peers, id)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// UpdatePeer updates a peer if it already exists
|
||||
// It returns error if the peer does not exist
|
||||
func (n *node) UpdatePeer(peer *node) error {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
if _, ok := n.peers[peer.id]; ok {
|
||||
n.peers[peer.id] = peer
|
||||
return nil
|
||||
}
|
||||
|
||||
return ErrPeerNotFound
|
||||
}
|
||||
|
||||
// RefreshPeer updates node timestamp
|
||||
// It returns false if the peer has not been found.
|
||||
func (n *node) RefreshPeer(id string, now time.Time) error {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
peer, ok := n.peers[id]
|
||||
if !ok {
|
||||
return ErrPeerNotFound
|
||||
}
|
||||
|
||||
if peer.lastSeen.Before(now) {
|
||||
peer.lastSeen = now
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Nodes returns a slice of all nodes in the whole node topology
|
||||
func (n *node) Nodes() []Node {
|
||||
// we need to freeze the network graph here
|
||||
@ -151,11 +153,12 @@ func (n *node) Nodes() []Node {
|
||||
defer n.RUnlock()
|
||||
|
||||
// NOTE: this should never be true
|
||||
untilNoMorePeers := func(n *node) bool {
|
||||
return n == nil
|
||||
untilNoMorePeers := func(node *node) bool {
|
||||
return node == nil
|
||||
}
|
||||
justWalk := func(parent, node *node) {}
|
||||
|
||||
visited := n.walk(untilNoMorePeers)
|
||||
visited := n.walk(untilNoMorePeers, justWalk)
|
||||
|
||||
var nodes []Node
|
||||
// collect all the nodes and return them
|
||||
@ -178,8 +181,9 @@ func (n *node) GetPeerNode(id string) *node {
|
||||
untilFoundPeer := func(n *node) bool {
|
||||
return n.id == id
|
||||
}
|
||||
justWalk := func(paent, node *node) {}
|
||||
|
||||
visited := top.walk(untilFoundPeer)
|
||||
visited := top.walk(untilFoundPeer, justWalk)
|
||||
|
||||
peerNode, ok := visited[id]
|
||||
if !ok {
|
||||
@ -189,6 +193,55 @@ func (n *node) GetPeerNode(id string) *node {
|
||||
return peerNode
|
||||
}
|
||||
|
||||
// DeletePeerNode removes peer node from node topology
|
||||
func (n *node) DeletePeerNode(id string) error {
|
||||
n.Lock()
|
||||
n.Unlock()
|
||||
|
||||
untilNoMorePeers := func(node *node) bool {
|
||||
return node == nil
|
||||
}
|
||||
|
||||
deleted := make(map[string]*node)
|
||||
deletePeer := func(parent, node *node) {
|
||||
if node.id != n.id && node.id == id {
|
||||
delete(parent.peers, node.id)
|
||||
deleted[node.id] = node
|
||||
}
|
||||
}
|
||||
|
||||
n.walk(untilNoMorePeers, deletePeer)
|
||||
|
||||
if _, ok := deleted[id]; !ok {
|
||||
return ErrPeerNotFound
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// PruneStalePeerNodes prune the peers that have not been seen for longer than given time
|
||||
// It returns a map of the the nodes that got pruned
|
||||
func (n *node) PruneStalePeerNodes(pruneTime time.Duration) map[string]*node {
|
||||
n.Lock()
|
||||
n.Unlock()
|
||||
|
||||
untilNoMorePeers := func(node *node) bool {
|
||||
return node == nil
|
||||
}
|
||||
|
||||
pruned := make(map[string]*node)
|
||||
pruneStalePeer := func(parent, node *node) {
|
||||
if node.id != n.id && time.Since(node.lastSeen) > PruneTime {
|
||||
delete(parent.peers, node.id)
|
||||
pruned[node.id] = node
|
||||
}
|
||||
}
|
||||
|
||||
n.walk(untilNoMorePeers, pruneStalePeer)
|
||||
|
||||
return pruned
|
||||
}
|
||||
|
||||
// Topology returns a copy of the node topology down to given depth
|
||||
// NOTE: the returned node is a node graph - not a single node
|
||||
func (n *node) Topology(depth uint) *node {
|
||||
|
@ -192,6 +192,46 @@ func TestPeers(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeletePeerNode(t *testing.T) {
|
||||
// complicated node graph
|
||||
node := testSetup()
|
||||
|
||||
nodeCount := len(node.Nodes())
|
||||
|
||||
// should not find non-existent peer node
|
||||
if err := node.DeletePeerNode("foobar"); err != ErrPeerNotFound {
|
||||
t.Errorf("Expected: %v, got: %v", ErrPeerNotFound, err)
|
||||
}
|
||||
|
||||
// lets pick one of the peer1 peers
|
||||
if err := node.DeletePeerNode(testPeerOfPeerIds[0]); err != nil {
|
||||
t.Errorf("Error deleting peer node: %v", err)
|
||||
}
|
||||
|
||||
nodeDelCount := len(node.Nodes())
|
||||
|
||||
if nodeDelCount != nodeCount-1 {
|
||||
t.Errorf("Expected node count: %d, got: %d", nodeCount-1, nodeDelCount)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPruneStalePeerNodes(t *testing.T) {
|
||||
// complicated node graph
|
||||
node := testSetup()
|
||||
|
||||
nodes := node.Nodes()
|
||||
|
||||
pruneTime := 10 * time.Millisecond
|
||||
time.Sleep(pruneTime)
|
||||
|
||||
// should delete all nodes besides node
|
||||
pruned := node.PruneStalePeerNodes(pruneTime)
|
||||
|
||||
if len(pruned) != len(nodes)-1 {
|
||||
t.Errorf("Expected pruned node count: %d, got: %d", len(nodes)-1, len(pruned))
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnpackPeerTopology(t *testing.T) {
|
||||
pbPeer := &pb.Peer{
|
||||
Node: &pb.Node{
|
||||
|
Loading…
Reference in New Issue
Block a user