1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-11-24 08:02:32 +02:00

Fix network startup connect

This commit is contained in:
Asim Aslam 2019-12-08 14:37:17 +00:00
parent 398acc67ca
commit d28a868e46
2 changed files with 38 additions and 84 deletions

View File

@ -202,13 +202,16 @@ func (n *network) Name() string {
return n.options.Name
}
func (n *network) initNodes() {
func (n *network) initNodes(startup bool) {
nodes, err := n.resolveNodes()
if err != nil {
if err != nil && !startup {
log.Debugf("Network failed to resolve nodes: %v", err)
return
}
// initialize the tunnel
log.Tracef("Network initialising nodes %+v\n", nodes)
n.tunnel.Init(
tunnel.Nodes(nodes...),
)
@ -218,6 +221,11 @@ func (n *network) initNodes() {
func (n *network) resolveNodes() ([]string, error) {
// resolve the network address to network nodes
records, err := n.options.Resolver.Resolve(n.options.Name)
if err != nil {
log.Debugf("Network failed to resolve nodes: %v", err)
}
// keep processing
nodeMap := make(map[string]bool)
@ -262,7 +270,7 @@ func (n *network) resolveNodes() ([]string, error) {
}
}
return nodes, err
return nodes, nil
}
// handleNetConn handles network announcement messages
@ -617,7 +625,7 @@ func (n *network) manage() {
}
}
case <-resolve.C:
n.initNodes()
n.initNodes(false)
}
}
}
@ -1137,6 +1145,8 @@ func (n *network) connect() {
n.sendConnect()
}
log.Tracef("connected %v discovered %v backoff %v\n", connected, discovered, backoff.Do(attempts))
// check if we've been discovered
select {
case <-n.discovered:
@ -1166,33 +1176,34 @@ func (n *network) Connect() error {
return err
}
// initialise the nodes
n.initNodes()
// return if already connected
if n.connected {
// immediately resolve
n.initNodes()
// initialise the nodes
n.initNodes(false)
// send the connect message
n.sendConnect()
go n.sendConnect()
return nil
}
// initialise the nodes
n.initNodes(true)
// set our internal node address
// if advertise address is not set
if len(n.options.Advertise) == 0 {
n.server.Init(server.Advertise(n.tunnel.Address()))
}
// dial into ControlChannel to send route adverts
ctrlClient, err := n.tunnel.Dial(ControlChannel, tunnel.DialMode(tunnel.Multicast))
// listen on NetworkChannel
netListener, err := n.tunnel.Listen(
NetworkChannel,
tunnel.ListenMode(tunnel.Multicast),
tunnel.ListenTimeout(AnnounceTime*2),
)
if err != nil {
return err
}
n.tunClient[ControlChannel] = ctrlClient
// listen on ControlChannel
ctrlListener, err := n.tunnel.Listen(
ControlChannel,
@ -1203,6 +1214,14 @@ func (n *network) Connect() error {
return err
}
// dial into ControlChannel to send route adverts
ctrlClient, err := n.tunnel.Dial(ControlChannel, tunnel.DialMode(tunnel.Multicast))
if err != nil {
return err
}
n.tunClient[ControlChannel] = ctrlClient
// dial into NetworkChannel to send network messages
netClient, err := n.tunnel.Dial(NetworkChannel, tunnel.DialMode(tunnel.Multicast))
if err != nil {
@ -1211,16 +1230,6 @@ func (n *network) Connect() error {
n.tunClient[NetworkChannel] = netClient
// listen on NetworkChannel
netListener, err := n.tunnel.Listen(
NetworkChannel,
tunnel.ListenMode(tunnel.Multicast),
tunnel.ListenTimeout(AnnounceTime*2),
)
if err != nil {
return err
}
// create closed channel
n.closed = make(chan bool)
@ -1240,16 +1249,16 @@ func (n *network) Connect() error {
return err
}
// manage connection once links are established
go n.connect()
// resolve nodes, broadcast announcements and prune stale nodes
go n.manage()
// advertise service routes
go n.advertise(advertChan)
// listen to network messages
go n.processNetChan(netListener)
// accept and process routes
go n.processCtrlChan(ctrlListener)
// manage connection once links are established
go n.connect()
// resolve nodes, broadcast announcements and prune stale nodes
go n.manage()
// we're now connected
n.connected = true

View File

@ -1,55 +0,0 @@
// +build !race
package tunnel
import (
"sync"
"testing"
"time"
)
func TestReconnectTunnel(t *testing.T) {
// we manually override the tunnel.ReconnectTime value here
// this is so that we make the reconnects faster than the default 5s
ReconnectTime = 100 * time.Millisecond
// create a new tunnel client
tunA := NewTunnel(
Address("127.0.0.1:9098"),
Nodes("127.0.0.1:9099"),
)
// create a new tunnel server
tunB := NewTunnel(
Address("127.0.0.1:9099"),
)
// start tunnel
err := tunB.Connect()
if err != nil {
t.Fatal(err)
}
defer tunB.Close()
// start tunnel
err = tunA.Connect()
if err != nil {
t.Fatal(err)
}
defer tunA.Close()
wait := make(chan bool)
var wg sync.WaitGroup
wg.Add(1)
// start tunnel listener
go testBrokenTunAccept(t, tunB, wait, &wg)
wg.Add(1)
// start tunnel sender
go testBrokenTunSend(t, tunA, wait, &wg, ReconnectTime)
// wait until done
wg.Wait()
}