1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-11-24 08:02:32 +02:00

getProtoTopology has been replaced by PeersToProto

This helps us remove redundant code across node and handler
This commit is contained in:
Milos Gajdos 2019-09-11 23:56:57 +01:00
parent fa4ff8921e
commit 77c6c9781b
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
4 changed files with 65 additions and 96 deletions

View File

@ -276,7 +276,7 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
} }
n.Unlock() n.Unlock()
// get all the node peers down to MaxDepth encoded in protobuf // get all the node peers down to MaxDepth encoded in protobuf
msg := n.node.getProtoTopology(MaxDepth) msg := PeersToProto(n.node, n.Peers(), MaxDepth)
// advertise yourself to the network // advertise yourself to the network
if err := n.sendMsg("peer", msg, NetworkChannel); err != nil { if err := n.sendMsg("peer", msg, NetworkChannel); err != nil {
log.Debugf("Network failed to advertise peers: %v", err) log.Debugf("Network failed to advertise peers: %v", err)
@ -393,7 +393,7 @@ func (n *network) announce(client transport.Client) {
case <-n.closed: case <-n.closed:
return return
case <-announce.C: case <-announce.C:
msg := n.node.getProtoTopology(MaxDepth) msg := PeersToProto(n.node, n.Peers(), MaxDepth)
// advertise yourself to the network // advertise yourself to the network
if err := n.sendMsg("peer", msg, NetworkChannel); err != nil { if err := n.sendMsg("peer", msg, NetworkChannel); err != nil {
log.Debugf("Network failed to advertise peers: %v", err) log.Debugf("Network failed to advertise peers: %v", err)

View File

@ -15,35 +15,6 @@ type Network struct {
Network network.Network Network network.Network
} }
// toplogyToProto recursively traverses node topology and returns it
func peerTopology(peer network.Node, depth uint) *pbNet.Peer {
node := &pbNet.Node{
Id: peer.Id(),
Address: peer.Address(),
}
pbPeers := &pbNet.Peer{
Node: node,
Peers: make([]*pbNet.Peer, 0),
}
// return if we reached the end of topology or depth
if depth == 0 || len(peer.Peers()) == 0 {
return pbPeers
}
// decrement the depth
depth--
// iterate through peers of peers aka pops
for _, pop := range peer.Peers() {
peer := peerTopology(pop, depth)
pbPeers.Peers = append(pbPeers.Peers, peer)
}
return pbPeers
}
// ListPeers returns a list of all the nodes the node has a direct link with // ListPeers returns a list of all the nodes the node has a direct link with
func (n *Network) ListPeers(ctx context.Context, req *pbNet.PeerRequest, resp *pbNet.PeerResponse) error { func (n *Network) ListPeers(ctx context.Context, req *pbNet.PeerRequest, resp *pbNet.PeerResponse) error {
depth := uint(req.Depth) depth := uint(req.Depth)
@ -54,21 +25,8 @@ func (n *Network) ListPeers(ctx context.Context, req *pbNet.PeerRequest, resp *p
// get node peers // get node peers
nodePeers := n.Network.Peers() nodePeers := n.Network.Peers()
// network node aka root node // get peers encoded into protobuf
node := &pbNet.Node{ peers := network.PeersToProto(n.Network, nodePeers, depth)
Id: n.Network.Id(),
Address: n.Network.Address(),
}
// we will build proto topology into this
peers := &pbNet.Peer{
Node: node,
Peers: make([]*pbNet.Peer, 0),
}
for _, nodePeer := range nodePeers {
peer := peerTopology(nodePeer, depth)
peers.Peers = append(peers.Peers, peer)
}
resp.Peers = peers resp.Peers = peers

View File

@ -128,42 +128,22 @@ func (n *node) Peers() []Node {
return peers return peers
} }
// getProtoTopology returns node topology down to the given depth encoded in protobuf // updateTopology updates node peer topology down to given depth
func (n *node) getProtoTopology(depth uint) *pb.Peer { func (n *node) updatePeerTopology(pbPeer *pb.Peer, depth uint) error {
n.RLock() n.Lock()
defer n.RUnlock() defer n.Unlock()
node := &pb.Node{ if pbPeer == nil {
Id: n.id, return errors.New("peer not initialized")
Address: n.address,
} }
pbPeers := &pb.Peer{ // unpack Peer topology into *node
Node: node, peer := unpackPeer(pbPeer, depth)
Peers: make([]*pb.Peer, 0),
}
// return if have either reached the depth or have no more peers // update node peers with new topology
if depth == 0 || len(n.peers) == 0 { n.peers[pbPeer.Node.Id] = peer
return pbPeers
}
// decrement the depth return nil
depth--
var peers []*pb.Peer
for _, peer := range n.peers {
// get peers of the node peers
// NOTE: this is [not] a recursive call
pbPeerPeer := peer.getProtoTopology(depth)
// add current peer to explored peers
peers = append(peers, pbPeerPeer)
}
// add peers to the parent topology
pbPeers.Peers = peers
return pbPeers
} }
// unpackPeer unpacks pb.Peer into node topology of given depth // unpackPeer unpacks pb.Peer into node topology of given depth
@ -194,20 +174,51 @@ func unpackPeer(pbPeer *pb.Peer, depth uint) *node {
return peerNode return peerNode
} }
// updateTopology updates node peer topology down to given depth func peerTopology(peer Node, depth uint) *pb.Peer {
func (n *node) updatePeerTopology(pbPeer *pb.Peer, depth uint) error { node := &pb.Node{
n.Lock() Id: peer.Id(),
defer n.Unlock() Address: peer.Address(),
if pbPeer == nil {
return errors.New("peer not initialized")
} }
// unpack Peer topology into *node pbPeers := &pb.Peer{
peer := unpackPeer(pbPeer, depth) Node: node,
Peers: make([]*pb.Peer, 0),
}
// update node peers with new topology // return if we reached the end of topology or depth
n.peers[pbPeer.Node.Id] = peer if depth == 0 || len(peer.Peers()) == 0 {
return pbPeers
}
return nil // decrement the depth
depth--
// iterate through peers of peers aka pops
for _, pop := range peer.Peers() {
peer := peerTopology(pop, depth)
pbPeers.Peers = append(pbPeers.Peers, peer)
}
return pbPeers
}
// PeersToProto returns node peers graph encoded into protobuf
func PeersToProto(root Node, peers []Node, depth uint) *pb.Peer {
// network node aka root node
node := &pb.Node{
Id: root.Id(),
Address: root.Address(),
}
// we will build proto topology into this
pbPeers := &pb.Peer{
Node: node,
Peers: make([]*pb.Peer, 0),
}
for _, peer := range peers {
pbPeer := peerTopology(peer, depth)
pbPeers.Peers = append(pbPeers.Peers, pbPeer)
}
return pbPeers
} }

View File

@ -256,7 +256,7 @@ func TestUpdatePeerTopology(t *testing.T) {
} }
} }
func TestGetProtoTopology(t *testing.T) { func TestPeersToProto(t *testing.T) {
// single node // single node
single := &node{ single := &node{
id: testNodeId, id: testNodeId,
@ -266,10 +266,10 @@ func TestGetProtoTopology(t *testing.T) {
} }
topCount := 0 topCount := 0
protoTop := single.getProtoTopology(10) protoPeers := PeersToProto(single, single.Peers(), 0)
if len(protoTop.Peers) != topCount { if len(protoPeers.Peers) != topCount {
t.Errorf("Expected to find %d nodes, found: %d", topCount, len(protoTop.Peers)) t.Errorf("Expected to find %d nodes, found: %d", topCount, len(protoPeers.Peers))
} }
// complicated node graph // complicated node graph
@ -282,9 +282,9 @@ func TestGetProtoTopology(t *testing.T) {
peerIds[id] = true peerIds[id] = true
} }
// depth 1 should give us immmediate neighbours only // depth 1 should give us immmediate neighbours only
protoTop = node.getProtoTopology(1) protoPeers = PeersToProto(node, node.Peers(), 1)
if len(protoTop.Peers) != topCount { if len(protoPeers.Peers) != topCount {
t.Errorf("Expected to find %d nodes, found: %d", topCount, len(protoTop.Peers)) t.Errorf("Expected to find %d nodes, found: %d", topCount, len(protoPeers.Peers))
} }
} }