1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-23 17:53:05 +02:00

Monitor outbound links periodically and reconnect the failed links.

This commit is contained in:
Milos Gajdos 2019-08-15 18:18:58 +01:00
parent 740cfab8d0
commit f120452d28
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F

View File

@ -14,8 +14,10 @@ import (
) )
var ( var (
// DefaultKeepAlive defines interval we check if the links are alive // KeepAliveTime defines time interval we send keepalive messages to outbound links
DefaultKeepAlive = 30 * time.Second KeepAliveTime = 30 * time.Second
// ReconnectTime defines time interval we periodically attempt to reconnect dead links
ReconnectTime = 5 * time.Second
) )
// tun represents a network tunnel // tun represents a network tunnel
@ -126,9 +128,30 @@ func (t *tun) newSession() string {
return uuid.New().String() return uuid.New().String()
} }
// monitor monitors all links and attempts to dial the failed ones // monitor monitors outbound links and attempts to reconnect to the failed ones
func (t *tun) monitor() { func (t *tun) monitor() {
// TODO: implement this reconnect := time.NewTicker(ReconnectTime)
defer reconnect.Stop()
for {
select {
case <-t.closed:
return
case <-reconnect.C:
for _, node := range t.options.Nodes {
t.Lock()
if _, ok := t.links[node]; !ok {
link, err := t.nodeLink(node)
if err != nil {
log.Debugf("Tunnel failed to establish node link to %s: %v", node, err)
continue
}
t.links[node] = link
}
t.Unlock()
}
}
}
} }
// process outgoing messages sent by all local sockets // process outgoing messages sent by all local sockets
@ -159,17 +182,17 @@ func (t *tun) process() {
// send the message via the interface // send the message via the interface
t.Lock() t.Lock()
if len(t.links) == 0 { if len(t.links) == 0 {
log.Debugf("Zero links to send to") log.Debugf("No links to send to")
} }
for _, link := range t.links { for node, link := range t.links {
if link.loopback && msg.outbound { if link.loopback && msg.outbound {
continue continue
} }
log.Debugf("Sending %+v to %s", newMsg, link.Remote()) log.Debugf("Sending %+v to %s", newMsg, node)
if err := link.Send(newMsg); err != nil { if err := link.Send(newMsg); err != nil {
log.Debugf("Error sending %+v to %s: %v", newMsg, link.Remote(), err) log.Debugf("Error sending %+v to %s: %v", newMsg, node, err)
if err == io.EOF { if err == io.EOF {
delete(t.links, link.id) delete(t.links, node)
continue continue
} }
} }
@ -189,6 +212,11 @@ func (t *tun) listen(link *link) {
err := link.Recv(msg) err := link.Recv(msg)
if err != nil { if err != nil {
log.Debugf("Tunnel link %s receive error: %v", link.Remote(), err) log.Debugf("Tunnel link %s receive error: %v", link.Remote(), err)
if err == io.EOF {
t.Lock()
delete(t.links, link.Remote())
t.Unlock()
}
return return
} }
@ -211,7 +239,7 @@ func (t *tun) listen(link *link) {
// TODO: handle the close message // TODO: handle the close message
// maybe report io.EOF or kill the link // maybe report io.EOF or kill the link
continue continue
case "ping": case "keepalive":
log.Debugf("Tunnel link %s received keepalive", link.Remote()) log.Debugf("Tunnel link %s received keepalive", link.Remote())
link.lastKeepAlive = time.Now() link.lastKeepAlive = time.Now()
continue continue
@ -306,27 +334,28 @@ func (t *tun) listen(link *link) {
} }
} }
// keepalive periodically sends ping messages to link // keepalive periodically sends keepalive messages to link
func (t *tun) keepalive(link *link) { func (t *tun) keepalive(link *link) {
ping := time.NewTicker(DefaultKeepAlive) keepalive := time.NewTicker(KeepAliveTime)
defer ping.Stop() defer keepalive.Stop()
for { for {
select { select {
case <-t.closed: case <-t.closed:
return return
case <-ping.C: case <-keepalive.C:
// send ping message // send keepalive message
log.Debugf("Tunnel sending ping to link %v: %v", link.Remote(), link.id) log.Debugf("Tunnel sending keepalive to link: %v", link.Remote())
if err := link.Send(&transport.Message{ if err := link.Send(&transport.Message{
Header: map[string]string{ Header: map[string]string{
"Micro-Tunnel": "ping", "Micro-Tunnel": "keepalive",
"Micro-Tunnel-Token": t.token, "Micro-Tunnel-Token": t.token,
}, },
}); err != nil { }); err != nil {
log.Debugf("Error sending keepalive to link %v: %v", link.Remote(), err)
if err == io.EOF { if err == io.EOF {
t.Lock() t.Lock()
delete(t.links, link.id) delete(t.links, link.Remote())
t.Unlock() t.Unlock()
return return
} }
@ -336,6 +365,43 @@ func (t *tun) keepalive(link *link) {
} }
} }
// nodeLink attempts to connect to node and returns link if successful
// it returns error if the link failed to be established
func (t *tun) nodeLink(node string) (*link, error) {
log.Debugf("Tunnel dialing %s", node)
c, err := t.options.Transport.Dial(node)
if err != nil {
log.Debugf("Tunnel failed to connect to %s: %v", node, err)
return nil, err
}
log.Debugf("Tunnel connected to %s", node)
if err := c.Send(&transport.Message{
Header: map[string]string{
"Micro-Tunnel": "connect",
"Micro-Tunnel-Token": t.token,
},
}); err != nil {
return nil, err
}
// save the link
id := uuid.New().String()
link := &link{
Socket: c,
id: id,
}
t.links[node] = link
// process incoming messages
go t.listen(link)
// start keepalive monitor
go t.keepalive(link)
return link, nil
}
// connect the tunnel to all the nodes and listen for incoming tunnel connections // connect the tunnel to all the nodes and listen for incoming tunnel connections
func (t *tun) connect() error { func (t *tun) connect() error {
l, err := t.options.Transport.Listen(t.options.Address) l, err := t.options.Transport.Listen(t.options.Address)
@ -357,14 +423,14 @@ func (t *tun) connect() error {
Socket: sock, Socket: sock,
id: id, id: id,
} }
t.links[id] = link t.links[sock.Remote()] = link
t.Unlock() t.Unlock()
// delete the link // delete the link
defer func() { defer func() {
log.Debugf("Deleting connection from %s", sock.Remote()) log.Debugf("Deleting connection from %s", sock.Remote())
t.Lock() t.Lock()
delete(t.links, id) delete(t.links, sock.Remote())
t.Unlock() t.Unlock()
}() }()
@ -387,43 +453,23 @@ func (t *tun) connect() error {
continue continue
} }
log.Debugf("Tunnel dialing %s", node) // connect to node and return link
// TODO: reconnection logic is required to keep the tunnel established link, err := t.nodeLink(node)
c, err := t.options.Transport.Dial(node)
if err != nil { if err != nil {
log.Debugf("Tunnel failed to connect to %s: %v", node, err) log.Debugf("Tunnel failed to establish node link to %s: %v", node, err)
continue continue
} }
log.Debugf("Tunnel connected to %s", node)
if err := c.Send(&transport.Message{
Header: map[string]string{
"Micro-Tunnel": "connect",
"Micro-Tunnel-Token": t.token,
},
}); err != nil {
continue
}
// save the link // save the link
id := uuid.New().String() t.links[node] = link
link := &link{
Socket: c,
id: id,
}
t.links[id] = link
// process incoming messages
go t.listen(link)
// start keepalive monitor
go t.keepalive(link)
} }
// process outbound messages to be sent // process outbound messages to be sent
// process sends to all links // process sends to all links
go t.process() go t.process()
// monitor links
go t.monitor()
return nil return nil
} }
@ -452,7 +498,7 @@ func (t *tun) Connect() error {
func (t *tun) close() error { func (t *tun) close() error {
// close all the links // close all the links
for id, link := range t.links { for node, link := range t.links {
link.Send(&transport.Message{ link.Send(&transport.Message{
Header: map[string]string{ Header: map[string]string{
"Micro-Tunnel": "close", "Micro-Tunnel": "close",
@ -460,7 +506,7 @@ func (t *tun) close() error {
}, },
}) })
link.Close() link.Close()
delete(t.links, id) delete(t.links, node)
} }
// close the listener // close the listener