From 802cc8239aff7bda3686a312020c044a6372af3b Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Thu, 9 Jan 2020 17:48:28 +0000 Subject: [PATCH 01/15] Send solicit message properly. Updated comments. --- network/default.go | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/network/default.go b/network/default.go index 7e78b992..afe2fdeb 100644 --- a/network/default.go +++ b/network/default.go @@ -774,10 +774,10 @@ func (n *network) processNetChan(listener tunnel.Listener) { // and wants to know what's on the network. The faster we // respond the faster we start to converge - // get node peers down to MaxDepth encoded in protobuf - msg := PeersToProto(n.node, MaxDepth) - go func() { + // get node peers down to MaxDepth encoded in protobuf + msg := PeersToProto(n.node, MaxDepth) + // advertise yourself to the new node if err := n.sendTo("peer", NetworkChannel, peer, msg); err != nil { log.Debugf("Network failed to advertise peers: %v", err) @@ -785,8 +785,13 @@ func (n *network) processNetChan(listener tunnel.Listener) { <-time.After(time.Millisecond * 100) + // send a solicit message when discovering new peer + solicit := &pbRtr.Solicit{ + Id: n.options.Id, + } + // ask for the new nodes routes - if err := n.sendTo("solicit", ControlChannel, peer, msg); err != nil { + if err := n.sendTo("solicit", ControlChannel, peer, solicit); err != nil { log.Debugf("Network failed to send solicit message: %s", err) } @@ -798,6 +803,8 @@ func (n *network) processNetChan(listener tunnel.Listener) { } // advertise all the routes when a new node has connected + // NOTE: this might unnecessarily flood network with advertisements + // every time a node's connection is flakey and it keeps reconnecting if err := n.router.Solicit(); err != nil { log.Debugf("Network failed to solicit routes: %s", err) } @@ -834,22 +841,23 @@ func (n *network) processNetChan(listener tunnel.Listener) { } if err := n.node.AddPeer(peer); err == nil { - // send a solicit message when discovering new peer - msg := &pbRtr.Solicit{ - Id: n.options.Id, - } - go func() { + msg := PeersToProto(n.node, MaxDepth) + // advertise yourself to the peer if err := n.sendTo("peer", NetworkChannel, peer, msg); err != nil { log.Debugf("Network failed to advertise peers: %v", err) } - // wait for a second <-time.After(time.Millisecond * 100) + // send a solicit message when discovering new peer + solicit := &pbRtr.Solicit{ + Id: n.options.Id, + } + // then solicit this peer - if err := n.sendTo("solicit", ControlChannel, peer, msg); err != nil { + if err := n.sendTo("solicit", ControlChannel, peer, solicit); err != nil { log.Debugf("Network failed to send solicit message: %s", err) } @@ -1318,6 +1326,9 @@ func (n *network) connect() { if !discovered { // recreate the clients because all the tunnel links are gone // so we haven't send discovery beneath + // NOTE: when starting the tunnel for the first time we might be recreating potentially + // well functioning tunnel clients as "discovered" will be false until the + // n.discovered channel is read at some point later on. if err := n.createClients(); err != nil { log.Debugf("Failed to recreate network/control clients: %v", err) continue From 63edfaa852b93d99e705024c5fe18d043d4d30f6 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Thu, 9 Jan 2020 17:49:06 +0000 Subject: [PATCH 02/15] Added Sync message Sync message will be sent between peers when a new node connects/joins the network --- network/service/proto/network.pb.go | 174 +++++++++++++++------- network/service/proto/network.pb.micro.go | 2 +- network/service/proto/network.proto | 12 +- 3 files changed, 129 insertions(+), 59 deletions(-) diff --git a/network/service/proto/network.pb.go b/network/service/proto/network.pb.go index d94687f5..ae036d62 100644 --- a/network/service/proto/network.pb.go +++ b/network/service/proto/network.pb.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-go. DO NOT EDIT. -// source: github.com/micro/go-micro/network/proto/network.proto +// source: network.proto package go_micro_network @@ -37,7 +37,7 @@ func (m *Query) Reset() { *m = Query{} } func (m *Query) String() string { return proto.CompactTextString(m) } func (*Query) ProtoMessage() {} func (*Query) Descriptor() ([]byte, []int) { - return fileDescriptor_0b7953b26a7c4730, []int{0} + return fileDescriptor_8571034d60397816, []int{0} } func (m *Query) XXX_Unmarshal(b []byte) error { @@ -104,7 +104,7 @@ func (m *ConnectRequest) Reset() { *m = ConnectRequest{} } func (m *ConnectRequest) String() string { return proto.CompactTextString(m) } func (*ConnectRequest) ProtoMessage() {} func (*ConnectRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_0b7953b26a7c4730, []int{1} + return fileDescriptor_8571034d60397816, []int{1} } func (m *ConnectRequest) XXX_Unmarshal(b []byte) error { @@ -142,7 +142,7 @@ func (m *ConnectResponse) Reset() { *m = ConnectResponse{} } func (m *ConnectResponse) String() string { return proto.CompactTextString(m) } func (*ConnectResponse) ProtoMessage() {} func (*ConnectResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_0b7953b26a7c4730, []int{2} + return fileDescriptor_8571034d60397816, []int{2} } func (m *ConnectResponse) XXX_Unmarshal(b []byte) error { @@ -176,7 +176,7 @@ func (m *NodesRequest) Reset() { *m = NodesRequest{} } func (m *NodesRequest) String() string { return proto.CompactTextString(m) } func (*NodesRequest) ProtoMessage() {} func (*NodesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_0b7953b26a7c4730, []int{3} + return fileDescriptor_8571034d60397816, []int{3} } func (m *NodesRequest) XXX_Unmarshal(b []byte) error { @@ -217,7 +217,7 @@ func (m *NodesResponse) Reset() { *m = NodesResponse{} } func (m *NodesResponse) String() string { return proto.CompactTextString(m) } func (*NodesResponse) ProtoMessage() {} func (*NodesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_0b7953b26a7c4730, []int{4} + return fileDescriptor_8571034d60397816, []int{4} } func (m *NodesResponse) XXX_Unmarshal(b []byte) error { @@ -257,7 +257,7 @@ func (m *GraphRequest) Reset() { *m = GraphRequest{} } func (m *GraphRequest) String() string { return proto.CompactTextString(m) } func (*GraphRequest) ProtoMessage() {} func (*GraphRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_0b7953b26a7c4730, []int{5} + return fileDescriptor_8571034d60397816, []int{5} } func (m *GraphRequest) XXX_Unmarshal(b []byte) error { @@ -296,7 +296,7 @@ func (m *GraphResponse) Reset() { *m = GraphResponse{} } func (m *GraphResponse) String() string { return proto.CompactTextString(m) } func (*GraphResponse) ProtoMessage() {} func (*GraphResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_0b7953b26a7c4730, []int{6} + return fileDescriptor_8571034d60397816, []int{6} } func (m *GraphResponse) XXX_Unmarshal(b []byte) error { @@ -336,7 +336,7 @@ func (m *RoutesRequest) Reset() { *m = RoutesRequest{} } func (m *RoutesRequest) String() string { return proto.CompactTextString(m) } func (*RoutesRequest) ProtoMessage() {} func (*RoutesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_0b7953b26a7c4730, []int{7} + return fileDescriptor_8571034d60397816, []int{7} } func (m *RoutesRequest) XXX_Unmarshal(b []byte) error { @@ -375,7 +375,7 @@ func (m *RoutesResponse) Reset() { *m = RoutesResponse{} } func (m *RoutesResponse) String() string { return proto.CompactTextString(m) } func (*RoutesResponse) ProtoMessage() {} func (*RoutesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_0b7953b26a7c4730, []int{8} + return fileDescriptor_8571034d60397816, []int{8} } func (m *RoutesResponse) XXX_Unmarshal(b []byte) error { @@ -413,7 +413,7 @@ func (m *ServicesRequest) Reset() { *m = ServicesRequest{} } func (m *ServicesRequest) String() string { return proto.CompactTextString(m) } func (*ServicesRequest) ProtoMessage() {} func (*ServicesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_0b7953b26a7c4730, []int{9} + return fileDescriptor_8571034d60397816, []int{9} } func (m *ServicesRequest) XXX_Unmarshal(b []byte) error { @@ -445,7 +445,7 @@ func (m *ServicesResponse) Reset() { *m = ServicesResponse{} } func (m *ServicesResponse) String() string { return proto.CompactTextString(m) } func (*ServicesResponse) ProtoMessage() {} func (*ServicesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_0b7953b26a7c4730, []int{10} + return fileDescriptor_8571034d60397816, []int{10} } func (m *ServicesResponse) XXX_Unmarshal(b []byte) error { @@ -492,7 +492,7 @@ func (m *Node) Reset() { *m = Node{} } func (m *Node) String() string { return proto.CompactTextString(m) } func (*Node) ProtoMessage() {} func (*Node) Descriptor() ([]byte, []int) { - return fileDescriptor_0b7953b26a7c4730, []int{11} + return fileDescriptor_8571034d60397816, []int{11} } func (m *Node) XXX_Unmarshal(b []byte) error { @@ -554,7 +554,7 @@ func (m *Connect) Reset() { *m = Connect{} } func (m *Connect) String() string { return proto.CompactTextString(m) } func (*Connect) ProtoMessage() {} func (*Connect) Descriptor() ([]byte, []int) { - return fileDescriptor_0b7953b26a7c4730, []int{12} + return fileDescriptor_8571034d60397816, []int{12} } func (m *Connect) XXX_Unmarshal(b []byte) error { @@ -595,7 +595,7 @@ func (m *Close) Reset() { *m = Close{} } func (m *Close) String() string { return proto.CompactTextString(m) } func (*Close) ProtoMessage() {} func (*Close) Descriptor() ([]byte, []int) { - return fileDescriptor_0b7953b26a7c4730, []int{13} + return fileDescriptor_8571034d60397816, []int{13} } func (m *Close) XXX_Unmarshal(b []byte) error { @@ -638,7 +638,7 @@ func (m *Peer) Reset() { *m = Peer{} } func (m *Peer) String() string { return proto.CompactTextString(m) } func (*Peer) ProtoMessage() {} func (*Peer) Descriptor() ([]byte, []int) { - return fileDescriptor_0b7953b26a7c4730, []int{14} + return fileDescriptor_8571034d60397816, []int{14} } func (m *Peer) XXX_Unmarshal(b []byte) error { @@ -673,6 +673,65 @@ func (m *Peer) GetPeers() []*Peer { return nil } +// Sync is network sync message +type Sync struct { + // node address + Address string `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"` + // node peers + Peers []*Peer `protobuf:"bytes,2,rep,name=peers,proto3" json:"peers,omitempty"` + // node routes + Routes []*proto1.Route `protobuf:"bytes,3,rep,name=routes,proto3" json:"routes,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Sync) Reset() { *m = Sync{} } +func (m *Sync) String() string { return proto.CompactTextString(m) } +func (*Sync) ProtoMessage() {} +func (*Sync) Descriptor() ([]byte, []int) { + return fileDescriptor_8571034d60397816, []int{15} +} + +func (m *Sync) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Sync.Unmarshal(m, b) +} +func (m *Sync) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Sync.Marshal(b, m, deterministic) +} +func (m *Sync) XXX_Merge(src proto.Message) { + xxx_messageInfo_Sync.Merge(m, src) +} +func (m *Sync) XXX_Size() int { + return xxx_messageInfo_Sync.Size(m) +} +func (m *Sync) XXX_DiscardUnknown() { + xxx_messageInfo_Sync.DiscardUnknown(m) +} + +var xxx_messageInfo_Sync proto.InternalMessageInfo + +func (m *Sync) GetAddress() string { + if m != nil { + return m.Address + } + return "" +} + +func (m *Sync) GetPeers() []*Peer { + if m != nil { + return m.Peers + } + return nil +} + +func (m *Sync) GetRoutes() []*proto1.Route { + if m != nil { + return m.Routes + } + return nil +} + func init() { proto.RegisterType((*Query)(nil), "go.micro.network.Query") proto.RegisterType((*ConnectRequest)(nil), "go.micro.network.ConnectRequest") @@ -690,48 +749,49 @@ func init() { proto.RegisterType((*Connect)(nil), "go.micro.network.Connect") proto.RegisterType((*Close)(nil), "go.micro.network.Close") proto.RegisterType((*Peer)(nil), "go.micro.network.Peer") + proto.RegisterType((*Sync)(nil), "go.micro.network.Sync") } -func init() { - proto.RegisterFile("github.com/micro/go-micro/network/proto/network.proto", fileDescriptor_0b7953b26a7c4730) -} +func init() { proto.RegisterFile("network.proto", fileDescriptor_8571034d60397816) } -var fileDescriptor_0b7953b26a7c4730 = []byte{ - // 576 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x61, 0x6a, 0xdb, 0x4c, - 0x10, 0x8d, 0x2c, 0xcb, 0x76, 0xe6, 0x8b, 0xfd, 0xb9, 0x4b, 0x49, 0x85, 0x7e, 0xb4, 0xee, 0xe2, - 0x1f, 0xa1, 0x34, 0x32, 0x24, 0x04, 0x4a, 0x4d, 0x43, 0x20, 0x94, 0x42, 0x21, 0x21, 0x55, 0x2e, - 0x50, 0xc5, 0x1a, 0x6c, 0x93, 0x58, 0xeb, 0xac, 0xd6, 0x09, 0x3e, 0x41, 0x8f, 0xd0, 0x33, 0xf5, - 0x56, 0x65, 0x77, 0x47, 0x8a, 0x1d, 0xcb, 0xa2, 0xf9, 0xe7, 0xd1, 0xbc, 0xf7, 0x66, 0x67, 0xe6, - 0x8d, 0xe1, 0x64, 0x3c, 0x55, 0x93, 0xc5, 0x4d, 0x38, 0x12, 0xb3, 0xc1, 0x6c, 0x3a, 0x92, 0x62, - 0x30, 0x16, 0x87, 0xf6, 0x47, 0x8a, 0xea, 0x51, 0xc8, 0xdb, 0xc1, 0x5c, 0x0a, 0x55, 0x44, 0xa1, - 0x89, 0x58, 0x77, 0x2c, 0x42, 0x83, 0x0a, 0xe9, 0x7b, 0x30, 0xdc, 0x2e, 0x24, 0xc5, 0x42, 0xa1, - 0x1c, 0x64, 0x28, 0x1f, 0xa6, 0x23, 0x24, 0x3d, 0xfb, 0xd1, 0xca, 0xf1, 0x5f, 0x0e, 0x78, 0x3f, - 0x16, 0x28, 0x97, 0xcc, 0x87, 0x26, 0xe1, 0x7c, 0xa7, 0xe7, 0x1c, 0xec, 0x46, 0x79, 0xa8, 0x33, - 0x71, 0x92, 0x48, 0xcc, 0x32, 0xbf, 0x66, 0x33, 0x14, 0xea, 0xcc, 0x38, 0x56, 0xf8, 0x18, 0x2f, - 0x7d, 0xd7, 0x66, 0x28, 0x64, 0xfb, 0xd0, 0xb0, 0x75, 0xfc, 0xba, 0x49, 0x50, 0xa4, 0x19, 0xf4, - 0x6e, 0xdf, 0xb3, 0x0c, 0x0a, 0xf9, 0x29, 0x74, 0xce, 0x45, 0x9a, 0xe2, 0x48, 0x45, 0x78, 0xbf, - 0xc0, 0x4c, 0xb1, 0x8f, 0xe0, 0xa5, 0x22, 0xc1, 0xcc, 0x77, 0x7a, 0xee, 0xc1, 0x7f, 0x47, 0xfb, - 0xe1, 0xf3, 0xd6, 0xc3, 0x4b, 0x91, 0x60, 0x64, 0x41, 0xfc, 0x15, 0xfc, 0x5f, 0xf0, 0xb3, 0xb9, - 0x48, 0x33, 0xe4, 0x7d, 0xd8, 0xd3, 0x88, 0x2c, 0x17, 0x7c, 0x0d, 0x5e, 0x82, 0x73, 0x35, 0x31, - 0x0d, 0xb6, 0x23, 0x1b, 0xf0, 0x2f, 0xd0, 0x26, 0x94, 0xa5, 0xbd, 0xb0, 0x6e, 0x1f, 0xf6, 0xbe, - 0xc9, 0x78, 0x3e, 0xa9, 0x2e, 0x32, 0x84, 0x36, 0xa1, 0xa8, 0xc8, 0x07, 0xa8, 0x4b, 0x21, 0x94, - 0x41, 0x95, 0xd6, 0xb8, 0x42, 0x94, 0x91, 0xc1, 0xf0, 0x53, 0x68, 0x47, 0x7a, 0x7c, 0x45, 0x23, - 0x87, 0xe0, 0xdd, 0xeb, 0xa5, 0x11, 0xfb, 0xcd, 0x26, 0xdb, 0xec, 0x34, 0xb2, 0x28, 0x7e, 0x06, - 0x9d, 0x9c, 0x4f, 0xd5, 0x43, 0x5a, 0x4f, 0x49, 0x8f, 0x64, 0x0f, 0x43, 0xa0, 0xb5, 0x99, 0xe1, - 0x5e, 0x5b, 0x37, 0xe4, 0x6f, 0xe0, 0x21, 0x74, 0x9f, 0x3e, 0x91, 0x6c, 0x00, 0x2d, 0x32, 0x8d, - 0x15, 0xde, 0x8d, 0x8a, 0x98, 0xff, 0x71, 0xa0, 0xae, 0xe7, 0xc6, 0x3a, 0x50, 0x9b, 0x26, 0xe4, - 0xb1, 0xda, 0x34, 0xa9, 0xb6, 0x57, 0x6e, 0x16, 0x77, 0xcd, 0x2c, 0xec, 0x0c, 0x5a, 0x33, 0x54, - 0x71, 0x12, 0xab, 0xd8, 0xaf, 0x9b, 0x0e, 0xfa, 0xe5, 0x5b, 0x0a, 0x2f, 0x08, 0xf6, 0x35, 0x55, - 0x72, 0x19, 0x15, 0xac, 0x60, 0x08, 0xed, 0xb5, 0x14, 0xeb, 0x82, 0x7b, 0x8b, 0x4b, 0x7a, 0x97, - 0xfe, 0xa9, 0x37, 0xf9, 0x10, 0xdf, 0x2d, 0x90, 0x9e, 0x65, 0x83, 0xcf, 0xb5, 0x4f, 0x0e, 0x3f, - 0x81, 0x26, 0x79, 0x4d, 0xef, 0x51, 0xfb, 0x60, 0xfb, 0x1e, 0x8d, 0x57, 0x0c, 0x86, 0x1f, 0x83, - 0x77, 0x7e, 0x27, 0xec, 0xf2, 0xff, 0x99, 0xf4, 0x13, 0xea, 0xda, 0x0a, 0x2f, 0xe1, 0x68, 0x07, - 0xcf, 0x11, 0xa5, 0x1e, 0xa8, 0x5b, 0xe1, 0x2e, 0x0b, 0x3a, 0xfa, 0xed, 0x42, 0xf3, 0x92, 0x06, - 0x7b, 0xf5, 0xd4, 0x59, 0x6f, 0x93, 0xb5, 0x7e, 0xa0, 0xc1, 0xfb, 0x0a, 0x04, 0x9d, 0xe0, 0x0e, - 0xfb, 0x0e, 0x9e, 0x71, 0x3e, 0x7b, 0xbb, 0x89, 0x5e, 0x3d, 0x9c, 0xe0, 0xdd, 0xd6, 0xfc, 0xaa, - 0x96, 0x39, 0xd5, 0x32, 0xad, 0xd5, 0x4b, 0x2f, 0xd3, 0x5a, 0xbb, 0x71, 0xbe, 0xc3, 0x2e, 0xa0, - 0x61, 0x8f, 0x82, 0x95, 0x80, 0xd7, 0xce, 0x2d, 0xe8, 0x6d, 0x07, 0x14, 0x72, 0xd7, 0xd0, 0xca, - 0xcf, 0x81, 0x95, 0xcc, 0xe5, 0xd9, 0xf5, 0x04, 0xbc, 0x0a, 0x92, 0x8b, 0xde, 0x34, 0xcc, 0x9f, - 0xf4, 0xf1, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xa7, 0x5b, 0x0a, 0x25, 0x2c, 0x06, 0x00, 0x00, +var fileDescriptor_8571034d60397816 = []byte{ + // 594 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x55, 0xc1, 0x6a, 0xdb, 0x40, + 0x10, 0x8d, 0x2c, 0x29, 0x76, 0xa6, 0x91, 0xeb, 0x2e, 0x25, 0x15, 0x3a, 0xb4, 0xee, 0xe2, 0x43, + 0x28, 0x8d, 0x0c, 0x09, 0x85, 0x52, 0xd3, 0x10, 0x08, 0xa5, 0x50, 0x48, 0x48, 0xe5, 0x1f, 0xa8, + 0x62, 0x2d, 0xb6, 0x49, 0xac, 0x75, 0x56, 0xeb, 0x04, 0x5f, 0x7a, 0xed, 0x27, 0xf4, 0x9b, 0xfa, + 0x57, 0x65, 0x77, 0x47, 0x8e, 0x14, 0xcb, 0x22, 0xbe, 0x79, 0x34, 0x6f, 0xde, 0xec, 0xce, 0x7b, + 0xb3, 0x06, 0x2f, 0x65, 0xf2, 0x81, 0x8b, 0x9b, 0x70, 0x2e, 0xb8, 0xe4, 0xa4, 0x33, 0xe6, 0xe1, + 0x6c, 0x3a, 0x12, 0x3c, 0xc4, 0xef, 0xc1, 0x60, 0x3c, 0x95, 0x93, 0xc5, 0x75, 0x38, 0xe2, 0xb3, + 0xbe, 0xce, 0xf4, 0xc7, 0xfc, 0xc8, 0xfc, 0x10, 0x7c, 0x21, 0x99, 0xe8, 0x67, 0x4c, 0xdc, 0x4f, + 0x47, 0xac, 0xaf, 0x19, 0xf0, 0xa3, 0xa1, 0xa3, 0x7f, 0x2c, 0x70, 0x7f, 0x2e, 0x98, 0x58, 0x12, + 0x1f, 0x9a, 0x88, 0xf3, 0xad, 0xae, 0x75, 0xb8, 0x17, 0xe5, 0xa1, 0xca, 0xc4, 0x49, 0x22, 0x58, + 0x96, 0xf9, 0x0d, 0x93, 0xc1, 0x50, 0x65, 0xc6, 0xb1, 0x64, 0x0f, 0xf1, 0xd2, 0xb7, 0x4d, 0x06, + 0x43, 0x72, 0x00, 0xbb, 0xa6, 0x8f, 0xef, 0xe8, 0x04, 0x46, 0xaa, 0x02, 0xcf, 0xed, 0xbb, 0xa6, + 0x02, 0x43, 0x7a, 0x0a, 0xed, 0x73, 0x9e, 0xa6, 0x6c, 0x24, 0x23, 0x76, 0xb7, 0x60, 0x99, 0x24, + 0x1f, 0xc1, 0x4d, 0x79, 0xc2, 0x32, 0xdf, 0xea, 0xda, 0x87, 0x2f, 0x8e, 0x0f, 0xc2, 0xa7, 0x57, + 0x0f, 0x2f, 0x79, 0xc2, 0x22, 0x03, 0xa2, 0xaf, 0xe0, 0xe5, 0xaa, 0x3e, 0x9b, 0xf3, 0x34, 0x63, + 0xb4, 0x07, 0xfb, 0x0a, 0x91, 0xe5, 0x84, 0xaf, 0xc1, 0x4d, 0xd8, 0x5c, 0x4e, 0xf4, 0x05, 0xbd, + 0xc8, 0x04, 0xf4, 0x2b, 0x78, 0x88, 0x32, 0x65, 0x5b, 0xf6, 0xed, 0xc1, 0xfe, 0x77, 0x11, 0xcf, + 0x27, 0xf5, 0x4d, 0x06, 0xe0, 0x21, 0x0a, 0x9b, 0x7c, 0x00, 0x47, 0x70, 0x2e, 0x35, 0xaa, 0xb2, + 0xc7, 0x15, 0x63, 0x22, 0xd2, 0x18, 0x7a, 0x0a, 0x5e, 0xa4, 0xc6, 0xb7, 0xba, 0xc8, 0x11, 0xb8, + 0x77, 0x4a, 0x34, 0xac, 0x7e, 0xb3, 0x5e, 0xad, 0x35, 0x8d, 0x0c, 0x8a, 0x9e, 0x41, 0x3b, 0xaf, + 0xc7, 0xee, 0x21, 0xca, 0x53, 0x71, 0x47, 0xb4, 0x87, 0x2e, 0x40, 0xd9, 0xf4, 0x70, 0x87, 0xc6, + 0x0d, 0xf9, 0x19, 0x68, 0x08, 0x9d, 0xc7, 0x4f, 0x48, 0x1b, 0x40, 0x0b, 0x4d, 0x63, 0x88, 0xf7, + 0xa2, 0x55, 0x4c, 0xff, 0x59, 0xe0, 0xa8, 0xb9, 0x91, 0x36, 0x34, 0xa6, 0x09, 0x7a, 0xac, 0x31, + 0x4d, 0xea, 0xed, 0x95, 0x9b, 0xc5, 0x2e, 0x99, 0x85, 0x9c, 0x41, 0x6b, 0xc6, 0x64, 0x9c, 0xc4, + 0x32, 0xf6, 0x1d, 0x7d, 0x83, 0x5e, 0xb5, 0x4a, 0xe1, 0x05, 0xc2, 0xbe, 0xa5, 0x52, 0x2c, 0xa3, + 0x55, 0x55, 0x30, 0x00, 0xaf, 0x94, 0x22, 0x1d, 0xb0, 0x6f, 0xd8, 0x12, 0xcf, 0xa5, 0x7e, 0x2a, + 0x25, 0xef, 0xe3, 0xdb, 0x05, 0xc3, 0x63, 0x99, 0xe0, 0x4b, 0xe3, 0xb3, 0x45, 0x3f, 0x41, 0x13, + 0xbd, 0xa6, 0x74, 0x54, 0x3e, 0xd8, 0xac, 0xa3, 0xf6, 0x8a, 0xc6, 0xd0, 0x13, 0x70, 0xcf, 0x6f, + 0xb9, 0x11, 0xff, 0xd9, 0x45, 0xbf, 0xc0, 0x51, 0x56, 0xd8, 0xa6, 0x46, 0x39, 0x78, 0xce, 0x98, + 0x50, 0x03, 0xb5, 0x6b, 0xdc, 0x65, 0x40, 0xf4, 0x37, 0x38, 0xc3, 0x65, 0x3a, 0x2a, 0x0a, 0x61, + 0x95, 0x85, 0xd8, 0x8a, 0xaf, 0x60, 0x2e, 0xfb, 0x39, 0xe6, 0x3a, 0xfe, 0x6b, 0x43, 0xf3, 0x12, + 0x85, 0xbd, 0x7a, 0x9c, 0x6c, 0x77, 0xbd, 0x4b, 0xf9, 0x81, 0x08, 0xde, 0xd7, 0x20, 0xf0, 0x09, + 0xd8, 0x21, 0x3f, 0xc0, 0xd5, 0x9b, 0x47, 0xde, 0xae, 0xa3, 0x8b, 0x8b, 0x1b, 0xbc, 0xdb, 0x98, + 0x2f, 0x72, 0xe9, 0xa7, 0xa2, 0x8a, 0xab, 0xf8, 0xd2, 0x54, 0x71, 0x95, 0xde, 0x18, 0xba, 0x43, + 0x2e, 0x60, 0xd7, 0x2c, 0x25, 0xa9, 0x00, 0x97, 0xd6, 0x3d, 0xe8, 0x6e, 0x06, 0xac, 0xe8, 0x86, + 0xd0, 0xca, 0xd7, 0x91, 0x54, 0xcc, 0xe5, 0xc9, 0xf6, 0x06, 0xb4, 0x0e, 0x92, 0x93, 0x5e, 0xef, + 0xea, 0x3f, 0x89, 0x93, 0xff, 0x01, 0x00, 0x00, 0xff, 0xff, 0x78, 0x0e, 0x14, 0x60, 0x84, 0x06, + 0x00, 0x00, } diff --git a/network/service/proto/network.pb.micro.go b/network/service/proto/network.pb.micro.go index 488d0c97..cc01d095 100644 --- a/network/service/proto/network.pb.micro.go +++ b/network/service/proto/network.pb.micro.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-micro. DO NOT EDIT. -// source: github.com/micro/go-micro/network/proto/network.proto +// source: network.proto package go_micro_network diff --git a/network/service/proto/network.proto b/network/service/proto/network.proto index b4dece64..b498e1bb 100644 --- a/network/service/proto/network.proto +++ b/network/service/proto/network.proto @@ -54,7 +54,7 @@ message GraphResponse { Peer root = 1; } -message RoutesRequest { +message RoutesRequest { // filter based on Query query = 1; } @@ -100,3 +100,13 @@ message Peer { // node peers repeated Peer peers = 2; } + +// Sync is network sync message +message Sync { + // node address + string address = 1; + // node peers + repeated Peer peers = 2; + // node routes + repeated go.micro.router.Route routes = 3; +} From 0a4bd02503d6088396da62c1cb723840d0ebefec Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Fri, 10 Jan 2020 10:43:07 +0000 Subject: [PATCH 03/15] Add RefreshSync method for Sync bookkeeping --- network/node.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/network/node.go b/network/node.go index 0d79ffc5..ec9b9962 100644 --- a/network/node.go +++ b/network/node.go @@ -36,6 +36,8 @@ type node struct { network Network // lastSeen keeps track of node lifetime and updates lastSeen time.Time + // lastSync keeps track of node last sync request + lastSync time.Time } // Id is node ide @@ -127,7 +129,7 @@ func (n *node) UpdatePeer(peer *node) error { return ErrPeerNotFound } -// RefreshPeer updates node timestamp +// RefreshPeer updates node last seen timestamp // It returns false if the peer has not been found. func (n *node) RefreshPeer(id, link string, now time.Time) error { n.Lock() @@ -146,6 +148,16 @@ func (n *node) RefreshPeer(id, link string, now time.Time) error { return nil } +// RefreshSync refreshes nodes sync time +func (n *node) RefreshSync(now time.Time) error { + n.Lock() + defer n.Unlock() + + n.lastSync = now + + return nil +} + // Nodes returns a slice of all nodes in the whole node topology func (n *node) Nodes() []Node { // we need to freeze the network graph here From bf42c028fbebad7c186fa240d998dfb4c7d5f4e5 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Fri, 10 Jan 2020 11:57:34 +0000 Subject: [PATCH 04/15] Added sync message. Refactored connect flow. Adverts are gossipped. This commit adds a Sync message which is sent as a reply to Connect message. This should in theory speed up convergence of a (re)connecting node. We respond to Sync message by sending a peer message back to the peer origin of the Sync message. We consequently update our routing table and peer graph with the data sent in via Sync message. We now gossip advertse to up to 3 randomly selected peers instead of sending a multicast message to the network. Equally, Solicitation i.e. full table adverts are gossipped to a randomly selected peer. If that fails we send a multicast message to the network. --- network/default.go | 168 +++++++++++++++++++++++----- network/service/proto/network.pb.go | 97 ++++++++-------- network/service/proto/network.proto | 8 +- util/proto/proto.go | 33 ++++++ 4 files changed, 222 insertions(+), 84 deletions(-) create mode 100644 util/proto/proto.go diff --git a/network/default.go b/network/default.go index afe2fdeb..ce5f7703 100644 --- a/network/default.go +++ b/network/default.go @@ -6,6 +6,7 @@ import ( "hash/fnv" "io" "math" + "math/rand" "sort" "sync" "time" @@ -25,6 +26,7 @@ import ( tun "github.com/micro/go-micro/tunnel/transport" "github.com/micro/go-micro/util/backoff" "github.com/micro/go-micro/util/log" + pbUtil "github.com/micro/go-micro/util/proto" ) var ( @@ -270,9 +272,9 @@ func (n *network) acceptCtrlConn(l tunnel.Listener, recv chan *message) { } } -// handleCtrlConn handles ControlChannel connections // advertise advertises routes to the network func (n *network) advertise(advertChan <-chan *router.Advert) { + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) hasher := fnv.New64() for { select { @@ -322,11 +324,22 @@ func (n *network) advertise(advertChan <-chan *router.Advert) { Events: events, } - // send the advert to all on the control channel - // since its not a solicitation + // send the advert to a select number of random peers if advert.Type != router.Solicitation { - if err := n.sendMsg("advert", ControlChannel, msg); err != nil { - log.Debugf("Network failed to advertise routes: %v", err) + // get a list of node peers + peers := n.Peers() + + // advertise to max 3 peers + max := len(peers) + if max > 3 { + max = 3 + } + for i := 0; i < max; i++ { + if peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id()); peer != nil { + if err := n.sendTo("advert", ControlChannel, peer, msg); err != nil { + log.Debugf("Network failed to advertise routes: %v", err) + } + } } continue } @@ -338,8 +351,16 @@ func (n *network) advertise(advertChan <-chan *router.Advert) { // someone requested the route n.sendTo("advert", ControlChannel, peer, msg) default: - if err := n.sendMsg("advert", ControlChannel, msg); err != nil { - log.Debugf("Network failed to advertise routes: %v", err) + // get a list of node peers + peers := n.Peers() + // pick a random peer from the list of peers + peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id()) + if err := n.sendTo("advert", ControlChannel, peer, msg); err != nil { + log.Debugf("Network failed to advertise routes to %s: %v, sending multicast", err, peer.Id()) + // send a multicast message if we fail to send Unicast message + if err := n.sendMsg("advert", ControlChannel, msg); err != nil { + log.Debugf("Network failed to advertise routes: %v", err) + } } } case <-n.closed: @@ -460,6 +481,7 @@ func (n *network) handleNetConn(s tunnel.Session, msg chan *message) { } } +// handleCtrlConn handles ControlChannel connections func (n *network) handleCtrlConn(s tunnel.Session, msg chan *message) { for { m := new(transport.Message) @@ -733,8 +755,8 @@ func (n *network) processNetChan(listener tunnel.Listener) { case "connect": // mark the time the message has been received now := time.Now() - pbNetConnect := &pbNet.Connect{} + pbNetConnect := &pbNet.Connect{} if err := proto.Unmarshal(m.msg.Body, pbNetConnect); err != nil { log.Debugf("Network tunnel [%s] connect unmarshal error: %v", NetworkChannel, err) continue @@ -757,35 +779,61 @@ func (n *network) processNetChan(listener tunnel.Listener) { // update peer links + // TODO: should we do this only if we manage to add a peer + // What should we do if the peer links failed to be updated? if err := n.updatePeerLinks(peer); err != nil { log.Debugf("Network failed updating peer links: %s", err) } // add peer to the list of node peers - if err := n.node.AddPeer(peer); err == ErrPeerExists { + if err := n.AddPeer(peer); err == ErrPeerExists { log.Tracef("Network peer exists, refreshing: %s", peer.id) - // update lastSeen time for the existing node + // update lastSeen time for the peer if err := n.RefreshPeer(peer.id, peer.link, now); err != nil { log.Debugf("Network failed refreshing peer %s: %v", peer.id, err) } } - // we send the peer message because someone has sent connect - // and wants to know what's on the network. The faster we - // respond the faster we start to converge + // we send the sync message because someone has sent connect + // and wants to either connect or reconnect to the network + // The faster it gets the network config (routes and peer graph) + // the faster the network converges to a stable state go func() { - // get node peers down to MaxDepth encoded in protobuf - msg := PeersToProto(n.node, MaxDepth) + // get node peer graph to send back to the connecting node + node := PeersToProto(n.node, MaxDepth) - // advertise yourself to the new node - if err := n.sendTo("peer", NetworkChannel, peer, msg); err != nil { - log.Debugf("Network failed to advertise peers: %v", err) + msg := &pbNet.Sync{ + Peer: node, } + // get a list of all of our routes + routes, err := n.options.Router.Table().List() + switch err { + case nil: + // encode the routes to protobuf + pbRoutes := make([]*pbRtr.Route, 0, len(routes)) + for _, route := range routes { + pbRoute := pbUtil.RouteToProto(route) + pbRoutes = append(pbRoutes, pbRoute) + } + // pack the routes into the sync message + msg.Routes = pbRoutes + default: + // we can't list the routes + log.Debugf("Network node %s failed listing routes: %v", n.id, err) + } + + // send sync message to the newly connected peer + if err := n.sendTo("sync", NetworkChannel, peer, msg); err != nil { + log.Debugf("Network failed to send sync message: %v", err) + } + // wait for a short period of time before sending a solicit message <-time.After(time.Millisecond * 100) // send a solicit message when discovering new peer + // this triggers the node to flush its routing table to the network + // and leads to faster convergence of the network solicit := &pbRtr.Solicit{ Id: n.options.Id, } @@ -801,13 +849,6 @@ func (n *network) processNetChan(listener tunnel.Listener) { default: // don't block } - - // advertise all the routes when a new node has connected - // NOTE: this might unnecessarily flood network with advertisements - // every time a node's connection is flakey and it keeps reconnecting - if err := n.router.Solicit(); err != nil { - log.Debugf("Network failed to solicit routes: %s", err) - } }() case "peer": // mark the time the message has been received @@ -836,10 +877,13 @@ func (n *network) processNetChan(listener tunnel.Listener) { // update peer links + // TODO: should we do this only if we manage to add a peer + // What should we do if the peer links failed to be updated? if err := n.updatePeerLinks(peer); err != nil { log.Debugf("Network failed updating peer links: %s", err) } + // if it's a new peer i.e. we do not have it in our graph, we solicit its routes if err := n.node.AddPeer(peer); err == nil { go func() { msg := PeersToProto(n.node, MaxDepth) @@ -875,7 +919,7 @@ func (n *network) processNetChan(listener tunnel.Listener) { }() continue - // we're expecting any error to be ErrPeerExists + // if we already have the peer in our graph, skip further steps } else if err != ErrPeerExists { log.Debugf("Network got error adding peer %v", err) continue @@ -905,6 +949,75 @@ func (n *network) processNetChan(listener tunnel.Listener) { default: // don't block here } + case "sync": + // record the timestamp of the message receipt + now := time.Now() + + pbNetSync := &pbNet.Sync{} + if err := proto.Unmarshal(m.msg.Body, pbNetSync); err != nil { + log.Debugf("Network tunnel [%s] sync unmarshal error: %v", NetworkChannel, err) + continue + } + + // don't process your own messages + if pbNetSync.Peer.Node.Id == n.options.Id { + continue + } + + log.Debugf("Network received sync message from: %s", pbNetSync.Peer.Node.Id) + + peer := &node{ + id: pbNetSync.Peer.Node.Id, + address: pbNetSync.Peer.Node.Address, + link: m.msg.Header["Micro-Link"], + peers: make(map[string]*node), + lastSeen: now, + } + + // update peer links + + // TODO: should we do this only if we manage to add a peer + // What should we do if the peer links failed to be updated? + if err := n.updatePeerLinks(peer); err != nil { + log.Debugf("Network failed updating peer links: %s", err) + } + + // add peer to the list of node peers + if err := n.node.AddPeer(peer); err == ErrPeerExists { + log.Tracef("Network peer exists, refreshing: %s", peer.id) + // update lastSeen time for the existing node + if err := n.RefreshPeer(peer.id, peer.link, now); err != nil { + log.Debugf("Network failed refreshing peer %s: %v", peer.id, err) + } + } + + // when we receive a sync message we update our routing table + // and send a peer message back to the network to announce our presence + // we consequently flush our table to the network too to make the convergence faster + + go func() { + // add all the routes we have received in the sync message + for _, pbRoute := range pbNetSync.Routes { + route := pbUtil.ProtoToRoute(pbRoute) + if err := n.router.Table().Create(route); err != nil && err != router.ErrDuplicateRoute { + log.Debugf("Network node %s failed to add route: %v", n.id, err) + } + } + + // update your sync timestamp + // NOTE: this might go away as we will be doing full table advert to random peer + if err := n.RefreshSync(now); err != nil { + log.Debugf("Network failed refreshing sync time: %v", err) + } + + // get node peer graph to send back to the syncing node + msg := PeersToProto(n.node, MaxDepth) + + // advertise yourself to the new node + if err := n.sendTo("peer", NetworkChannel, peer, msg); err != nil { + log.Debugf("Network failed to advertise peers: %v", err) + } + }() case "close": pbNetClose := &pbNet.Close{} if err := proto.Unmarshal(m.msg.Body, pbNetClose); err != nil { @@ -932,6 +1045,9 @@ func (n *network) processNetChan(listener tunnel.Listener) { log.Debugf("Network failed pruning peer %s routes: %v", peer.id, err) } + // NOTE: we should maybe advertise this to the network so we converge faster on closed nodes + // as opposed to our waiting until the node eventually gets pruned; something to think about + // delete peer from the peerLinks n.Lock() delete(n.peerLinks, pbNetClose.Node.Address) diff --git a/network/service/proto/network.pb.go b/network/service/proto/network.pb.go index ae036d62..aa98ff3e 100644 --- a/network/service/proto/network.pb.go +++ b/network/service/proto/network.pb.go @@ -675,12 +675,10 @@ func (m *Peer) GetPeers() []*Peer { // Sync is network sync message type Sync struct { - // node address - Address string `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"` - // node peers - Peers []*Peer `protobuf:"bytes,2,rep,name=peers,proto3" json:"peers,omitempty"` + // peer origin + Peer *Peer `protobuf:"bytes,1,opt,name=peer,proto3" json:"peer,omitempty"` // node routes - Routes []*proto1.Route `protobuf:"bytes,3,rep,name=routes,proto3" json:"routes,omitempty"` + Routes []*proto1.Route `protobuf:"bytes,2,rep,name=routes,proto3" json:"routes,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -711,16 +709,9 @@ func (m *Sync) XXX_DiscardUnknown() { var xxx_messageInfo_Sync proto.InternalMessageInfo -func (m *Sync) GetAddress() string { +func (m *Sync) GetPeer() *Peer { if m != nil { - return m.Address - } - return "" -} - -func (m *Sync) GetPeers() []*Peer { - if m != nil { - return m.Peers + return m.Peer } return nil } @@ -755,43 +746,43 @@ func init() { func init() { proto.RegisterFile("network.proto", fileDescriptor_8571034d60397816) } var fileDescriptor_8571034d60397816 = []byte{ - // 594 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x55, 0xc1, 0x6a, 0xdb, 0x40, - 0x10, 0x8d, 0x2c, 0x29, 0x76, 0xa6, 0x91, 0xeb, 0x2e, 0x25, 0x15, 0x3a, 0xb4, 0xee, 0xe2, 0x43, - 0x28, 0x8d, 0x0c, 0x09, 0x85, 0x52, 0xd3, 0x10, 0x08, 0xa5, 0x50, 0x48, 0x48, 0xe5, 0x1f, 0xa8, - 0x62, 0x2d, 0xb6, 0x49, 0xac, 0x75, 0x56, 0xeb, 0x04, 0x5f, 0x7a, 0xed, 0x27, 0xf4, 0x9b, 0xfa, - 0x57, 0x65, 0x77, 0x47, 0x8e, 0x14, 0xcb, 0x22, 0xbe, 0x79, 0x34, 0x6f, 0xde, 0xec, 0xce, 0x7b, - 0xb3, 0x06, 0x2f, 0x65, 0xf2, 0x81, 0x8b, 0x9b, 0x70, 0x2e, 0xb8, 0xe4, 0xa4, 0x33, 0xe6, 0xe1, - 0x6c, 0x3a, 0x12, 0x3c, 0xc4, 0xef, 0xc1, 0x60, 0x3c, 0x95, 0x93, 0xc5, 0x75, 0x38, 0xe2, 0xb3, - 0xbe, 0xce, 0xf4, 0xc7, 0xfc, 0xc8, 0xfc, 0x10, 0x7c, 0x21, 0x99, 0xe8, 0x67, 0x4c, 0xdc, 0x4f, - 0x47, 0xac, 0xaf, 0x19, 0xf0, 0xa3, 0xa1, 0xa3, 0x7f, 0x2c, 0x70, 0x7f, 0x2e, 0x98, 0x58, 0x12, - 0x1f, 0x9a, 0x88, 0xf3, 0xad, 0xae, 0x75, 0xb8, 0x17, 0xe5, 0xa1, 0xca, 0xc4, 0x49, 0x22, 0x58, - 0x96, 0xf9, 0x0d, 0x93, 0xc1, 0x50, 0x65, 0xc6, 0xb1, 0x64, 0x0f, 0xf1, 0xd2, 0xb7, 0x4d, 0x06, - 0x43, 0x72, 0x00, 0xbb, 0xa6, 0x8f, 0xef, 0xe8, 0x04, 0x46, 0xaa, 0x02, 0xcf, 0xed, 0xbb, 0xa6, - 0x02, 0x43, 0x7a, 0x0a, 0xed, 0x73, 0x9e, 0xa6, 0x6c, 0x24, 0x23, 0x76, 0xb7, 0x60, 0x99, 0x24, - 0x1f, 0xc1, 0x4d, 0x79, 0xc2, 0x32, 0xdf, 0xea, 0xda, 0x87, 0x2f, 0x8e, 0x0f, 0xc2, 0xa7, 0x57, - 0x0f, 0x2f, 0x79, 0xc2, 0x22, 0x03, 0xa2, 0xaf, 0xe0, 0xe5, 0xaa, 0x3e, 0x9b, 0xf3, 0x34, 0x63, - 0xb4, 0x07, 0xfb, 0x0a, 0x91, 0xe5, 0x84, 0xaf, 0xc1, 0x4d, 0xd8, 0x5c, 0x4e, 0xf4, 0x05, 0xbd, - 0xc8, 0x04, 0xf4, 0x2b, 0x78, 0x88, 0x32, 0x65, 0x5b, 0xf6, 0xed, 0xc1, 0xfe, 0x77, 0x11, 0xcf, - 0x27, 0xf5, 0x4d, 0x06, 0xe0, 0x21, 0x0a, 0x9b, 0x7c, 0x00, 0x47, 0x70, 0x2e, 0x35, 0xaa, 0xb2, - 0xc7, 0x15, 0x63, 0x22, 0xd2, 0x18, 0x7a, 0x0a, 0x5e, 0xa4, 0xc6, 0xb7, 0xba, 0xc8, 0x11, 0xb8, - 0x77, 0x4a, 0x34, 0xac, 0x7e, 0xb3, 0x5e, 0xad, 0x35, 0x8d, 0x0c, 0x8a, 0x9e, 0x41, 0x3b, 0xaf, - 0xc7, 0xee, 0x21, 0xca, 0x53, 0x71, 0x47, 0xb4, 0x87, 0x2e, 0x40, 0xd9, 0xf4, 0x70, 0x87, 0xc6, - 0x0d, 0xf9, 0x19, 0x68, 0x08, 0x9d, 0xc7, 0x4f, 0x48, 0x1b, 0x40, 0x0b, 0x4d, 0x63, 0x88, 0xf7, - 0xa2, 0x55, 0x4c, 0xff, 0x59, 0xe0, 0xa8, 0xb9, 0x91, 0x36, 0x34, 0xa6, 0x09, 0x7a, 0xac, 0x31, - 0x4d, 0xea, 0xed, 0x95, 0x9b, 0xc5, 0x2e, 0x99, 0x85, 0x9c, 0x41, 0x6b, 0xc6, 0x64, 0x9c, 0xc4, - 0x32, 0xf6, 0x1d, 0x7d, 0x83, 0x5e, 0xb5, 0x4a, 0xe1, 0x05, 0xc2, 0xbe, 0xa5, 0x52, 0x2c, 0xa3, - 0x55, 0x55, 0x30, 0x00, 0xaf, 0x94, 0x22, 0x1d, 0xb0, 0x6f, 0xd8, 0x12, 0xcf, 0xa5, 0x7e, 0x2a, - 0x25, 0xef, 0xe3, 0xdb, 0x05, 0xc3, 0x63, 0x99, 0xe0, 0x4b, 0xe3, 0xb3, 0x45, 0x3f, 0x41, 0x13, - 0xbd, 0xa6, 0x74, 0x54, 0x3e, 0xd8, 0xac, 0xa3, 0xf6, 0x8a, 0xc6, 0xd0, 0x13, 0x70, 0xcf, 0x6f, - 0xb9, 0x11, 0xff, 0xd9, 0x45, 0xbf, 0xc0, 0x51, 0x56, 0xd8, 0xa6, 0x46, 0x39, 0x78, 0xce, 0x98, - 0x50, 0x03, 0xb5, 0x6b, 0xdc, 0x65, 0x40, 0xf4, 0x37, 0x38, 0xc3, 0x65, 0x3a, 0x2a, 0x0a, 0x61, - 0x95, 0x85, 0xd8, 0x8a, 0xaf, 0x60, 0x2e, 0xfb, 0x39, 0xe6, 0x3a, 0xfe, 0x6b, 0x43, 0xf3, 0x12, - 0x85, 0xbd, 0x7a, 0x9c, 0x6c, 0x77, 0xbd, 0x4b, 0xf9, 0x81, 0x08, 0xde, 0xd7, 0x20, 0xf0, 0x09, - 0xd8, 0x21, 0x3f, 0xc0, 0xd5, 0x9b, 0x47, 0xde, 0xae, 0xa3, 0x8b, 0x8b, 0x1b, 0xbc, 0xdb, 0x98, - 0x2f, 0x72, 0xe9, 0xa7, 0xa2, 0x8a, 0xab, 0xf8, 0xd2, 0x54, 0x71, 0x95, 0xde, 0x18, 0xba, 0x43, - 0x2e, 0x60, 0xd7, 0x2c, 0x25, 0xa9, 0x00, 0x97, 0xd6, 0x3d, 0xe8, 0x6e, 0x06, 0xac, 0xe8, 0x86, - 0xd0, 0xca, 0xd7, 0x91, 0x54, 0xcc, 0xe5, 0xc9, 0xf6, 0x06, 0xb4, 0x0e, 0x92, 0x93, 0x5e, 0xef, - 0xea, 0x3f, 0x89, 0x93, 0xff, 0x01, 0x00, 0x00, 0xff, 0xff, 0x78, 0x0e, 0x14, 0x60, 0x84, 0x06, - 0x00, 0x00, + // 593 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x55, 0x5d, 0x6a, 0xdb, 0x40, + 0x10, 0x8e, 0x2c, 0x29, 0x3f, 0xd3, 0xc8, 0x75, 0x97, 0x92, 0x0a, 0x3d, 0xb4, 0xee, 0xe2, 0x87, + 0x50, 0x1a, 0x19, 0x12, 0x0a, 0xa5, 0xa6, 0x21, 0x10, 0x4a, 0xa1, 0x90, 0x90, 0xca, 0x17, 0xa8, + 0x6c, 0x0d, 0xb6, 0x49, 0xac, 0x75, 0x56, 0xeb, 0x04, 0x9f, 0xa0, 0x47, 0xe8, 0x99, 0x7a, 0xab, + 0xb2, 0xbb, 0x23, 0xc7, 0x8e, 0x65, 0x37, 0x79, 0xd3, 0xec, 0x7c, 0xdf, 0xcc, 0xce, 0xcc, 0x37, + 0x2b, 0x08, 0x72, 0x54, 0xf7, 0x42, 0x5e, 0xc7, 0x13, 0x29, 0x94, 0x60, 0x8d, 0x81, 0x88, 0xc7, + 0xa3, 0xbe, 0x14, 0x31, 0x9d, 0x47, 0x9d, 0xc1, 0x48, 0x0d, 0xa7, 0xbd, 0xb8, 0x2f, 0xc6, 0x6d, + 0xe3, 0x69, 0x0f, 0xc4, 0x91, 0xfd, 0x90, 0x62, 0xaa, 0x50, 0xb6, 0x0b, 0x94, 0x77, 0xa3, 0x3e, + 0xb6, 0x4d, 0x04, 0x3a, 0xb4, 0xe1, 0xf8, 0x6f, 0x07, 0xfc, 0x9f, 0x53, 0x94, 0x33, 0x16, 0xc2, + 0x0e, 0xe1, 0x42, 0xa7, 0xe9, 0x1c, 0xee, 0x25, 0xa5, 0xa9, 0x3d, 0x69, 0x96, 0x49, 0x2c, 0x8a, + 0xb0, 0x66, 0x3d, 0x64, 0x6a, 0xcf, 0x20, 0x55, 0x78, 0x9f, 0xce, 0x42, 0xd7, 0x7a, 0xc8, 0x64, + 0x07, 0xb0, 0x6d, 0xf3, 0x84, 0x9e, 0x71, 0x90, 0xa5, 0x19, 0x74, 0xef, 0xd0, 0xb7, 0x0c, 0x32, + 0xf9, 0x29, 0xd4, 0xcf, 0x45, 0x9e, 0x63, 0x5f, 0x25, 0x78, 0x3b, 0xc5, 0x42, 0xb1, 0x8f, 0xe0, + 0xe7, 0x22, 0xc3, 0x22, 0x74, 0x9a, 0xee, 0xe1, 0x8b, 0xe3, 0x83, 0xf8, 0x71, 0xe9, 0xf1, 0xa5, + 0xc8, 0x30, 0xb1, 0x20, 0xfe, 0x0a, 0x5e, 0xce, 0xf9, 0xc5, 0x44, 0xe4, 0x05, 0xf2, 0x16, 0xec, + 0x6b, 0x44, 0x51, 0x06, 0x7c, 0x0d, 0x7e, 0x86, 0x13, 0x35, 0x34, 0x05, 0x06, 0x89, 0x35, 0xf8, + 0x57, 0x08, 0x08, 0x65, 0x69, 0xcf, 0xcc, 0xdb, 0x82, 0xfd, 0xef, 0x32, 0x9d, 0x0c, 0x37, 0x27, + 0xe9, 0x40, 0x40, 0x28, 0x4a, 0xf2, 0x01, 0x3c, 0x29, 0x84, 0x32, 0xa8, 0xca, 0x1c, 0x57, 0x88, + 0x32, 0x31, 0x18, 0x7e, 0x0a, 0x41, 0xa2, 0xdb, 0x37, 0x2f, 0xe4, 0x08, 0xfc, 0x5b, 0x3d, 0x34, + 0x62, 0xbf, 0x59, 0x65, 0x9b, 0x99, 0x26, 0x16, 0xc5, 0xcf, 0xa0, 0x5e, 0xf2, 0x29, 0x7b, 0x4c, + 0xe3, 0xa9, 0xa8, 0x91, 0xe4, 0x61, 0x08, 0x34, 0x36, 0xd3, 0xdc, 0xae, 0x55, 0x43, 0x79, 0x07, + 0x1e, 0x43, 0xe3, 0xe1, 0x88, 0xc2, 0x46, 0xb0, 0x4b, 0xa2, 0xb1, 0x81, 0xf7, 0x92, 0xb9, 0xcd, + 0xff, 0x3a, 0xe0, 0xe9, 0xbe, 0xb1, 0x3a, 0xd4, 0x46, 0x19, 0x69, 0xac, 0x36, 0xca, 0x36, 0xcb, + 0xab, 0x14, 0x8b, 0xbb, 0x24, 0x16, 0x76, 0x06, 0xbb, 0x63, 0x54, 0x69, 0x96, 0xaa, 0x34, 0xf4, + 0x4c, 0x05, 0xad, 0xea, 0x29, 0xc5, 0x17, 0x04, 0xfb, 0x96, 0x2b, 0x39, 0x4b, 0xe6, 0xac, 0xa8, + 0x03, 0xc1, 0x92, 0x8b, 0x35, 0xc0, 0xbd, 0xc6, 0x19, 0xdd, 0x4b, 0x7f, 0xea, 0x49, 0xde, 0xa5, + 0x37, 0x53, 0xa4, 0x6b, 0x59, 0xe3, 0x4b, 0xed, 0xb3, 0xc3, 0x3f, 0xc1, 0x0e, 0x69, 0x4d, 0xcf, + 0x51, 0xeb, 0x60, 0xfd, 0x1c, 0x8d, 0x56, 0x0c, 0x86, 0x9f, 0x80, 0x7f, 0x7e, 0x23, 0xec, 0xf0, + 0x9f, 0x4c, 0xfa, 0x05, 0x9e, 0x96, 0xc2, 0x73, 0x38, 0x5a, 0xc1, 0x13, 0x44, 0xa9, 0x1b, 0xea, + 0x6e, 0x50, 0x97, 0x05, 0xf1, 0x1e, 0x78, 0xdd, 0x59, 0xde, 0xd7, 0x19, 0xf4, 0xc1, 0xff, 0x24, + 0xa9, 0x31, 0x0b, 0x02, 0xaa, 0x3d, 0x45, 0x40, 0xc7, 0x7f, 0x5c, 0xd8, 0xb9, 0xa4, 0xe1, 0x5d, + 0x3d, 0x74, 0xaf, 0xb9, 0x9a, 0x64, 0xf9, 0x11, 0x88, 0xde, 0x6f, 0x40, 0xd0, 0x9a, 0x6f, 0xb1, + 0x1f, 0xe0, 0x9b, 0xed, 0x62, 0x6f, 0x57, 0xd1, 0x8b, 0xcb, 0x19, 0xbd, 0x5b, 0xeb, 0x5f, 0x8c, + 0x65, 0x9e, 0x83, 0xaa, 0x58, 0x8b, 0xaf, 0x49, 0x55, 0xac, 0xa5, 0x77, 0x84, 0x6f, 0xb1, 0x0b, + 0xd8, 0xb6, 0x8b, 0xc7, 0x2a, 0xc0, 0x4b, 0x2b, 0x1d, 0x35, 0xd7, 0x03, 0xe6, 0xe1, 0xba, 0xb0, + 0x5b, 0xae, 0x1c, 0xab, 0xe8, 0xcb, 0xa3, 0x0d, 0x8d, 0xf8, 0x26, 0x48, 0x19, 0xb4, 0xb7, 0x6d, + 0x7e, 0x04, 0x27, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x66, 0x66, 0x42, 0x5f, 0x68, 0x06, 0x00, + 0x00, } diff --git a/network/service/proto/network.proto b/network/service/proto/network.proto index b498e1bb..ee02cf8f 100644 --- a/network/service/proto/network.proto +++ b/network/service/proto/network.proto @@ -103,10 +103,8 @@ message Peer { // Sync is network sync message message Sync { - // node address - string address = 1; - // node peers - repeated Peer peers = 2; + // peer origin + Peer peer = 1; // node routes - repeated go.micro.router.Route routes = 3; + repeated go.micro.router.Route routes = 2; } diff --git a/util/proto/proto.go b/util/proto/proto.go new file mode 100644 index 00000000..1e41c61c --- /dev/null +++ b/util/proto/proto.go @@ -0,0 +1,33 @@ +// Package proto contains utility functions for working with protobufs +package proto + +import ( + "github.com/micro/go-micro/router" + pbRtr "github.com/micro/go-micro/router/service/proto" +) + +// RouteToProto encodes route into protobuf and returns it +func RouteToProto(route router.Route) *pbRtr.Route { + return &pbRtr.Route{ + Service: route.Service, + Address: route.Address, + Gateway: route.Gateway, + Network: route.Network, + Router: route.Router, + Link: route.Link, + Metric: int64(route.Metric), + } +} + +// ProtoToRoute decodes protobuf route into router route and returns it +func ProtoToRoute(route *pbRtr.Route) router.Route { + return router.Route{ + Service: route.Service, + Address: route.Address, + Gateway: route.Gateway, + Network: route.Network, + Router: route.Router, + Link: route.Link, + Metric: route.Metric, + } +} From 1e009e52dd228501f91738ecbe465dc116526083 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Fri, 10 Jan 2020 12:27:49 +0000 Subject: [PATCH 05/15] Avoid having the same log statements in initNodes and resolveNodes --- network/default.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/default.go b/network/default.go index ce5f7703..e0697f18 100644 --- a/network/default.go +++ b/network/default.go @@ -375,7 +375,7 @@ func (n *network) initNodes(startup bool) { // NOTE: this condition never fires // as resolveNodes() never returns error if err != nil && !startup { - log.Debugf("Network failed to resolve nodes: %v", err) + log.Debugf("Network failed to init nodes: %v", err) return } From 11904e11372853cd8dbebc6d5adc9689c99a5cb1 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Fri, 10 Jan 2020 19:02:42 +0000 Subject: [PATCH 06/15] Regular sync with network every 5 minutes. Apply routes before peering. --- network/default.go | 81 ++++++++++++++++++++++++++++++++++------------ network/network.go | 2 ++ 2 files changed, 62 insertions(+), 21 deletions(-) diff --git a/network/default.go b/network/default.go index e0697f18..15530c47 100644 --- a/network/default.go +++ b/network/default.go @@ -354,12 +354,13 @@ func (n *network) advertise(advertChan <-chan *router.Advert) { // get a list of node peers peers := n.Peers() // pick a random peer from the list of peers - peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id()) - if err := n.sendTo("advert", ControlChannel, peer, msg); err != nil { - log.Debugf("Network failed to advertise routes to %s: %v, sending multicast", err, peer.Id()) - // send a multicast message if we fail to send Unicast message - if err := n.sendMsg("advert", ControlChannel, msg); err != nil { - log.Debugf("Network failed to advertise routes: %v", err) + if peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id()); peer != nil { + if err := n.sendTo("advert", ControlChannel, peer, msg); err != nil { + log.Debugf("Network failed to advertise routes to %s: %v, sending multicast", peer.Id(), err) + // send a multicast message if we fail to send Unicast message + if err := n.sendMsg("advert", ControlChannel, msg); err != nil { + log.Debugf("Network failed to advertise routes: %v", err) + } } } } @@ -993,23 +994,22 @@ func (n *network) processNetChan(listener tunnel.Listener) { // when we receive a sync message we update our routing table // and send a peer message back to the network to announce our presence - // we consequently flush our table to the network too to make the convergence faster + + // add all the routes we have received in the sync message + for _, pbRoute := range pbNetSync.Routes { + route := pbUtil.ProtoToRoute(pbRoute) + if err := n.router.Table().Create(route); err != nil && err != router.ErrDuplicateRoute { + log.Debugf("Network node %s failed to add route: %v", n.id, err) + } + } + + // update your sync timestamp + // NOTE: this might go away as we will be doing full table advert to random peer + if err := n.RefreshSync(now); err != nil { + log.Debugf("Network failed refreshing sync time: %v", err) + } go func() { - // add all the routes we have received in the sync message - for _, pbRoute := range pbNetSync.Routes { - route := pbUtil.ProtoToRoute(pbRoute) - if err := n.router.Table().Create(route); err != nil && err != router.ErrDuplicateRoute { - log.Debugf("Network node %s failed to add route: %v", n.id, err) - } - } - - // update your sync timestamp - // NOTE: this might go away as we will be doing full table advert to random peer - if err := n.RefreshSync(now); err != nil { - log.Debugf("Network failed refreshing sync time: %v", err) - } - // get node peer graph to send back to the syncing node msg := PeersToProto(n.node, MaxDepth) @@ -1100,12 +1100,15 @@ func (n *network) prunePeerRoutes(peer *node) error { // seen for a period of time. Also removes all the routes either originated by or routable // by the stale nodes. it also resolves nodes periodically and adds them to the tunnel func (n *network) manage() { + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) announce := time.NewTicker(AnnounceTime) defer announce.Stop() prune := time.NewTicker(PruneTime) defer prune.Stop() resolve := time.NewTicker(ResolveTime) defer resolve.Stop() + netsync := time.NewTicker(SyncTime) + defer netsync.Stop() // list of links we've sent to links := make(map[string]time.Time) @@ -1253,6 +1256,42 @@ func (n *network) manage() { log.Debugf("Network failed deleting routes by %s: %v", route.Router, err) } } + case <-netsync.C: + // get a list of node peers + peers := n.Peers() + // pick a random peer from the list of peers and request full sync + if peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id()); peer != nil { + go func() { + // get node peer graph to send back to the connecting node + node := PeersToProto(n.node, MaxDepth) + + msg := &pbNet.Sync{ + Peer: node, + } + + // get a list of all of our routes + routes, err := n.options.Router.Table().List() + switch err { + case nil: + // encode the routes to protobuf + pbRoutes := make([]*pbRtr.Route, 0, len(routes)) + for _, route := range routes { + pbRoute := pbUtil.RouteToProto(route) + pbRoutes = append(pbRoutes, pbRoute) + } + // pack the routes into the sync message + msg.Routes = pbRoutes + default: + // we can't list the routes + log.Debugf("Network node %s failed listing routes: %v", n.id, err) + } + + // send sync message to the newly connected peer + if err := n.sendTo("sync", NetworkChannel, peer, msg); err != nil { + log.Debugf("Network failed to send sync message: %v", err) + } + }() + } case <-resolve.C: n.initNodes(false) } diff --git a/network/network.go b/network/network.go index 571deee1..5d959e1d 100644 --- a/network/network.go +++ b/network/network.go @@ -19,6 +19,8 @@ var ( AnnounceTime = 1 * time.Second // KeepAliveTime is the time in which we want to have sent a message to a peer KeepAliveTime = 30 * time.Second + // SyncTime is the time a network node requests full sync from the network + SyncTime = 5 * time.Minute // PruneTime defines time interval to periodically check nodes that need to be pruned // due to their not announcing their presence within this time interval PruneTime = 90 * time.Second From 770c7686babec754a82a2ee1bab0f067f0b3e8de Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Mon, 13 Jan 2020 20:02:56 +0000 Subject: [PATCH 07/15] Fix nasty bug when graph action may not have been executed in some branches --- network/node.go | 50 ++++++++++++++++++++++++++++++++++++++++++-- network/node_test.go | 19 +++++++++++++++-- 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/network/node.go b/network/node.go index ec9b9962..159f15f9 100644 --- a/network/node.go +++ b/network/node.go @@ -38,6 +38,8 @@ type node struct { lastSeen time.Time // lastSync keeps track of node last sync request lastSync time.Time + // errCount tracks error count when communicating with peer + errCount int } // Id is node ide @@ -71,15 +73,17 @@ func (n *node) walk(until func(peer *node) bool, action func(parent, peer *node) for queue.Len() > 0 { // pop the node from the front of the queue qnode := queue.Front() + //fmt.Printf("qnodeValue: %v\n", qnode.Value.(*node)) if until(qnode.Value.(*node)) { return visited } // iterate through all of the node peers // mark the visited nodes; enqueue the non-visted for id, peer := range qnode.Value.(*node).peers { + action(qnode.Value.(*node), peer) if _, ok := visited[id]; !ok { visited[id] = peer - action(qnode.Value.(*node), peer) + //action(qnode.Value.(*node), peer) queue.PushBack(peer) } } @@ -229,7 +233,25 @@ func (n *node) DeletePeerNode(id string) error { return nil } -// PruneStalePeerNodes prune the peers that have not been seen for longer than given time +// PrunePeer prunes the peers with the given id +func (n *node) PrunePeer(id string) { + n.Lock() + defer n.Unlock() + + untilNoMorePeers := func(node *node) bool { + return node == nil + } + + prunePeer := func(parent, node *node) { + if node.id != n.id && node.id == id { + delete(parent.peers, node.id) + } + } + + n.walk(untilNoMorePeers, prunePeer) +} + +// PruneStalePeerNodes prunes the peers that have not been seen for longer than pruneTime // It returns a map of the the nodes that got pruned func (n *node) PruneStalePeers(pruneTime time.Duration) map[string]*node { n.Lock() @@ -252,6 +274,30 @@ func (n *node) PruneStalePeers(pruneTime time.Duration) map[string]*node { return pruned } +// IncErrCount increments node error count +func (n *node) IncErrCount() { + n.Lock() + defer n.Unlock() + + n.errCount++ +} + +// ResetErrCount reset node error count +func (n *node) ResetErrCount() { + n.Lock() + defer n.Unlock() + + n.errCount = 0 +} + +// ErrCount returns node error count +func (n *node) ErrCount() int { + n.RLock() + defer n.RUnlock() + + return n.errCount +} + // getTopology traverses node graph and builds node topology // NOTE: this function is not thread safe func (n *node) getTopology(depth uint) *node { diff --git a/network/node_test.go b/network/node_test.go index 51c4988f..7f2efe3b 100644 --- a/network/node_test.go +++ b/network/node_test.go @@ -215,7 +215,22 @@ func TestDeletePeerNode(t *testing.T) { } } -func TestPruneStalePeerNodes(t *testing.T) { +func TestPrunePeer(t *testing.T) { + // complicated node graph + node := testSetup() + + before := node.Nodes() + + node.PrunePeer("peer3") + + now := node.Nodes() + + if len(now) != len(before)-1 { + t.Errorf("Expected pruned node count: %d, got: %d", len(before)-1, len(now)) + } +} + +func TestPruneStalePeers(t *testing.T) { // complicated node graph node := testSetup() @@ -224,7 +239,7 @@ func TestPruneStalePeerNodes(t *testing.T) { pruneTime := 10 * time.Millisecond time.Sleep(pruneTime) - // should delete all nodes besides node + // should delete all nodes besides (root) node pruned := node.PruneStalePeers(pruneTime) if len(pruned) != len(nodes)-1 { From efcac3d009430964047d916bddfa7fff9a1fa904 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Mon, 13 Jan 2020 20:06:02 +0000 Subject: [PATCH 08/15] Define tunnel errors --- tunnel/default.go | 8 ++++---- tunnel/tunnel.go | 6 ++++++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/tunnel/default.go b/tunnel/default.go index cc8e78be..798bc886 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -384,7 +384,7 @@ func (t *tun) process() { // if the link is not connected skip it if !connected { log.Debugf("Link for node %s not connected", id) - err = errors.New("link not connected") + err = ErrLinkDisconnected continue } @@ -392,14 +392,14 @@ func (t *tun) process() { // and the message is being sent outbound via // a dialled connection don't use this link if loopback && msg.outbound { - err = errors.New("link is loopback") + err = ErrLinkLoopback continue } // if the message was being returned by the loopback listener // send it back up the loopback link only if msg.loopback && !loopback { - err = errors.New("link is not loopback") + err = ErrLinkRemote continue } @@ -414,7 +414,7 @@ func (t *tun) process() { // this is where we explicitly set the link // in a message received via the listen method if len(msg.link) > 0 && id != msg.link { - err = errors.New("link not found") + err = ErrLinkNotFound continue } } diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index b59eb31d..516ce6c0 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -26,6 +26,12 @@ var ( ErrDiscoverChan = errors.New("failed to discover channel") // ErrLinkNotFound is returned when a link is specified at dial time and does not exist ErrLinkNotFound = errors.New("link not found") + // ErrLinkDisconnected is returned when a link we attempt to send to is disconnected + ErrLinkDisconnected = errors.New("link not connected") + // ErrLinkLoppback is returned when attempting to send an outbound message over loopback link + ErrLinkLoopback = errors.New("link is loopback") + // ErrLinkRemote is returned when attempting to send a loopback message over remote link + ErrLinkRemote = errors.New("link is remote") // ErrReadTimeout is a timeout on session.Recv ErrReadTimeout = errors.New("read timeout") // ErrDecryptingData is for when theres a nonce error From b4261e8cf9f8815375e1d549d5743725041d3d1e Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Mon, 13 Jan 2020 20:07:10 +0000 Subject: [PATCH 09/15] Updated log and comments --- network/default.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/network/default.go b/network/default.go index 15530c47..831118a5 100644 --- a/network/default.go +++ b/network/default.go @@ -337,7 +337,7 @@ func (n *network) advertise(advertChan <-chan *router.Advert) { for i := 0; i < max; i++ { if peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id()); peer != nil { if err := n.sendTo("advert", ControlChannel, peer, msg); err != nil { - log.Debugf("Network failed to advertise routes: %v", err) + log.Debugf("Network failed to advertise routes to %s: %v", peer.Id(), err) } } } @@ -359,7 +359,7 @@ func (n *network) advertise(advertChan <-chan *router.Advert) { log.Debugf("Network failed to advertise routes to %s: %v, sending multicast", peer.Id(), err) // send a multicast message if we fail to send Unicast message if err := n.sendMsg("advert", ControlChannel, msg); err != nil { - log.Debugf("Network failed to advertise routes: %v", err) + log.Debugf("Network failed to advertise routes to %s: %v", peer.Id(), err) } } } @@ -516,10 +516,11 @@ func (n *network) handleCtrlConn(s tunnel.Session, msg chan *message) { } // getHopCount queries network graph and returns hop count for given router +// NOTE: this should be called getHopeMetric // - Routes for local services have hop count 1 -// - Routes with ID of adjacent nodes have hop count 2 -// - Routes by peers of the advertiser have hop count 3 -// - Routes beyond node neighbourhood have hop count 4 +// - Routes with ID of adjacent nodes have hop count 10 +// - Routes by peers of the advertiser have hop count 100 +// - Routes beyond node neighbourhood have hop count 1000 func (n *network) getHopCount(rtr string) int { // make sure node.peers are not modified n.node.RLock() @@ -1207,7 +1208,7 @@ func (n *network) manage() { // unknown link and peer so lets do the connect flow if err := n.sendTo("connect", NetworkChannel, peer, msg); err != nil { - log.Debugf("Network failed to advertise peer %s: %v", peer.id, err) + log.Debugf("Network failed to connect %s: %v", peer.id, err) continue } From a91dad04eeabc4bbddb7a5e3178ddb4299cef7b7 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Mon, 13 Jan 2020 22:22:12 +0000 Subject: [PATCH 10/15] Increment node error count and prune when Max limit is hit --- network/default.go | 20 +++++++++++++++- network/node.go | 57 +++++++++++++++++++++++++--------------------- 2 files changed, 50 insertions(+), 27 deletions(-) diff --git a/network/default.go b/network/default.go index 831118a5..e80f5d8d 100644 --- a/network/default.go +++ b/network/default.go @@ -38,6 +38,8 @@ var ( DefaultLink = "network" // MaxConnections is the max number of network client connections MaxConnections = 3 + // MaxPeerErrors is the max number of peer errors before we remove it from network graph + MaxPeerErrors = 3 ) var ( @@ -45,6 +47,8 @@ var ( ErrClientNotFound = errors.New("client not found") // ErrPeerLinkNotFound is returned when peer link could not be found in tunnel Links ErrPeerLinkNotFound = errors.New("peer link not found") + // ErrPeerMaxExceeded is returned when peer has reached its max error count limit + ErrPeerMaxExceeded = errors.New("peer max errors exceeded") ) // network implements Network interface @@ -1327,6 +1331,11 @@ func (n *network) sendTo(method, channel string, peer *node, msg proto.Message) // Create a unicast connection to the peer but don't do the open/accept flow c, err := n.tunnel.Dial(channel, tunnel.DialWait(false), tunnel.DialLink(peer.link)) if err != nil { + // increment the peer error count; prune peer if we exceed MaxPeerErrors + peer.err.Increment() + if count := peer.err.GetCount(); count == MaxPeerErrors { + n.PrunePeer(peer.id) + } return err } defer c.Close() @@ -1351,7 +1360,16 @@ func (n *network) sendTo(method, channel string, peer *node, msg proto.Message) tmsg.Header["Micro-Peer"] = peer.id } - return c.Send(tmsg) + if err := c.Send(tmsg); err != nil { + // increment the peer error count; prune peer if we exceed MaxPeerErrors + peer.err.Increment() + if count := peer.err.GetCount(); count == MaxPeerErrors { + n.PrunePeer(peer.id) + } + return err + } + + return nil } // sendMsg sends a message to the tunnel channel diff --git a/network/node.go b/network/node.go index 159f15f9..a5d3ad91 100644 --- a/network/node.go +++ b/network/node.go @@ -21,6 +21,35 @@ var ( ErrPeerNotFound = errors.New("peer not found") ) +type nodeError struct { + sync.RWMutex + count int +} + +// Increment increments node error count +func (n *nodeError) Increment() { + n.Lock() + defer n.Unlock() + + n.count++ +} + +// Reset reset node error count +func (n *nodeError) Reset() { + n.Lock() + defer n.Unlock() + + n.count = 0 +} + +// GetCount returns node error count +func (n *nodeError) GetCount() int { + n.RLock() + defer n.RUnlock() + + return n.count +} + // node is network node type node struct { sync.RWMutex @@ -38,8 +67,8 @@ type node struct { lastSeen time.Time // lastSync keeps track of node last sync request lastSync time.Time - // errCount tracks error count when communicating with peer - errCount int + // err tracks node errors + err nodeError } // Id is node ide @@ -274,30 +303,6 @@ func (n *node) PruneStalePeers(pruneTime time.Duration) map[string]*node { return pruned } -// IncErrCount increments node error count -func (n *node) IncErrCount() { - n.Lock() - defer n.Unlock() - - n.errCount++ -} - -// ResetErrCount reset node error count -func (n *node) ResetErrCount() { - n.Lock() - defer n.Unlock() - - n.errCount = 0 -} - -// ErrCount returns node error count -func (n *node) ErrCount() int { - n.RLock() - defer n.RUnlock() - - return n.errCount -} - // getTopology traverses node graph and builds node topology // NOTE: this function is not thread safe func (n *node) getTopology(depth uint) *node { From 994d371ff1bd67a976c99c644f1e321672411c6d Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Tue, 14 Jan 2020 10:49:34 +0000 Subject: [PATCH 11/15] Removed redundant comments. Add proper PruneStalePeers test. --- network/node.go | 2 -- network/node_test.go | 21 +++++++++++++++++++-- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/network/node.go b/network/node.go index a5d3ad91..caacc7fd 100644 --- a/network/node.go +++ b/network/node.go @@ -102,7 +102,6 @@ func (n *node) walk(until func(peer *node) bool, action func(parent, peer *node) for queue.Len() > 0 { // pop the node from the front of the queue qnode := queue.Front() - //fmt.Printf("qnodeValue: %v\n", qnode.Value.(*node)) if until(qnode.Value.(*node)) { return visited } @@ -112,7 +111,6 @@ func (n *node) walk(until func(peer *node) bool, action func(parent, peer *node) action(qnode.Value.(*node), peer) if _, ok := visited[id]; !ok { visited[id] = peer - //action(qnode.Value.(*node), peer) queue.PushBack(peer) } } diff --git a/network/node_test.go b/network/node_test.go index 7f2efe3b..fc3088df 100644 --- a/network/node_test.go +++ b/network/node_test.go @@ -233,9 +233,8 @@ func TestPrunePeer(t *testing.T) { func TestPruneStalePeers(t *testing.T) { // complicated node graph node := testSetup() - nodes := node.Nodes() - + // this will delete all nodes besides the root node pruneTime := 10 * time.Millisecond time.Sleep(pruneTime) @@ -245,6 +244,24 @@ func TestPruneStalePeers(t *testing.T) { if len(pruned) != len(nodes)-1 { t.Errorf("Expected pruned node count: %d, got: %d", len(nodes)-1, len(pruned)) } + + // complicated node graph + node = testSetup() + nodes = node.Nodes() + + // set prune time to 100ms and wait for half of it + pruneTime = 100 * time.Millisecond + time.Sleep(pruneTime) + + // update the time of peer1 + node.peers["peer1"].lastSeen = time.Now() + + // should prune all but the root nodes and peer1 + pruned = node.PruneStalePeers(pruneTime) + + if len(pruned) != len(nodes)-2 { + t.Errorf("Expected pruned node count: %d, got: %d", len(nodes)-2, len(pruned)) + } } func TestUnpackPeerTopology(t *testing.T) { From 821fda41ae54eb59548fff40c0447b7ec55669ba Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Tue, 14 Jan 2020 18:12:36 +0000 Subject: [PATCH 12/15] Added Status method to network.Node fixed random segfaults. --- network/default.go | 58 +++-- network/network.go | 16 ++ network/node.go | 122 ++++++++-- network/node_test.go | 8 + network/service/proto/network.pb.go | 272 ++++++++++++++++++---- network/service/proto/network.pb.micro.go | 19 ++ network/service/proto/network.proto | 21 ++ 7 files changed, 432 insertions(+), 84 deletions(-) diff --git a/network/default.go b/network/default.go index e80f5d8d..39e29b4b 100644 --- a/network/default.go +++ b/network/default.go @@ -165,6 +165,7 @@ func newNetwork(opts ...Option) Network { id: options.Id, address: peerAddress, peers: make(map[string]*node), + status: newStatus(), }, options: options, router: options.Router, @@ -780,6 +781,7 @@ func (n *network) processNetChan(listener tunnel.Listener) { address: pbNetConnect.Node.Address, link: m.msg.Header["Micro-Link"], peers: make(map[string]*node), + status: newStatus(), lastSeen: now, } @@ -874,10 +876,16 @@ func (n *network) processNetChan(listener tunnel.Listener) { log.Debugf("Network received peer message from: %s %s", pbNetPeer.Node.Id, pbNetPeer.Node.Address) peer := &node{ - id: pbNetPeer.Node.Id, - address: pbNetPeer.Node.Address, - link: m.msg.Header["Micro-Link"], - peers: make(map[string]*node), + id: pbNetPeer.Node.Id, + address: pbNetPeer.Node.Address, + link: m.msg.Header["Micro-Link"], + peers: make(map[string]*node), + status: &status{ + err: &nerr{ + count: int(pbNetPeer.Node.Status.Error.Count), + msg: errors.New(pbNetPeer.Node.Status.Error.Msg), + }, + }, lastSeen: now, } @@ -973,10 +981,16 @@ func (n *network) processNetChan(listener tunnel.Listener) { log.Debugf("Network received sync message from: %s", pbNetSync.Peer.Node.Id) peer := &node{ - id: pbNetSync.Peer.Node.Id, - address: pbNetSync.Peer.Node.Address, - link: m.msg.Header["Micro-Link"], - peers: make(map[string]*node), + id: pbNetSync.Peer.Node.Id, + address: pbNetSync.Peer.Node.Address, + link: m.msg.Header["Micro-Link"], + peers: make(map[string]*node), + status: &status{ + err: &nerr{ + count: int(pbNetSync.Peer.Node.Status.Error.Count), + msg: errors.New(pbNetSync.Peer.Node.Status.Error.Msg), + }, + }, lastSeen: now, } @@ -1328,13 +1342,19 @@ func (n *network) sendTo(method, channel string, peer *node, msg proto.Message) if err != nil { return err } + // Create a unicast connection to the peer but don't do the open/accept flow c, err := n.tunnel.Dial(channel, tunnel.DialWait(false), tunnel.DialLink(peer.link)) if err != nil { - // increment the peer error count; prune peer if we exceed MaxPeerErrors - peer.err.Increment() - if count := peer.err.GetCount(); count == MaxPeerErrors { - n.PrunePeer(peer.id) + if peerNode := n.GetPeerNode(peer.id); peerNode != nil { + log.Debugf("Network found peer %s: %v", peer.id, peerNode) + // update node status when error happens + peerNode.status.err.Update(err) + log.Debugf("Network increment node peer %p %v count to: %d", peerNode, peerNode, peerNode.status.Error().Count()) + if count := peerNode.status.Error().Count(); count == MaxPeerErrors { + log.Debugf("Network node peer %v count exceeded %d: %d", peerNode, MaxPeerErrors, peerNode.status.Error().Count()) + n.PrunePeer(peerNode.id) + } } return err } @@ -1361,10 +1381,16 @@ func (n *network) sendTo(method, channel string, peer *node, msg proto.Message) } if err := c.Send(tmsg); err != nil { - // increment the peer error count; prune peer if we exceed MaxPeerErrors - peer.err.Increment() - if count := peer.err.GetCount(); count == MaxPeerErrors { - n.PrunePeer(peer.id) + // TODO: Lookup peer in our graph + if peerNode := n.GetPeerNode(peer.id); peerNode != nil { + log.Debugf("Network found peer %s: %v", peer.id, peerNode) + // update node status when error happens + peerNode.status.err.Update(err) + log.Debugf("Network increment node peer %p %v count to: %d", peerNode, peerNode, peerNode.status.Error().Count()) + if count := peerNode.status.Error().Count(); count == MaxPeerErrors { + log.Debugf("Network node peer %v count exceeded %d: %d", peerNode, MaxPeerErrors, peerNode.status.Error().Count()) + n.PrunePeer(peerNode.id) + } } return err } diff --git a/network/network.go b/network/network.go index 5d959e1d..e06cd2c6 100644 --- a/network/network.go +++ b/network/network.go @@ -26,6 +26,20 @@ var ( PruneTime = 90 * time.Second ) +// Error is network node errors +type Error interface { + // Count is current count of errors + Count() int + // Msg is last error message + Msg() string +} + +// Status is node status +type Status interface { + // Error reports error status + Error() Error +} + // Node is network node type Node interface { // Id is node id @@ -36,6 +50,8 @@ type Node interface { Peers() []Node // Network is the network node is in Network() Network + // Status returns node status + Status() Status } // Network is micro network diff --git a/network/node.go b/network/node.go index caacc7fd..a7f3c51a 100644 --- a/network/node.go +++ b/network/node.go @@ -21,33 +21,62 @@ var ( ErrPeerNotFound = errors.New("peer not found") ) -type nodeError struct { +// nerr tracks node errors +type nerr struct { sync.RWMutex count int + msg error } // Increment increments node error count -func (n *nodeError) Increment() { - n.Lock() - defer n.Unlock() +func (e *nerr) Update(err error) { + e.Lock() + defer e.Unlock() - n.count++ + e.count++ + e.msg = err } -// Reset reset node error count -func (n *nodeError) Reset() { - n.Lock() - defer n.Unlock() +// Count returns node error count +func (e *nerr) Count() int { + e.RLock() + defer e.RUnlock() - n.count = 0 + return e.count } -// GetCount returns node error count -func (n *nodeError) GetCount() int { - n.RLock() - defer n.RUnlock() +func (e *nerr) Msg() string { + e.RLock() + defer e.RUnlock() - return n.count + if e.msg != nil { + return e.msg.Error() + } + + return "" +} + +// status returns node status +type status struct { + sync.RWMutex + err *nerr +} + +// newStatus creates +func newStatus() *status { + return &status{ + err: new(nerr), + } +} + +func (s *status) Error() Error { + s.RLock() + defer s.RUnlock() + + return &nerr{ + count: s.err.count, + msg: s.err.msg, + } } // node is network node @@ -67,8 +96,8 @@ type node struct { lastSeen time.Time // lastSync keeps track of node last sync request lastSync time.Time - // err tracks node errors - err nodeError + // err tracks node status + status *status } // Id is node ide @@ -86,6 +115,19 @@ func (n *node) Network() Network { return n.network } +// Status returns node status +func (n *node) Status() Status { + n.RLock() + defer n.RUnlock() + + return &status{ + err: &nerr{ + count: n.status.err.count, + msg: n.status.err.msg, + }, + } +} + // walk walks the node graph until some condition is met func (n *node) walk(until func(peer *node) bool, action func(parent, peer *node)) map[string]*node { // track the visited nodes @@ -127,7 +169,28 @@ func (n *node) AddPeer(peer *node) error { n.Lock() defer n.Unlock() + // get node topology: we need to check if the peer + // we are trying to add is already in our graph + top := n.getTopology(MaxDepth) + + untilFoundPeer := func(n *node) bool { + return n.id == peer.id + } + + justWalk := func(paent, node *node) {} + + visited := top.walk(untilFoundPeer, justWalk) + + peerNode, inTop := visited[peer.id] + if _, ok := n.peers[peer.id]; !ok { + if inTop { + // just create a new edge to the existing peer + // but make sure you update the peer link + peerNode.link = peer.link + n.peers[peer.id] = peerNode + return nil + } n.peers[peer.id] = peer return nil } @@ -310,6 +373,7 @@ func (n *node) getTopology(depth uint) *node { address: n.address, peers: make(map[string]*node), network: n.network, + status: n.status, lastSeen: n.lastSeen, } @@ -358,9 +422,15 @@ func (n *node) Peers() []Node { // UnpackPeerTopology unpacks pb.Peer into node topology of given depth func UnpackPeerTopology(pbPeer *pb.Peer, lastSeen time.Time, depth uint) *node { peerNode := &node{ - id: pbPeer.Node.Id, - address: pbPeer.Node.Address, - peers: make(map[string]*node), + id: pbPeer.Node.Id, + address: pbPeer.Node.Address, + peers: make(map[string]*node), + status: &status{ + err: &nerr{ + count: int(pbPeer.Node.Status.Error.Count), + msg: errors.New(pbPeer.Node.Status.Error.Msg), + }, + }, lastSeen: lastSeen, } @@ -387,6 +457,12 @@ func peerProtoTopology(peer Node, depth uint) *pb.Peer { node := &pb.Node{ Id: peer.Id(), Address: peer.Address(), + Status: &pb.Status{ + Error: &pb.Error{ + Count: uint32(peer.Status().Error().Count()), + Msg: peer.Status().Error().Msg(), + }, + }, } // set the network name if network is not nil @@ -422,6 +498,12 @@ func PeersToProto(node Node, depth uint) *pb.Peer { pbNode := &pb.Node{ Id: node.Id(), Address: node.Address(), + Status: &pb.Status{ + Error: &pb.Error{ + Count: uint32(node.Status().Error().Count()), + Msg: node.Status().Error().Msg(), + }, + }, } // set the network name if network is not nil diff --git a/network/node_test.go b/network/node_test.go index fc3088df..d4849a1c 100644 --- a/network/node_test.go +++ b/network/node_test.go @@ -21,6 +21,7 @@ func testSetup() *node { address: testNodeAddress, peers: make(map[string]*node), network: newNetwork(Name(testNodeNetName)), + err: new(nodeError), } // add some peers to the node @@ -30,6 +31,7 @@ func testSetup() *node { address: testNode.address + "-" + id, peers: make(map[string]*node), network: testNode.network, + err: new(nodeError), } } @@ -41,6 +43,7 @@ func testSetup() *node { address: testNode.address + "-" + id, peers: make(map[string]*node), network: testNode.network, + err: new(nodeError), } } @@ -269,6 +272,7 @@ func TestUnpackPeerTopology(t *testing.T) { Node: &pb.Node{ Id: "newPeer", Address: "newPeerAddress", + Err: &pb.NodeError{}, }, Peers: make([]*pb.Peer, 0), } @@ -284,12 +288,14 @@ func TestUnpackPeerTopology(t *testing.T) { pbPeer1Node := &pb.Node{ Id: peer1.id, Address: peer1.address, + Err: &pb.NodeError{}, } pbPeer111 := &pb.Peer{ Node: &pb.Node{ Id: "peer111", Address: "peer111Address", + Err: &pb.NodeError{}, }, Peers: make([]*pb.Peer, 0), } @@ -298,6 +304,7 @@ func TestUnpackPeerTopology(t *testing.T) { Node: &pb.Node{ Id: "peer121", Address: "peer121Address", + Err: &pb.NodeError{}, }, Peers: make([]*pb.Peer, 0), } @@ -324,6 +331,7 @@ func TestPeersToProto(t *testing.T) { address: testNodeAddress, peers: make(map[string]*node), network: newNetwork(Name(testNodeNetName)), + err: &nodeError{}, } topCount := 0 diff --git a/network/service/proto/network.pb.go b/network/service/proto/network.pb.go index aa98ff3e..54902c65 100644 --- a/network/service/proto/network.pb.go +++ b/network/service/proto/network.pb.go @@ -473,6 +473,164 @@ func (m *ServicesResponse) GetServices() []string { return nil } +type StatusRequest struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StatusRequest) Reset() { *m = StatusRequest{} } +func (m *StatusRequest) String() string { return proto.CompactTextString(m) } +func (*StatusRequest) ProtoMessage() {} +func (*StatusRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_8571034d60397816, []int{11} +} + +func (m *StatusRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_StatusRequest.Unmarshal(m, b) +} +func (m *StatusRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_StatusRequest.Marshal(b, m, deterministic) +} +func (m *StatusRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_StatusRequest.Merge(m, src) +} +func (m *StatusRequest) XXX_Size() int { + return xxx_messageInfo_StatusRequest.Size(m) +} +func (m *StatusRequest) XXX_DiscardUnknown() { + xxx_messageInfo_StatusRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_StatusRequest proto.InternalMessageInfo + +type StatusResponse struct { + Status *Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StatusResponse) Reset() { *m = StatusResponse{} } +func (m *StatusResponse) String() string { return proto.CompactTextString(m) } +func (*StatusResponse) ProtoMessage() {} +func (*StatusResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_8571034d60397816, []int{12} +} + +func (m *StatusResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_StatusResponse.Unmarshal(m, b) +} +func (m *StatusResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_StatusResponse.Marshal(b, m, deterministic) +} +func (m *StatusResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_StatusResponse.Merge(m, src) +} +func (m *StatusResponse) XXX_Size() int { + return xxx_messageInfo_StatusResponse.Size(m) +} +func (m *StatusResponse) XXX_DiscardUnknown() { + xxx_messageInfo_StatusResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_StatusResponse proto.InternalMessageInfo + +func (m *StatusResponse) GetStatus() *Status { + if m != nil { + return m.Status + } + return nil +} + +// Error tracks network errors +type Error struct { + Count uint32 `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"` + Msg string `protobuf:"bytes,2,opt,name=msg,proto3" json:"msg,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Error) Reset() { *m = Error{} } +func (m *Error) String() string { return proto.CompactTextString(m) } +func (*Error) ProtoMessage() {} +func (*Error) Descriptor() ([]byte, []int) { + return fileDescriptor_8571034d60397816, []int{13} +} + +func (m *Error) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Error.Unmarshal(m, b) +} +func (m *Error) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Error.Marshal(b, m, deterministic) +} +func (m *Error) XXX_Merge(src proto.Message) { + xxx_messageInfo_Error.Merge(m, src) +} +func (m *Error) XXX_Size() int { + return xxx_messageInfo_Error.Size(m) +} +func (m *Error) XXX_DiscardUnknown() { + xxx_messageInfo_Error.DiscardUnknown(m) +} + +var xxx_messageInfo_Error proto.InternalMessageInfo + +func (m *Error) GetCount() uint32 { + if m != nil { + return m.Count + } + return 0 +} + +func (m *Error) GetMsg() string { + if m != nil { + return m.Msg + } + return "" +} + +// Status is node status +type Status struct { + Error *Error `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Status) Reset() { *m = Status{} } +func (m *Status) String() string { return proto.CompactTextString(m) } +func (*Status) ProtoMessage() {} +func (*Status) Descriptor() ([]byte, []int) { + return fileDescriptor_8571034d60397816, []int{14} +} + +func (m *Status) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Status.Unmarshal(m, b) +} +func (m *Status) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Status.Marshal(b, m, deterministic) +} +func (m *Status) XXX_Merge(src proto.Message) { + xxx_messageInfo_Status.Merge(m, src) +} +func (m *Status) XXX_Size() int { + return xxx_messageInfo_Status.Size(m) +} +func (m *Status) XXX_DiscardUnknown() { + xxx_messageInfo_Status.DiscardUnknown(m) +} + +var xxx_messageInfo_Status proto.InternalMessageInfo + +func (m *Status) GetError() *Error { + if m != nil { + return m.Error + } + return nil +} + // Node is network node type Node struct { // node id @@ -482,17 +640,19 @@ type Node struct { // the network Network string `protobuf:"bytes,3,opt,name=network,proto3" json:"network,omitempty"` // associated metadata - Metadata map[string]string `protobuf:"bytes,4,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Metadata map[string]string `protobuf:"bytes,4,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // node status + Status *Status `protobuf:"bytes,5,opt,name=status,proto3" json:"status,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *Node) Reset() { *m = Node{} } func (m *Node) String() string { return proto.CompactTextString(m) } func (*Node) ProtoMessage() {} func (*Node) Descriptor() ([]byte, []int) { - return fileDescriptor_8571034d60397816, []int{11} + return fileDescriptor_8571034d60397816, []int{15} } func (m *Node) XXX_Unmarshal(b []byte) error { @@ -541,6 +701,13 @@ func (m *Node) GetMetadata() map[string]string { return nil } +func (m *Node) GetStatus() *Status { + if m != nil { + return m.Status + } + return nil +} + // Connect is sent when the node connects to the network type Connect struct { // network mode @@ -554,7 +721,7 @@ func (m *Connect) Reset() { *m = Connect{} } func (m *Connect) String() string { return proto.CompactTextString(m) } func (*Connect) ProtoMessage() {} func (*Connect) Descriptor() ([]byte, []int) { - return fileDescriptor_8571034d60397816, []int{12} + return fileDescriptor_8571034d60397816, []int{16} } func (m *Connect) XXX_Unmarshal(b []byte) error { @@ -595,7 +762,7 @@ func (m *Close) Reset() { *m = Close{} } func (m *Close) String() string { return proto.CompactTextString(m) } func (*Close) ProtoMessage() {} func (*Close) Descriptor() ([]byte, []int) { - return fileDescriptor_8571034d60397816, []int{13} + return fileDescriptor_8571034d60397816, []int{17} } func (m *Close) XXX_Unmarshal(b []byte) error { @@ -638,7 +805,7 @@ func (m *Peer) Reset() { *m = Peer{} } func (m *Peer) String() string { return proto.CompactTextString(m) } func (*Peer) ProtoMessage() {} func (*Peer) Descriptor() ([]byte, []int) { - return fileDescriptor_8571034d60397816, []int{14} + return fileDescriptor_8571034d60397816, []int{18} } func (m *Peer) XXX_Unmarshal(b []byte) error { @@ -688,7 +855,7 @@ func (m *Sync) Reset() { *m = Sync{} } func (m *Sync) String() string { return proto.CompactTextString(m) } func (*Sync) ProtoMessage() {} func (*Sync) Descriptor() ([]byte, []int) { - return fileDescriptor_8571034d60397816, []int{15} + return fileDescriptor_8571034d60397816, []int{19} } func (m *Sync) XXX_Unmarshal(b []byte) error { @@ -735,6 +902,10 @@ func init() { proto.RegisterType((*RoutesResponse)(nil), "go.micro.network.RoutesResponse") proto.RegisterType((*ServicesRequest)(nil), "go.micro.network.ServicesRequest") proto.RegisterType((*ServicesResponse)(nil), "go.micro.network.ServicesResponse") + proto.RegisterType((*StatusRequest)(nil), "go.micro.network.StatusRequest") + proto.RegisterType((*StatusResponse)(nil), "go.micro.network.StatusResponse") + proto.RegisterType((*Error)(nil), "go.micro.network.Error") + proto.RegisterType((*Status)(nil), "go.micro.network.Status") proto.RegisterType((*Node)(nil), "go.micro.network.Node") proto.RegisterMapType((map[string]string)(nil), "go.micro.network.Node.MetadataEntry") proto.RegisterType((*Connect)(nil), "go.micro.network.Connect") @@ -746,43 +917,48 @@ func init() { func init() { proto.RegisterFile("network.proto", fileDescriptor_8571034d60397816) } var fileDescriptor_8571034d60397816 = []byte{ - // 593 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x55, 0x5d, 0x6a, 0xdb, 0x40, - 0x10, 0x8e, 0x2c, 0x29, 0x3f, 0xd3, 0xc8, 0x75, 0x97, 0x92, 0x0a, 0x3d, 0xb4, 0xee, 0xe2, 0x87, - 0x50, 0x1a, 0x19, 0x12, 0x0a, 0xa5, 0xa6, 0x21, 0x10, 0x4a, 0xa1, 0x90, 0x90, 0xca, 0x17, 0xa8, - 0x6c, 0x0d, 0xb6, 0x49, 0xac, 0x75, 0x56, 0xeb, 0x04, 0x9f, 0xa0, 0x47, 0xe8, 0x99, 0x7a, 0xab, - 0xb2, 0xbb, 0x23, 0xc7, 0x8e, 0x65, 0x37, 0x79, 0xd3, 0xec, 0x7c, 0xdf, 0xcc, 0xce, 0xcc, 0x37, - 0x2b, 0x08, 0x72, 0x54, 0xf7, 0x42, 0x5e, 0xc7, 0x13, 0x29, 0x94, 0x60, 0x8d, 0x81, 0x88, 0xc7, - 0xa3, 0xbe, 0x14, 0x31, 0x9d, 0x47, 0x9d, 0xc1, 0x48, 0x0d, 0xa7, 0xbd, 0xb8, 0x2f, 0xc6, 0x6d, - 0xe3, 0x69, 0x0f, 0xc4, 0x91, 0xfd, 0x90, 0x62, 0xaa, 0x50, 0xb6, 0x0b, 0x94, 0x77, 0xa3, 0x3e, - 0xb6, 0x4d, 0x04, 0x3a, 0xb4, 0xe1, 0xf8, 0x6f, 0x07, 0xfc, 0x9f, 0x53, 0x94, 0x33, 0x16, 0xc2, - 0x0e, 0xe1, 0x42, 0xa7, 0xe9, 0x1c, 0xee, 0x25, 0xa5, 0xa9, 0x3d, 0x69, 0x96, 0x49, 0x2c, 0x8a, - 0xb0, 0x66, 0x3d, 0x64, 0x6a, 0xcf, 0x20, 0x55, 0x78, 0x9f, 0xce, 0x42, 0xd7, 0x7a, 0xc8, 0x64, - 0x07, 0xb0, 0x6d, 0xf3, 0x84, 0x9e, 0x71, 0x90, 0xa5, 0x19, 0x74, 0xef, 0xd0, 0xb7, 0x0c, 0x32, - 0xf9, 0x29, 0xd4, 0xcf, 0x45, 0x9e, 0x63, 0x5f, 0x25, 0x78, 0x3b, 0xc5, 0x42, 0xb1, 0x8f, 0xe0, - 0xe7, 0x22, 0xc3, 0x22, 0x74, 0x9a, 0xee, 0xe1, 0x8b, 0xe3, 0x83, 0xf8, 0x71, 0xe9, 0xf1, 0xa5, - 0xc8, 0x30, 0xb1, 0x20, 0xfe, 0x0a, 0x5e, 0xce, 0xf9, 0xc5, 0x44, 0xe4, 0x05, 0xf2, 0x16, 0xec, - 0x6b, 0x44, 0x51, 0x06, 0x7c, 0x0d, 0x7e, 0x86, 0x13, 0x35, 0x34, 0x05, 0x06, 0x89, 0x35, 0xf8, - 0x57, 0x08, 0x08, 0x65, 0x69, 0xcf, 0xcc, 0xdb, 0x82, 0xfd, 0xef, 0x32, 0x9d, 0x0c, 0x37, 0x27, - 0xe9, 0x40, 0x40, 0x28, 0x4a, 0xf2, 0x01, 0x3c, 0x29, 0x84, 0x32, 0xa8, 0xca, 0x1c, 0x57, 0x88, - 0x32, 0x31, 0x18, 0x7e, 0x0a, 0x41, 0xa2, 0xdb, 0x37, 0x2f, 0xe4, 0x08, 0xfc, 0x5b, 0x3d, 0x34, - 0x62, 0xbf, 0x59, 0x65, 0x9b, 0x99, 0x26, 0x16, 0xc5, 0xcf, 0xa0, 0x5e, 0xf2, 0x29, 0x7b, 0x4c, - 0xe3, 0xa9, 0xa8, 0x91, 0xe4, 0x61, 0x08, 0x34, 0x36, 0xd3, 0xdc, 0xae, 0x55, 0x43, 0x79, 0x07, - 0x1e, 0x43, 0xe3, 0xe1, 0x88, 0xc2, 0x46, 0xb0, 0x4b, 0xa2, 0xb1, 0x81, 0xf7, 0x92, 0xb9, 0xcd, - 0xff, 0x3a, 0xe0, 0xe9, 0xbe, 0xb1, 0x3a, 0xd4, 0x46, 0x19, 0x69, 0xac, 0x36, 0xca, 0x36, 0xcb, - 0xab, 0x14, 0x8b, 0xbb, 0x24, 0x16, 0x76, 0x06, 0xbb, 0x63, 0x54, 0x69, 0x96, 0xaa, 0x34, 0xf4, - 0x4c, 0x05, 0xad, 0xea, 0x29, 0xc5, 0x17, 0x04, 0xfb, 0x96, 0x2b, 0x39, 0x4b, 0xe6, 0xac, 0xa8, - 0x03, 0xc1, 0x92, 0x8b, 0x35, 0xc0, 0xbd, 0xc6, 0x19, 0xdd, 0x4b, 0x7f, 0xea, 0x49, 0xde, 0xa5, - 0x37, 0x53, 0xa4, 0x6b, 0x59, 0xe3, 0x4b, 0xed, 0xb3, 0xc3, 0x3f, 0xc1, 0x0e, 0x69, 0x4d, 0xcf, - 0x51, 0xeb, 0x60, 0xfd, 0x1c, 0x8d, 0x56, 0x0c, 0x86, 0x9f, 0x80, 0x7f, 0x7e, 0x23, 0xec, 0xf0, - 0x9f, 0x4c, 0xfa, 0x05, 0x9e, 0x96, 0xc2, 0x73, 0x38, 0x5a, 0xc1, 0x13, 0x44, 0xa9, 0x1b, 0xea, - 0x6e, 0x50, 0x97, 0x05, 0xf1, 0x1e, 0x78, 0xdd, 0x59, 0xde, 0xd7, 0x19, 0xf4, 0xc1, 0xff, 0x24, - 0xa9, 0x31, 0x0b, 0x02, 0xaa, 0x3d, 0x45, 0x40, 0xc7, 0x7f, 0x5c, 0xd8, 0xb9, 0xa4, 0xe1, 0x5d, - 0x3d, 0x74, 0xaf, 0xb9, 0x9a, 0x64, 0xf9, 0x11, 0x88, 0xde, 0x6f, 0x40, 0xd0, 0x9a, 0x6f, 0xb1, - 0x1f, 0xe0, 0x9b, 0xed, 0x62, 0x6f, 0x57, 0xd1, 0x8b, 0xcb, 0x19, 0xbd, 0x5b, 0xeb, 0x5f, 0x8c, - 0x65, 0x9e, 0x83, 0xaa, 0x58, 0x8b, 0xaf, 0x49, 0x55, 0xac, 0xa5, 0x77, 0x84, 0x6f, 0xb1, 0x0b, - 0xd8, 0xb6, 0x8b, 0xc7, 0x2a, 0xc0, 0x4b, 0x2b, 0x1d, 0x35, 0xd7, 0x03, 0xe6, 0xe1, 0xba, 0xb0, - 0x5b, 0xae, 0x1c, 0xab, 0xe8, 0xcb, 0xa3, 0x0d, 0x8d, 0xf8, 0x26, 0x48, 0x19, 0xb4, 0xb7, 0x6d, - 0x7e, 0x04, 0x27, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x66, 0x66, 0x42, 0x5f, 0x68, 0x06, 0x00, - 0x00, + // 678 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x55, 0xdd, 0x4e, 0xdb, 0x30, + 0x14, 0xa6, 0x6d, 0x52, 0xe0, 0x8c, 0x14, 0x66, 0x4d, 0x2c, 0xca, 0xc5, 0xe8, 0x2c, 0x2e, 0xd0, + 0x34, 0xd2, 0x09, 0x34, 0x6d, 0x1a, 0x1a, 0x42, 0x43, 0x68, 0xd2, 0x24, 0x10, 0x4b, 0x5f, 0x60, + 0x21, 0xb1, 0x4a, 0x05, 0x8d, 0x8b, 0xe3, 0x80, 0xfa, 0x04, 0x7b, 0xd3, 0xbd, 0xc4, 0x6e, 0x26, + 0xdb, 0x27, 0x21, 0xa1, 0x49, 0x57, 0xee, 0x72, 0xec, 0xef, 0x3b, 0xc7, 0xe7, 0xef, 0x0b, 0x38, + 0x09, 0x93, 0x0f, 0x5c, 0xdc, 0xf8, 0x53, 0xc1, 0x25, 0x27, 0x5b, 0x23, 0xee, 0x4f, 0xc6, 0x91, + 0xe0, 0x3e, 0x9e, 0x7b, 0x47, 0xa3, 0xb1, 0xbc, 0xce, 0xae, 0xfc, 0x88, 0x4f, 0x06, 0xfa, 0x66, + 0x30, 0xe2, 0xfb, 0xe6, 0x43, 0xf0, 0x4c, 0x32, 0x31, 0x48, 0x99, 0xb8, 0x1f, 0x47, 0x6c, 0xa0, + 0x3d, 0xe0, 0xa1, 0x71, 0x47, 0x7f, 0xb7, 0xc0, 0xfe, 0x99, 0x31, 0x31, 0x23, 0x2e, 0xac, 0x22, + 0xce, 0x6d, 0xf5, 0x5b, 0x7b, 0xeb, 0x41, 0x6e, 0xaa, 0x9b, 0x30, 0x8e, 0x05, 0x4b, 0x53, 0xb7, + 0x6d, 0x6e, 0xd0, 0x54, 0x37, 0xa3, 0x50, 0xb2, 0x87, 0x70, 0xe6, 0x76, 0xcc, 0x0d, 0x9a, 0x64, + 0x1b, 0xba, 0x26, 0x8e, 0x6b, 0xe9, 0x0b, 0xb4, 0x14, 0x03, 0xdf, 0xed, 0xda, 0x86, 0x81, 0x26, + 0x3d, 0x86, 0xde, 0x29, 0x4f, 0x12, 0x16, 0xc9, 0x80, 0xdd, 0x65, 0x2c, 0x95, 0xe4, 0x3d, 0xd8, + 0x09, 0x8f, 0x59, 0xea, 0xb6, 0xfa, 0x9d, 0xbd, 0x17, 0x07, 0xdb, 0xfe, 0xd3, 0xd4, 0xfd, 0x0b, + 0x1e, 0xb3, 0xc0, 0x80, 0xe8, 0x4b, 0xd8, 0x2c, 0xf8, 0xe9, 0x94, 0x27, 0x29, 0xa3, 0xbb, 0xb0, + 0xa1, 0x10, 0x69, 0xee, 0xf0, 0x15, 0xd8, 0x31, 0x9b, 0xca, 0x6b, 0x9d, 0xa0, 0x13, 0x18, 0x83, + 0x7e, 0x05, 0x07, 0x51, 0x86, 0xf6, 0xcc, 0xb8, 0xbb, 0xb0, 0xf1, 0x5d, 0x84, 0xd3, 0xeb, 0xc5, + 0x41, 0x8e, 0xc0, 0x41, 0x14, 0x06, 0x79, 0x07, 0x96, 0xe0, 0x5c, 0x6a, 0x54, 0x6d, 0x8c, 0x4b, + 0xc6, 0x44, 0xa0, 0x31, 0xf4, 0x18, 0x9c, 0x40, 0x95, 0xaf, 0x48, 0x64, 0x1f, 0xec, 0x3b, 0xd5, + 0x34, 0x64, 0xbf, 0x9e, 0x67, 0xeb, 0x9e, 0x06, 0x06, 0x45, 0x4f, 0xa0, 0x97, 0xf3, 0x31, 0xba, + 0x8f, 0xed, 0xa9, 0xc9, 0x11, 0xc7, 0x43, 0x13, 0xb0, 0x6d, 0xba, 0xb8, 0x43, 0x33, 0x0d, 0xf9, + 0x1b, 0xa8, 0x0f, 0x5b, 0x8f, 0x47, 0xe8, 0xd6, 0x83, 0x35, 0x1c, 0x1a, 0xe3, 0x78, 0x3d, 0x28, + 0x6c, 0xba, 0x09, 0xce, 0x50, 0x86, 0x32, 0x2b, 0x1c, 0x7c, 0x83, 0x5e, 0x7e, 0x80, 0xf4, 0x0f, + 0xd0, 0x4d, 0xf5, 0x09, 0xe6, 0xe5, 0xce, 0xe7, 0x85, 0x0c, 0xc4, 0xd1, 0x01, 0xd8, 0x67, 0x42, + 0x70, 0xa1, 0xaa, 0x1e, 0xf1, 0x2c, 0x91, 0x79, 0xd5, 0xb5, 0x41, 0xb6, 0xa0, 0x33, 0x49, 0x47, + 0x38, 0xb5, 0xea, 0x93, 0x7e, 0x82, 0xae, 0x71, 0xa1, 0x6a, 0xc8, 0x14, 0xb5, 0xb9, 0x86, 0xda, + 0x73, 0x60, 0x50, 0xf4, 0x6f, 0x0b, 0x2c, 0xd5, 0x76, 0xd2, 0x83, 0xf6, 0x38, 0xc6, 0x15, 0x69, + 0x8f, 0xe3, 0xc5, 0xdb, 0x91, 0xcf, 0x7a, 0xa7, 0x32, 0xeb, 0xe4, 0x04, 0xd6, 0x26, 0x4c, 0x86, + 0x71, 0x28, 0x43, 0xd7, 0xd2, 0x0d, 0xd8, 0xad, 0x1f, 0x32, 0xff, 0x1c, 0x61, 0x67, 0x89, 0x14, + 0xb3, 0xa0, 0x60, 0x95, 0x4a, 0x65, 0x2f, 0x57, 0x2a, 0xef, 0x08, 0x9c, 0x8a, 0x33, 0x55, 0x9c, + 0x1b, 0x36, 0xc3, 0x4c, 0xd4, 0xa7, 0x2a, 0xe2, 0x7d, 0x78, 0x9b, 0x31, 0x4c, 0xc4, 0x18, 0x5f, + 0xda, 0x9f, 0x5b, 0xf4, 0x23, 0xac, 0xe2, 0x72, 0xa9, 0xc1, 0x55, 0x83, 0xdf, 0x3c, 0xb8, 0x7a, + 0x39, 0x34, 0x86, 0x1e, 0x82, 0x7d, 0x7a, 0xcb, 0xcd, 0xb4, 0x2f, 0x4d, 0xfa, 0x05, 0x96, 0x9a, + 0xfd, 0xe7, 0x70, 0xd4, 0xca, 0x4e, 0x19, 0x13, 0xaa, 0x05, 0x9d, 0x05, 0xeb, 0x64, 0x40, 0xf4, + 0x0a, 0xac, 0xe1, 0x2c, 0x89, 0x54, 0x04, 0x75, 0xf0, 0xbf, 0x1d, 0x54, 0x98, 0xd2, 0xc6, 0xb4, + 0x97, 0xd9, 0x98, 0x83, 0x3f, 0x1d, 0x58, 0xbd, 0xc0, 0x76, 0x5f, 0x3e, 0x56, 0xaf, 0x3f, 0x1f, + 0xa4, 0xaa, 0x7a, 0xde, 0xdb, 0x05, 0x08, 0xd4, 0xb5, 0x15, 0xf2, 0x03, 0x6c, 0x2d, 0x27, 0xe4, + 0xcd, 0x3c, 0xba, 0xac, 0x46, 0xde, 0x4e, 0xe3, 0x7d, 0xd9, 0x97, 0xd6, 0xbf, 0x3a, 0x5f, 0x65, + 0xf9, 0xac, 0xf3, 0x55, 0x11, 0x4e, 0xba, 0x42, 0xce, 0xa1, 0x6b, 0x94, 0x86, 0xd4, 0x80, 0x2b, + 0x1a, 0xe6, 0xf5, 0x9b, 0x01, 0x85, 0xbb, 0x21, 0xac, 0xe5, 0x1a, 0x43, 0x6a, 0xea, 0xf2, 0x44, + 0x92, 0x3c, 0xba, 0x08, 0x52, 0x7e, 0x23, 0x4a, 0xc0, 0x4e, 0xe3, 0xd2, 0x34, 0xbf, 0xb1, 0x2a, + 0x59, 0x74, 0xe5, 0xaa, 0xab, 0x7f, 0xa4, 0x87, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x11, 0x45, + 0xe6, 0xf4, 0xa8, 0x07, 0x00, 0x00, } diff --git a/network/service/proto/network.pb.micro.go b/network/service/proto/network.pb.micro.go index cc01d095..a6df00c6 100644 --- a/network/service/proto/network.pb.micro.go +++ b/network/service/proto/network.pb.micro.go @@ -45,6 +45,8 @@ type NetworkService interface { Routes(ctx context.Context, in *RoutesRequest, opts ...client.CallOption) (*RoutesResponse, error) // Returns a list of known services based on routes Services(ctx context.Context, in *ServicesRequest, opts ...client.CallOption) (*ServicesResponse, error) + // Status returns network status + Status(ctx context.Context, in *StatusRequest, opts ...client.CallOption) (*StatusResponse, error) } type networkService struct { @@ -115,6 +117,16 @@ func (c *networkService) Services(ctx context.Context, in *ServicesRequest, opts return out, nil } +func (c *networkService) Status(ctx context.Context, in *StatusRequest, opts ...client.CallOption) (*StatusResponse, error) { + req := c.c.NewRequest(c.name, "Network.Status", in) + out := new(StatusResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // Server API for Network service type NetworkHandler interface { @@ -128,6 +140,8 @@ type NetworkHandler interface { Routes(context.Context, *RoutesRequest, *RoutesResponse) error // Returns a list of known services based on routes Services(context.Context, *ServicesRequest, *ServicesResponse) error + // Status returns network status + Status(context.Context, *StatusRequest, *StatusResponse) error } func RegisterNetworkHandler(s server.Server, hdlr NetworkHandler, opts ...server.HandlerOption) error { @@ -137,6 +151,7 @@ func RegisterNetworkHandler(s server.Server, hdlr NetworkHandler, opts ...server Nodes(ctx context.Context, in *NodesRequest, out *NodesResponse) error Routes(ctx context.Context, in *RoutesRequest, out *RoutesResponse) error Services(ctx context.Context, in *ServicesRequest, out *ServicesResponse) error + Status(ctx context.Context, in *StatusRequest, out *StatusResponse) error } type Network struct { network @@ -168,3 +183,7 @@ func (h *networkHandler) Routes(ctx context.Context, in *RoutesRequest, out *Rou func (h *networkHandler) Services(ctx context.Context, in *ServicesRequest, out *ServicesResponse) error { return h.NetworkHandler.Services(ctx, in, out) } + +func (h *networkHandler) Status(ctx context.Context, in *StatusRequest, out *StatusResponse) error { + return h.NetworkHandler.Status(ctx, in, out) +} diff --git a/network/service/proto/network.proto b/network/service/proto/network.proto index ee02cf8f..da08293a 100644 --- a/network/service/proto/network.proto +++ b/network/service/proto/network.proto @@ -16,6 +16,8 @@ service Network { rpc Routes(RoutesRequest) returns (RoutesResponse) {}; // Returns a list of known services based on routes rpc Services(ServicesRequest) returns (ServicesResponse) {}; + // Status returns network status + rpc Status(StatusRequest) returns (StatusResponse) {}; } // Query is passed in a LookupRequest @@ -69,6 +71,23 @@ message ServicesResponse { repeated string services = 1; } +message StatusRequest {} + +message StatusResponse { + Status status = 1; +} + +// Error tracks network errors +message Error { + uint32 count = 1; + string msg = 2; +} + +// Status is node status +message Status { + Error error = 1; +} + // Node is network node message Node { // node id @@ -79,6 +98,8 @@ message Node { string network = 3; // associated metadata map metadata = 4; + // node status + Status status = 5; } // Connect is sent when the node connects to the network From 0ea56a5ffef7eb79e850c1573336aaa32548aa56 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Tue, 14 Jan 2020 18:22:58 +0000 Subject: [PATCH 13/15] Fixed tests --- network/node_test.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/network/node_test.go b/network/node_test.go index d4849a1c..99975415 100644 --- a/network/node_test.go +++ b/network/node_test.go @@ -21,7 +21,7 @@ func testSetup() *node { address: testNodeAddress, peers: make(map[string]*node), network: newNetwork(Name(testNodeNetName)), - err: new(nodeError), + status: newStatus(), } // add some peers to the node @@ -31,7 +31,7 @@ func testSetup() *node { address: testNode.address + "-" + id, peers: make(map[string]*node), network: testNode.network, - err: new(nodeError), + status: newStatus(), } } @@ -43,7 +43,7 @@ func testSetup() *node { address: testNode.address + "-" + id, peers: make(map[string]*node), network: testNode.network, - err: new(nodeError), + status: newStatus(), } } @@ -272,7 +272,9 @@ func TestUnpackPeerTopology(t *testing.T) { Node: &pb.Node{ Id: "newPeer", Address: "newPeerAddress", - Err: &pb.NodeError{}, + Status: &pb.Status{ + Error: &pb.Error{}, + }, }, Peers: make([]*pb.Peer, 0), } @@ -288,14 +290,18 @@ func TestUnpackPeerTopology(t *testing.T) { pbPeer1Node := &pb.Node{ Id: peer1.id, Address: peer1.address, - Err: &pb.NodeError{}, + Status: &pb.Status{ + Error: &pb.Error{}, + }, } pbPeer111 := &pb.Peer{ Node: &pb.Node{ Id: "peer111", Address: "peer111Address", - Err: &pb.NodeError{}, + Status: &pb.Status{ + Error: &pb.Error{}, + }, }, Peers: make([]*pb.Peer, 0), } @@ -304,7 +310,9 @@ func TestUnpackPeerTopology(t *testing.T) { Node: &pb.Node{ Id: "peer121", Address: "peer121Address", - Err: &pb.NodeError{}, + Status: &pb.Status{ + Error: &pb.Error{}, + }, }, Peers: make([]*pb.Peer, 0), } @@ -331,7 +339,7 @@ func TestPeersToProto(t *testing.T) { address: testNodeAddress, peers: make(map[string]*node), network: newNetwork(Name(testNodeNetName)), - err: &nodeError{}, + status: newStatus(), } topCount := 0 From dcd925f1e53a9512d9b7b995df9aaf90261bb51c Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Tue, 14 Jan 2020 18:48:42 +0000 Subject: [PATCH 14/15] Code cleanup; Indentation. --- network/default.go | 95 +++++++++++++---------------- network/network.go | 2 +- network/node.go | 9 +++ network/service/proto/network.proto | 16 ++--- 4 files changed, 62 insertions(+), 60 deletions(-) diff --git a/network/default.go b/network/default.go index 39e29b4b..06137041 100644 --- a/network/default.go +++ b/network/default.go @@ -876,16 +876,11 @@ func (n *network) processNetChan(listener tunnel.Listener) { log.Debugf("Network received peer message from: %s %s", pbNetPeer.Node.Id, pbNetPeer.Node.Address) peer := &node{ - id: pbNetPeer.Node.Id, - address: pbNetPeer.Node.Address, - link: m.msg.Header["Micro-Link"], - peers: make(map[string]*node), - status: &status{ - err: &nerr{ - count: int(pbNetPeer.Node.Status.Error.Count), - msg: errors.New(pbNetPeer.Node.Status.Error.Msg), - }, - }, + id: pbNetPeer.Node.Id, + address: pbNetPeer.Node.Address, + link: m.msg.Header["Micro-Link"], + peers: make(map[string]*node), + status: newPeerStatus(pbNetPeer), lastSeen: now, } @@ -981,16 +976,11 @@ func (n *network) processNetChan(listener tunnel.Listener) { log.Debugf("Network received sync message from: %s", pbNetSync.Peer.Node.Id) peer := &node{ - id: pbNetSync.Peer.Node.Id, - address: pbNetSync.Peer.Node.Address, - link: m.msg.Header["Micro-Link"], - peers: make(map[string]*node), - status: &status{ - err: &nerr{ - count: int(pbNetSync.Peer.Node.Status.Error.Count), - msg: errors.New(pbNetSync.Peer.Node.Status.Error.Msg), - }, - }, + id: pbNetSync.Peer.Node.Id, + address: pbNetSync.Peer.Node.Address, + link: m.msg.Header["Micro-Link"], + peers: make(map[string]*node), + status: newPeerStatus(pbNetSync.Peer), lastSeen: now, } @@ -1279,38 +1269,41 @@ func (n *network) manage() { // get a list of node peers peers := n.Peers() // pick a random peer from the list of peers and request full sync - if peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id()); peer != nil { - go func() { - // get node peer graph to send back to the connecting node - node := PeersToProto(n.node, MaxDepth) - - msg := &pbNet.Sync{ - Peer: node, - } - - // get a list of all of our routes - routes, err := n.options.Router.Table().List() - switch err { - case nil: - // encode the routes to protobuf - pbRoutes := make([]*pbRtr.Route, 0, len(routes)) - for _, route := range routes { - pbRoute := pbUtil.RouteToProto(route) - pbRoutes = append(pbRoutes, pbRoute) - } - // pack the routes into the sync message - msg.Routes = pbRoutes - default: - // we can't list the routes - log.Debugf("Network node %s failed listing routes: %v", n.id, err) - } - - // send sync message to the newly connected peer - if err := n.sendTo("sync", NetworkChannel, peer, msg); err != nil { - log.Debugf("Network failed to send sync message: %v", err) - } - }() + peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id()) + if peer != nil { + continue } + + go func() { + // get node peer graph to send back to the connecting node + node := PeersToProto(n.node, MaxDepth) + + msg := &pbNet.Sync{ + Peer: node, + } + + // get a list of all of our routes + routes, err := n.options.Router.Table().List() + switch err { + case nil: + // encode the routes to protobuf + pbRoutes := make([]*pbRtr.Route, 0, len(routes)) + for _, route := range routes { + pbRoute := pbUtil.RouteToProto(route) + pbRoutes = append(pbRoutes, pbRoute) + } + // pack the routes into the sync message + msg.Routes = pbRoutes + default: + // we can't list the routes + log.Debugf("Network node %s failed listing routes: %v", n.id, err) + } + + // send sync message to the newly connected peer + if err := n.sendTo("sync", NetworkChannel, peer, msg); err != nil { + log.Debugf("Network failed to send sync message: %v", err) + } + }() case <-resolve.C: n.initNodes(false) } diff --git a/network/network.go b/network/network.go index e06cd2c6..535870f0 100644 --- a/network/network.go +++ b/network/network.go @@ -20,7 +20,7 @@ var ( // KeepAliveTime is the time in which we want to have sent a message to a peer KeepAliveTime = 30 * time.Second // SyncTime is the time a network node requests full sync from the network - SyncTime = 5 * time.Minute + SyncTime = 1 * time.Minute // PruneTime defines time interval to periodically check nodes that need to be pruned // due to their not announcing their presence within this time interval PruneTime = 90 * time.Second diff --git a/network/node.go b/network/node.go index a7f3c51a..d632e299 100644 --- a/network/node.go +++ b/network/node.go @@ -69,6 +69,15 @@ func newStatus() *status { } } +func newPeerStatus(peer *pb.Peer) *status { + return &status{ + err: &nerr{ + count: int(peer.Node.Status.Error.Count), + msg: errors.New(peer.Node.Status.Error.Msg), + }, + } +} + func (s *status) Error() Error { s.RLock() defer s.RUnlock() diff --git a/network/service/proto/network.proto b/network/service/proto/network.proto index da08293a..f2bbdb9e 100644 --- a/network/service/proto/network.proto +++ b/network/service/proto/network.proto @@ -6,16 +6,16 @@ import "github.com/micro/go-micro/router/service/proto/router.proto"; // Network service is usesd to gain visibility into networks service Network { - // Connect to the network - rpc Connect(ConnectRequest) returns (ConnectResponse) {}; - // Returns the entire network graph - rpc Graph(GraphRequest) returns (GraphResponse) {}; - // Returns a list of known nodes in the network + // Connect to the network + rpc Connect(ConnectRequest) returns (ConnectResponse) {}; + // Returns the entire network graph + rpc Graph(GraphRequest) returns (GraphResponse) {}; + // Returns a list of known nodes in the network rpc Nodes(NodesRequest) returns (NodesResponse) {}; - // Returns a list of known routes in the network + // Returns a list of known routes in the network rpc Routes(RoutesRequest) returns (RoutesResponse) {}; - // Returns a list of known services based on routes - rpc Services(ServicesRequest) returns (ServicesResponse) {}; + // Returns a list of known services based on routes + rpc Services(ServicesRequest) returns (ServicesResponse) {}; // Status returns network status rpc Status(StatusRequest) returns (StatusResponse) {}; } From c67ef7e01705335658ef86a20d3261ff530de372 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Tue, 14 Jan 2020 19:37:50 +0000 Subject: [PATCH 15/15] Bug fix: skip sending sync message if the peer is not in our graph --- network/default.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/network/default.go b/network/default.go index 06137041..2b49331f 100644 --- a/network/default.go +++ b/network/default.go @@ -1270,7 +1270,8 @@ func (n *network) manage() { peers := n.Peers() // pick a random peer from the list of peers and request full sync peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id()) - if peer != nil { + // skip if we can't find randmly selected peer + if peer == nil { continue }