1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-11 17:18:28 +02:00

Replace send message code by one network method

This commit is contained in:
Milos Gajdos 2019-09-05 17:18:16 +01:00
parent 2522d8cb96
commit 5ddfd911ba
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F

View File

@ -2,6 +2,7 @@ package network
import (
"container/list"
"errors"
"sync"
"time"
@ -26,6 +27,10 @@ var (
ControlChannel = "control"
// DefaultLink is default network link
DefaultLink = "network"
// ErrMsgUnknown is returned when unknown message is attempted to send or receive
ErrMsgUnknown = errors.New("unknown message")
// ErrChannelUnknown is returned when attempting to send or received on unknown channel
ErrChannelUnknown = errors.New("unknown channel")
)
// node is network node
@ -319,8 +324,8 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
lastSeen: now,
}
n.Unlock()
// advertise the new neighbour to the network
if err := n.advertiseNeighbours(client); err != nil {
// advertise yourself to the network
if err := n.sendMsg("neighbour", NetworkChannel); err != nil {
log.Debugf("Network failed to advertise neighbours: %v", err)
}
// advertise all the routes when a new node has connected
@ -342,43 +347,21 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
n.Lock()
log.Debugf("Network received neighbour message from: %s", pbNetNeighbour.Node.Id)
// only add the neighbour if it is NOT already in node's list of neighbours
if _, ok := n.neighbours[pbNetNeighbour.Node.Id]; !ok {
_, exists := n.neighbours[pbNetNeighbour.Node.Id]
if !exists {
n.neighbours[pbNetNeighbour.Node.Id] = &node{
id: pbNetNeighbour.Node.Id,
address: pbNetNeighbour.Node.Address,
neighbours: make(map[string]*node),
lastSeen: now,
}
// send a solicit message when discovering a new node
node := &pbNet.Node{
Id: n.options.Id,
Address: n.options.Address,
}
pbNetSolicit := &pbNet.Solicit{
Node: node,
}
if body, err := proto.Marshal(pbNetSolicit); err == nil {
// create transport message and chuck it down the pipe
m := transport.Message{
Header: map[string]string{
"Micro-Method": "solicit",
},
Body: body,
}
log.Debugf("Network sending solicit message from: %s", node.Id)
if err := client.Send(&m); err != nil {
log.Debugf("Network failed to send solicit messsage: %v", err)
}
}
}
// update lastSeen timestamp
if n.neighbours[pbNetNeighbour.Node.Id].lastSeen.Before(now) {
n.neighbours[pbNetNeighbour.Node.Id].lastSeen = now
}
// update/store the neighbour node neighbours
// NOTE: * we dont update lastSeen time for the neighbours of the neighbour
// NOTE: * we do NOT update lastSeen time for the neighbours of the neighbour
// * even though we are NOT interested in neighbours of neighbours here
// we still allocate the map of neighbours for each of them
for _, pbNeighbour := range pbNetNeighbour.Neighbours {
@ -390,6 +373,13 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
n.neighbours[pbNetNeighbour.Node.Id].neighbours[neighbourNode.id] = neighbourNode
}
n.Unlock()
// send a solicit message when discovering a new node
// NOTE: we need to send the solicit message here after the Lock is released as sendMsg locs
if !exists {
if err := n.sendMsg("solicit", NetworkChannel); err != nil {
log.Debugf("Network failed to send solicit message: %s", err)
}
}
case "close":
pbNetClose := &pbNet.Close{}
if err := proto.Unmarshal(m.Body, pbNetClose); err != nil {
@ -414,8 +404,29 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
}
}
// advertiseNeighbours sends a neighbour message to the network
func (n *network) advertiseNeighbours(client transport.Client) error {
// sendMsg sends a message to the tunnel channel
func (n *network) sendMsg(msgType string, channel string) error {
node := &pbNet.Node{
Id: n.options.Id,
Address: n.options.Address,
}
var protoMsg proto.Message
switch msgType {
case "connect":
protoMsg = &pbNet.Connect{
Node: node,
}
case "close":
protoMsg = &pbNet.Close{
Node: node,
}
case "solicit":
protoMsg = &pbNet.Solicit{
Node: node,
}
case "neighbour":
n.RLock()
nodes := make([]*pbNet.Node, len(n.neighbours))
i := 0
@ -427,33 +438,36 @@ func (n *network) advertiseNeighbours(client transport.Client) error {
i++
}
n.RUnlock()
node := &pbNet.Node{
Id: n.options.Id,
Address: n.options.Address,
}
pbNetNeighbour := &pbNet.Neighbour{
protoMsg = &pbNet.Neighbour{
Node: node,
Neighbours: nodes,
}
default:
return ErrMsgUnknown
}
body, err := proto.Marshal(pbNetNeighbour)
body, err := proto.Marshal(protoMsg)
if err != nil {
// TODO: should we bail here?
log.Debugf("Network failed to marshal neighbour message: %v", err)
return err
}
// create transport message and chuck it down the pipe
m := transport.Message{
Header: map[string]string{
"Micro-Method": "neighbour",
"Micro-Method": msgType,
},
Body: body,
}
log.Debugf("Network sending neighbour message from: %s", node.Id)
n.RLock()
client, ok := n.tunClient[channel]
if !ok {
n.RUnlock()
return ErrChannelUnknown
}
n.RUnlock()
log.Debugf("Network sending %s message from: %s", msgType, node.Id)
if err := client.Send(&m); err != nil {
log.Debugf("Network failed to send neighbour messsage: %v", err)
return err
}
@ -471,7 +485,7 @@ func (n *network) announce(client transport.Client) {
return
case <-announce.C:
// advertise yourself to the network
if err := n.advertiseNeighbours(client); err != nil {
if err := n.sendMsg("neighbour", NetworkChannel); err != nil {
log.Debugf("Network failed to advertise neighbours: %v", err)
continue
}
@ -644,27 +658,8 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste
}
n.neighbours[pbRtrAdvert.Id] = advertNode
// send a solicit message when discovering a new node
node := &pbNet.Node{
Id: n.options.Id,
Address: n.options.Address,
}
pbNetSolicit := &pbNet.Solicit{
Node: node,
}
if body, err := proto.Marshal(pbNetSolicit); err == nil {
// create transport message and chuck it down the pipe
m := transport.Message{
Header: map[string]string{
"Micro-Method": "solicit",
},
Body: body,
}
log.Debugf("Network sending solicit message from: %s", node.Id)
if err := client.Send(&m); err != nil {
log.Debugf("Network failed to send solicit messsage: %v", err)
}
if err := n.sendMsg("solicit", NetworkChannel); err != nil {
log.Debugf("Network failed to send solicit message: %s", err)
}
}
n.RUnlock()
@ -794,8 +789,6 @@ func (n *network) advertise(client transport.Client, advertChan <-chan *router.A
// Connect connects the network
func (n *network) Connect() error {
n.Lock()
defer n.Unlock()
// return if already connected
if n.connected {
return nil
@ -863,32 +856,14 @@ func (n *network) Connect() error {
if err := n.server.Start(); err != nil {
return err
}
n.Unlock()
// send connect message to NetworkChannel
// NOTE: in theory we could do this as soon as
// Dial to NetworkChannel succeeds, but instead
// we initialize all other node resources first
node := &pbNet.Node{
Id: n.options.Id,
Address: n.options.Address,
}
pbNetConnect := &pbNet.Connect{
Node: node,
}
// only proceed with sending to NetworkChannel if marshal succeeds
if body, err := proto.Marshal(pbNetConnect); err == nil {
m := transport.Message{
Header: map[string]string{
"Micro-Method": "connect",
},
Body: body,
}
log.Debugf("Network sending connect message: %s", node.Id)
if err := netClient.Send(&m); err != nil {
log.Debugf("Network failed to send connect messsage: %v", err)
}
if err := n.sendMsg("connect", NetworkChannel); err != nil {
log.Debugf("Network failed to send connect message: %s", err)
}
// go resolving network nodes
@ -904,8 +879,9 @@ func (n *network) Connect() error {
// accept and process routes
go n.processCtrlChan(ctrlClient, ctrlListener)
// set connected to true
n.Lock()
n.connected = true
n.Unlock()
return nil
}
@ -982,31 +958,9 @@ func (n *network) Close() error {
return nil
default:
// send close message only if we managed to connect to NetworkChannel
if netClient, ok := n.tunClient[NetworkChannel]; ok {
// send connect message to NetworkChannel
node := &pbNet.Node{
Id: n.options.Id,
Address: n.options.Address,
}
pbNetClose := &pbNet.Close{
Node: node,
}
// only proceed with sending to NetworkChannel if marshal succeeds
if body, err := proto.Marshal(pbNetClose); err == nil {
// create transport message and chuck it down the pipe
m := transport.Message{
Header: map[string]string{
"Micro-Method": "close",
},
Body: body,
}
log.Debugf("Network sending close message from: %s", node.Id)
if err := netClient.Send(&m); err != nil {
log.Debugf("Network failed to send close messsage: %v", err)
}
}
log.Debugf("Sending close message from: %s", n.options.Id)
if err := n.sendMsg("close", NetworkChannel); err != nil {
log.Debugf("Network failed to send close message: %s", err)
}
// TODO: send close message to the network channel
close(n.closed)