From d28a868e46beafd5410fea16a061edbf05c6d444 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sun, 8 Dec 2019 14:37:17 +0000 Subject: [PATCH] Fix network startup connect --- network/default.go | 67 +++++++++++++++++++++++----------------- tunnel/reconnect_test.go | 55 --------------------------------- 2 files changed, 38 insertions(+), 84 deletions(-) delete mode 100644 tunnel/reconnect_test.go diff --git a/network/default.go b/network/default.go index a5331b45..3fde3fdb 100644 --- a/network/default.go +++ b/network/default.go @@ -202,13 +202,16 @@ func (n *network) Name() string { return n.options.Name } -func (n *network) initNodes() { +func (n *network) initNodes(startup bool) { nodes, err := n.resolveNodes() - if err != nil { + if err != nil && !startup { log.Debugf("Network failed to resolve nodes: %v", err) return } + // initialize the tunnel + log.Tracef("Network initialising nodes %+v\n", nodes) + n.tunnel.Init( tunnel.Nodes(nodes...), ) @@ -218,6 +221,11 @@ func (n *network) initNodes() { func (n *network) resolveNodes() ([]string, error) { // resolve the network address to network nodes records, err := n.options.Resolver.Resolve(n.options.Name) + if err != nil { + log.Debugf("Network failed to resolve nodes: %v", err) + } + + // keep processing nodeMap := make(map[string]bool) @@ -262,7 +270,7 @@ func (n *network) resolveNodes() ([]string, error) { } } - return nodes, err + return nodes, nil } // handleNetConn handles network announcement messages @@ -617,7 +625,7 @@ func (n *network) manage() { } } case <-resolve.C: - n.initNodes() + n.initNodes(false) } } } @@ -1137,6 +1145,8 @@ func (n *network) connect() { n.sendConnect() } + log.Tracef("connected %v discovered %v backoff %v\n", connected, discovered, backoff.Do(attempts)) + // check if we've been discovered select { case <-n.discovered: @@ -1166,33 +1176,34 @@ func (n *network) Connect() error { return err } - // initialise the nodes - n.initNodes() - // return if already connected if n.connected { - // immediately resolve - n.initNodes() - + // initialise the nodes + n.initNodes(false) // send the connect message - n.sendConnect() + go n.sendConnect() return nil } + // initialise the nodes + n.initNodes(true) + // set our internal node address // if advertise address is not set if len(n.options.Advertise) == 0 { n.server.Init(server.Advertise(n.tunnel.Address())) } - // dial into ControlChannel to send route adverts - ctrlClient, err := n.tunnel.Dial(ControlChannel, tunnel.DialMode(tunnel.Multicast)) + // listen on NetworkChannel + netListener, err := n.tunnel.Listen( + NetworkChannel, + tunnel.ListenMode(tunnel.Multicast), + tunnel.ListenTimeout(AnnounceTime*2), + ) if err != nil { return err } - n.tunClient[ControlChannel] = ctrlClient - // listen on ControlChannel ctrlListener, err := n.tunnel.Listen( ControlChannel, @@ -1203,6 +1214,14 @@ func (n *network) Connect() error { return err } + // dial into ControlChannel to send route adverts + ctrlClient, err := n.tunnel.Dial(ControlChannel, tunnel.DialMode(tunnel.Multicast)) + if err != nil { + return err + } + + n.tunClient[ControlChannel] = ctrlClient + // dial into NetworkChannel to send network messages netClient, err := n.tunnel.Dial(NetworkChannel, tunnel.DialMode(tunnel.Multicast)) if err != nil { @@ -1211,16 +1230,6 @@ func (n *network) Connect() error { n.tunClient[NetworkChannel] = netClient - // listen on NetworkChannel - netListener, err := n.tunnel.Listen( - NetworkChannel, - tunnel.ListenMode(tunnel.Multicast), - tunnel.ListenTimeout(AnnounceTime*2), - ) - if err != nil { - return err - } - // create closed channel n.closed = make(chan bool) @@ -1240,16 +1249,16 @@ func (n *network) Connect() error { return err } - // manage connection once links are established - go n.connect() - // resolve nodes, broadcast announcements and prune stale nodes - go n.manage() // advertise service routes go n.advertise(advertChan) // listen to network messages go n.processNetChan(netListener) // accept and process routes go n.processCtrlChan(ctrlListener) + // manage connection once links are established + go n.connect() + // resolve nodes, broadcast announcements and prune stale nodes + go n.manage() // we're now connected n.connected = true diff --git a/tunnel/reconnect_test.go b/tunnel/reconnect_test.go deleted file mode 100644 index 3e9331c3..00000000 --- a/tunnel/reconnect_test.go +++ /dev/null @@ -1,55 +0,0 @@ -// +build !race - -package tunnel - -import ( - "sync" - "testing" - "time" -) - -func TestReconnectTunnel(t *testing.T) { - // we manually override the tunnel.ReconnectTime value here - // this is so that we make the reconnects faster than the default 5s - ReconnectTime = 100 * time.Millisecond - - // create a new tunnel client - tunA := NewTunnel( - Address("127.0.0.1:9098"), - Nodes("127.0.0.1:9099"), - ) - - // create a new tunnel server - tunB := NewTunnel( - Address("127.0.0.1:9099"), - ) - - // start tunnel - err := tunB.Connect() - if err != nil { - t.Fatal(err) - } - defer tunB.Close() - - // start tunnel - err = tunA.Connect() - if err != nil { - t.Fatal(err) - } - defer tunA.Close() - - wait := make(chan bool) - - var wg sync.WaitGroup - - wg.Add(1) - // start tunnel listener - go testBrokenTunAccept(t, tunB, wait, &wg) - - wg.Add(1) - // start tunnel sender - go testBrokenTunSend(t, tunA, wait, &wg, ReconnectTime) - - // wait until done - wg.Wait() -}