diff --git a/network/default.go b/network/default.go index 41119912..03cb8236 100644 --- a/network/default.go +++ b/network/default.go @@ -275,10 +275,10 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen lastSeen: now, } n.Unlock() - // get all the node peers down to MaxDepth encoded in protobuf message + // get all the node peers down to MaxDepth encoded in protobuf msg, err := n.node.getProtoTopology(MaxDepth) if err != nil { - log.Debugf("Network unable to retrieve node peers: %s", err) + log.Debugf("Network unable to retrieve node topology: %s", err) } // advertise yourself to the network if err := n.sendMsg("peer", msg, NetworkChannel); err != nil { @@ -322,7 +322,7 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen // after adding new peer go to the next step continue } - // NOTE: we don't update max topology depth as we dont include this network node + // NOTE: we don't update MaxDepth toplogy as we dont update this node only its peers if err := n.node.updatePeerTopology(pbNetPeer, MaxDepth-1); err != nil { log.Debugf("Network failed to update peers") } @@ -396,14 +396,12 @@ func (n *network) announce(client transport.Client) { case <-n.closed: return case <-announce.C: - n.RLock() msg, err := n.node.getProtoTopology(MaxDepth) if err != nil { - log.Debugf("Network unable to retrieve node peers: %s", err) - n.RUnlock() + log.Debugf("Network unable to retrieve node topology: %s", err) continue } - n.RUnlock() + n.node.RUnlock() // advertise yourself to the network if err := n.sendMsg("peer", msg, NetworkChannel); err != nil { log.Debugf("Network failed to advertise peers: %v", err) diff --git a/network/node.go b/network/node.go index 67a59b02..6c9d59a9 100644 --- a/network/node.go +++ b/network/node.go @@ -136,9 +136,11 @@ func (n *node) Topology(depth uint) *node { return node } -// getProtoTopology returns node peers up to given depth encoded in protobufs -// NOTE: this method is NOT thread-safe, so make sure you serialize access to it +// getProtoTopology returns node topology down to the given depth encoded in protobuf func (n *node) getProtoTopology(depth uint) (*pb.Peer, error) { + n.RLock() + defer n.RUnlock() + node := &pb.Node{ Id: n.id, Address: n.address, @@ -176,7 +178,7 @@ func (n *node) getProtoTopology(depth uint) (*pb.Peer, error) { } // unpackPeer unpacks pb.Peer into node topology of given depth -// NOTE: this method is NOT thread-safe, so make sure you serialize access to it +// NOTE: this function is not thread-safe func unpackPeer(pbPeer *pb.Peer, depth uint) *node { peerNode := &node{ id: pbPeer.Node.Id, @@ -203,16 +205,17 @@ func unpackPeer(pbPeer *pb.Peer, depth uint) *node { return peerNode } -// updatePeer updates node peer up to given depth -// NOTE: this method is not thread safe, so make sure you serialize access to it +// updateTopology updates node peer topology down to given depth func (n *node) updatePeerTopology(pbPeer *pb.Peer, depth uint) error { + n.Lock() + defer n.Unlock() + if pbPeer == nil { return errors.New("peer not initialized") } - // NOTE: we need MaxDepth-1 because node n is the parent adding which - // gives us the max peer topology we maintain and propagate - peer := unpackPeer(pbPeer, MaxDepth-1) + // unpack Peer topology into *node + peer := unpackPeer(pbPeer, depth) // update node peers with new topology n.peers[pbPeer.Node.Id] = peer diff --git a/network/node_test.go b/network/node_test.go index df5a5e4a..623958b3 100644 --- a/network/node_test.go +++ b/network/node_test.go @@ -2,6 +2,8 @@ package network import ( "testing" + + pb "github.com/micro/go-micro/network/proto" ) var ( @@ -229,3 +231,111 @@ func TestTopology(t *testing.T) { } } } + +func TestUpdatePeerTopology(t *testing.T) { + // single node + single := &node{ + id: testNodeId, + address: testNodeAddress, + peers: make(map[string]*node), + network: newNetwork(Name(testNodeNetName)), + } + // nil peer should return error + if err := single.updatePeerTopology(nil, 5); err == nil { + t.Errorf("Expected error, got %s", err) + } + + // update with peer that is not yet in the peer map + pbPeer := &pb.Peer{ + Node: &pb.Node{ + Id: "newPeer", + Address: "newPeerAddress", + }, + Peers: make([]*pb.Peer, 0), + } + // it should add pbPeer to the single node peers + if err := single.updatePeerTopology(pbPeer, 5); err != nil { + t.Errorf("Error updating topology: %s", err) + } + if _, ok := single.peers[pbPeer.Node.Id]; !ok { + t.Errorf("Expected %s to be added to %s peers", pbPeer.Node.Id, single.id) + } + + // complicated node graph + node := testSetup() + // build a simple topology to update node peer1 + peer1 := node.peers["peer1"] + pbPeer1Node := &pb.Node{ + Id: peer1.id, + Address: peer1.address, + } + + pbPeer111 := &pb.Peer{ + Node: &pb.Node{ + Id: "peer111", + Address: "peer111Address", + }, + Peers: make([]*pb.Peer, 0), + } + + pbPeer121 := &pb.Peer{ + Node: &pb.Node{ + Id: "peer121", + Address: "peer121Address", + }, + Peers: make([]*pb.Peer, 0), + } + // topology to update + pbPeer1 := &pb.Peer{ + Node: pbPeer1Node, + Peers: []*pb.Peer{pbPeer111, pbPeer121}, + } + // update peer1 topology + if err := node.updatePeerTopology(pbPeer1, 5); err != nil { + t.Errorf("Error updating topology: %s", err) + } + // make sure peer1 topology has been correctly updated + newPeerIds := []string{pbPeer111.Node.Id, pbPeer121.Node.Id} + for _, id := range newPeerIds { + if _, ok := node.peers["peer1"].peers[id]; !ok { + t.Errorf("Expected %s to be a peer of %s", id, "peer1") + } + } +} + +func TestGetProtoTopology(t *testing.T) { + // single node + single := &node{ + id: testNodeId, + address: testNodeAddress, + peers: make(map[string]*node), + network: newNetwork(Name(testNodeNetName)), + } + topCount := 0 + + protoTop, err := single.getProtoTopology(10) + if err != nil { + t.Errorf("Error getting proto topology: %s", err) + } + if len(protoTop.Peers) != topCount { + t.Errorf("Expected to find %d nodes, found: %d", topCount, len(protoTop.Peers)) + } + + // complicated node graph + node := testSetup() + topCount = 3 + // list of ids of nodes of depth 1 i.e. node peers + peerIds := make(map[string]bool) + // add peer Ids + for _, id := range testNodePeerIds { + peerIds[id] = true + } + // depth 1 should give us immmediate neighbours only + protoTop, err = node.getProtoTopology(1) + if err != nil { + t.Errorf("Error getting proto topology: %s", err) + } + if len(protoTop.Peers) != topCount { + t.Errorf("Expected to find %d nodes, found: %d", topCount, len(protoTop.Peers)) + } +}