1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-11-24 08:02:32 +02:00

further cleanup of tunnel/network

This commit is contained in:
Asim Aslam 2019-12-08 12:12:20 +00:00
parent 283c85d256
commit 6307d6ba51
5 changed files with 98 additions and 119 deletions

View File

@ -202,6 +202,18 @@ func (n *network) Name() string {
return n.options.Name
}
func (n *network) initNodes() {
nodes, err := n.resolveNodes()
if err != nil {
log.Debugf("Network failed to resolve nodes: %v", err)
return
}
// initialize the tunnel
n.tunnel.Init(
tunnel.Nodes(nodes...),
)
}
// resolveNodes resolves network nodes to addresses
func (n *network) resolveNodes() ([]string, error) {
// resolve the network address to network nodes
@ -253,29 +265,6 @@ func (n *network) resolveNodes() ([]string, error) {
return nodes, err
}
// resolve continuously resolves network nodes and initializes network tunnel with resolved addresses
func (n *network) resolve() {
resolve := time.NewTicker(ResolveTime)
defer resolve.Stop()
for {
select {
case <-n.closed:
return
case <-resolve.C:
nodes, err := n.resolveNodes()
if err != nil {
log.Debugf("Network failed to resolve nodes: %v", err)
continue
}
// initialize the tunnel
n.tunnel.Init(
tunnel.Nodes(nodes...),
)
}
}
}
// handleNetConn handles network announcement messages
func (n *network) handleNetConn(s tunnel.Session, msg chan *message) {
for {
@ -565,12 +554,14 @@ func (n *network) prunePeerRoutes(peer *node) error {
// manage the process of announcing to peers and prune any peer nodes that have not been
// seen for a period of time. Also removes all the routes either originated by or routable
//by the stale nodes
//by the stale nodes. it also resolves nodes periodically and adds them to the tunnel
func (n *network) manage() {
announce := time.NewTicker(AnnounceTime)
defer announce.Stop()
prune := time.NewTicker(PruneTime)
defer prune.Stop()
resolve := time.NewTicker(ResolveTime)
defer resolve.Stop()
for {
select {
@ -584,11 +575,14 @@ func (n *network) manage() {
}
case <-prune.C:
pruned := n.PruneStalePeers(PruneTime)
for id, peer := range pruned {
log.Debugf("Network peer exceeded prune time: %s", id)
n.Lock()
delete(n.peerLinks, peer.address)
n.Unlock()
if err := n.prunePeerRoutes(peer); err != nil {
log.Debugf("Network failed pruning peer %s routes: %v", id, err)
}
@ -622,6 +616,8 @@ func (n *network) manage() {
log.Debugf("Network failed deleting routes by %s: %v", route.Router, err)
}
}
case <-resolve.C:
n.initNodes()
}
}
}
@ -1165,25 +1161,19 @@ func (n *network) Connect() error {
n.Lock()
defer n.Unlock()
// try to resolve network nodes
nodes, err := n.resolveNodes()
if err != nil {
log.Debugf("Network failed to resolve nodes: %v", err)
}
// initialize the tunnel to resolved nodes
n.tunnel.Init(
tunnel.Nodes(nodes...),
)
// connect network tunnel
if err := n.tunnel.Connect(); err != nil {
n.Unlock()
return err
}
// initialise the nodes
n.initNodes()
// return if already connected
if n.connected {
// immediately resolve
initNodes()
// send the connect message
n.sendConnect()
return nil
@ -1250,11 +1240,9 @@ func (n *network) Connect() error {
return err
}
// send connect after there's a link established
// manage connection once links are established
go n.connect()
// resolve network nodes and re-init the tunnel
go n.resolve()
// broadcast announcements and prune stale nodes
// resolve nodes, broadcast announcements and prune stale nodes
go n.manage()
// advertise service routes
go n.advertise(advertChan)

View File

@ -202,7 +202,7 @@ func (t *tun) manage() {
reconnect := time.NewTicker(ReconnectTime)
defer reconnect.Stop()
// do it immediately
// do immediately
t.manageLinks()
for {
@ -215,6 +215,47 @@ func (t *tun) manage() {
}
}
// manageLink sends channel discover requests periodically and
// keepalive messages to link
func (t *tun) manageLink(link *link) {
keepalive := time.NewTicker(KeepAliveTime)
defer keepalive.Stop()
discover := time.NewTicker(DiscoverTime)
defer discover.Stop()
for {
select {
case <-t.closed:
return
case <-link.closed:
return
case <-discover.C:
// send a discovery message to all links
if err := link.Send(&transport.Message{
Header: map[string]string{
"Micro-Tunnel": "discover",
"Micro-Tunnel-Id": t.id,
},
}); err != nil {
log.Debugf("Tunnel failed to send discover to link %s: %v", link.Remote(), err)
}
case <-keepalive.C:
// send keepalive message
log.Debugf("Tunnel sending keepalive to link: %v", link.Remote())
if err := link.Send(&transport.Message{
Header: map[string]string{
"Micro-Tunnel": "keepalive",
"Micro-Tunnel-Id": t.id,
},
}); err != nil {
log.Debugf("Tunnel error sending keepalive to link %v: %v", link.Remote(), err)
t.delLink(link.Remote())
return
}
}
}
}
// manageLinks is a function that can be called to immediately to link setup
func (t *tun) manageLinks() {
var delLinks []string
@ -278,8 +319,15 @@ func (t *tun) manageLinks() {
// save the link
t.Lock()
defer t.Unlock()
// just check nothing else was setup in the interim
if _, ok := t.links[node]; ok {
link.Close()
return
}
// save the link
t.links[node] = link
t.Unlock()
}(node)
}
@ -723,59 +771,6 @@ func (t *tun) listen(link *link) {
}
}
// discover sends channel discover requests periodically
func (t *tun) discover(link *link) {
tick := time.NewTicker(DiscoverTime)
defer tick.Stop()
for {
select {
case <-tick.C:
// send a discovery message to all links
if err := link.Send(&transport.Message{
Header: map[string]string{
"Micro-Tunnel": "discover",
"Micro-Tunnel-Id": t.id,
},
}); err != nil {
log.Debugf("Tunnel failed to send discover to link %s: %v", link.Remote(), err)
}
case <-link.closed:
return
case <-t.closed:
return
}
}
}
// keepalive periodically sends keepalive messages to link
func (t *tun) keepalive(link *link) {
keepalive := time.NewTicker(KeepAliveTime)
defer keepalive.Stop()
for {
select {
case <-t.closed:
return
case <-link.closed:
return
case <-keepalive.C:
// send keepalive message
log.Debugf("Tunnel sending keepalive to link: %v", link.Remote())
if err := link.Send(&transport.Message{
Header: map[string]string{
"Micro-Tunnel": "keepalive",
"Micro-Tunnel-Id": t.id,
},
}); err != nil {
log.Debugf("Tunnel error sending keepalive to link %v: %v", link.Remote(), err)
t.delLink(link.Remote())
return
}
}
}
}
// setupLink connects to node and returns link if successful
// It returns error if the link failed to be established
func (t *tun) setupLink(node string) (*link, error) {
@ -812,11 +807,8 @@ func (t *tun) setupLink(node string) (*link, error) {
// process incoming messages
go t.listen(link)
// start keepalive monitor
go t.keepalive(link)
// discover things on the remote side
go t.discover(link)
// manage keepalives and discovery messages
go t.manageLink(link)
return link, nil
}
@ -839,11 +831,8 @@ func (t *tun) connect() error {
// create a new link
link := newLink(sock)
// start keepalive monitor
go t.keepalive(link)
// discover things on the remote side
go t.discover(link)
// manage the link
go t.manageLink(link)
// listen for inbound messages.
// only save the link once connected.
@ -870,6 +859,8 @@ func (t *tun) Connect() error {
// already connected
if t.connected {
// do it immediately
t.manageLinks()
// setup links
return nil
}
@ -884,13 +875,13 @@ func (t *tun) Connect() error {
// create new close channel
t.closed = make(chan bool)
// manage the links
go t.manage()
// process outbound messages to be sent
// process sends to all links
go t.process()
// manage the links
go t.manage()
return nil
}

View File

@ -9,15 +9,19 @@ import (
)
func TestReconnectTunnel(t *testing.T) {
// we manually override the tunnel.ReconnectTime value here
// this is so that we make the reconnects faster than the default 5s
ReconnectTime = 100 * time.Millisecond
// create a new tunnel client
tunA := NewTunnel(
Address("127.0.0.1:9096"),
Nodes("127.0.0.1:9097"),
Address("127.0.0.1:9098"),
Nodes("127.0.0.1:9099"),
)
// create a new tunnel server
tunB := NewTunnel(
Address("127.0.0.1:9097"),
Address("127.0.0.1:9099"),
)
// start tunnel
@ -27,10 +31,6 @@ func TestReconnectTunnel(t *testing.T) {
}
defer tunB.Close()
// we manually override the tunnel.ReconnectTime value here
// this is so that we make the reconnects faster than the default 5s
ReconnectTime = 200 * time.Millisecond
// start tunnel
err = tunA.Connect()
if err != nil {
@ -48,7 +48,7 @@ func TestReconnectTunnel(t *testing.T) {
wg.Add(1)
// start tunnel sender
go testBrokenTunSend(t, tunA, wait, &wg)
go testBrokenTunSend(t, tunA, wait, &wg, ReconnectTime)
// wait until done
wg.Wait()

View File

@ -420,7 +420,7 @@ func (s *session) Close() error {
default:
close(s.closed)
// don't send close on multicast
// don't send close on multicast or broadcast
if s.mode != Unicast {
return nil
}

View File

@ -206,7 +206,7 @@ func testBrokenTunAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.Wait
wait <- true
}
func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) {
func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup, reconnect time.Duration) {
defer wg.Done()
// wait for the listener to get ready
@ -234,7 +234,7 @@ func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGr
<-wait
// give it time to reconnect
time.Sleep(5 * ReconnectTime)
time.Sleep(10 * reconnect)
// send the message
if err := c.Send(&m); err != nil {