1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-11-24 08:02:32 +02:00

fix broken test

This commit is contained in:
Asim Aslam 2019-12-08 13:45:24 +00:00
parent ce578800d0
commit 398acc67ca
2 changed files with 179 additions and 107 deletions

View File

@ -202,9 +202,6 @@ func (t *tun) manage() {
reconnect := time.NewTicker(ReconnectTime) reconnect := time.NewTicker(ReconnectTime)
defer reconnect.Stop() defer reconnect.Stop()
// do immediately
t.manageLinks()
for { for {
select { select {
case <-t.closed: case <-t.closed:
@ -231,23 +228,13 @@ func (t *tun) manageLink(link *link) {
return return
case <-discover.C: case <-discover.C:
// send a discovery message to all links // send a discovery message to all links
if err := link.Send(&transport.Message{ if err := t.sendMsg("discover", link); err != nil {
Header: map[string]string{
"Micro-Tunnel": "discover",
"Micro-Tunnel-Id": t.id,
},
}); err != nil {
log.Debugf("Tunnel failed to send discover to link %s: %v", link.Remote(), err) log.Debugf("Tunnel failed to send discover to link %s: %v", link.Remote(), err)
} }
case <-keepalive.C: case <-keepalive.C:
// send keepalive message // send keepalive message
log.Debugf("Tunnel sending keepalive to link: %v", link.Remote()) log.Debugf("Tunnel sending keepalive to link: %v", link.Remote())
if err := link.Send(&transport.Message{ if err := t.sendMsg("keepalive", link); err != nil {
Header: map[string]string{
"Micro-Tunnel": "keepalive",
"Micro-Tunnel-Id": t.id,
},
}); err != nil {
log.Debugf("Tunnel error sending keepalive to link %v: %v", link.Remote(), err) log.Debugf("Tunnel error sending keepalive to link %v: %v", link.Remote(), err)
t.delLink(link.Remote()) t.delLink(link.Remote())
return return
@ -570,8 +557,10 @@ func (t *tun) listen(link *link) {
t.links[link.Remote()] = link t.links[link.Remote()] = link
t.Unlock() t.Unlock()
// send back a discovery // send back an announcement of our channels discovery
go t.announce("", "", link) go t.announce("", "", link)
// ask for the things on the other wise
go t.sendMsg("discover", link)
// nothing more to do // nothing more to do
continue continue
case "close": case "close":
@ -771,6 +760,15 @@ func (t *tun) listen(link *link) {
} }
} }
func (t *tun) sendMsg(method string, link *link) error {
return link.Send(&transport.Message{
Header: map[string]string{
"Micro-Tunnel": method,
"Micro-Tunnel-Id": t.id,
},
})
}
// setupLink connects to node and returns link if successful // setupLink connects to node and returns link if successful
// It returns error if the link failed to be established // It returns error if the link failed to be established
func (t *tun) setupLink(node string) (*link, error) { func (t *tun) setupLink(node string) (*link, error) {
@ -790,12 +788,7 @@ func (t *tun) setupLink(node string) (*link, error) {
link.id = c.Remote() link.id = c.Remote()
// send the first connect message // send the first connect message
if err := link.Send(&transport.Message{ if err := t.sendMsg("connect", link); err != nil {
Header: map[string]string{
"Micro-Tunnel": "connect",
"Micro-Tunnel-Id": t.id,
},
}); err != nil {
link.Close() link.Close()
return nil, err return nil, err
} }
@ -813,6 +806,36 @@ func (t *tun) setupLink(node string) (*link, error) {
return link, nil return link, nil
} }
func (t *tun) setupLinks() {
var wg sync.WaitGroup
for _, node := range t.options.Nodes {
wg.Add(1)
go func(node string) {
defer wg.Done()
// we're not trying to fix existing links
if _, ok := t.links[node]; ok {
return
}
// create new link
link, err := t.setupLink(node)
if err != nil {
log.Debugf("Tunnel failed to setup node link to %s: %v", node, err)
return
}
// save the link
t.links[node] = link
}(node)
}
// wait for all threads to finish
wg.Wait()
}
// connect the tunnel to all the nodes and listen for incoming tunnel connections // connect the tunnel to all the nodes and listen for incoming tunnel connections
func (t *tun) connect() error { func (t *tun) connect() error {
l, err := t.options.Transport.Listen(t.options.Address) l, err := t.options.Transport.Listen(t.options.Address)
@ -860,7 +883,7 @@ func (t *tun) Connect() error {
// already connected // already connected
if t.connected { if t.connected {
// do it immediately // do it immediately
t.manageLinks() t.setupLinks()
// setup links // setup links
return nil return nil
} }
@ -879,6 +902,9 @@ func (t *tun) Connect() error {
// process sends to all links // process sends to all links
go t.process() go t.process()
// call setup before managing them
t.setupLinks()
// manage the links // manage the links
go t.manage() go t.manage()

View File

@ -8,6 +8,90 @@ import (
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
) )
func testBrokenTunAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) {
defer wg.Done()
// listen on some virtual address
tl, err := tun.Listen("test-tunnel")
if err != nil {
t.Fatal(err)
}
// receiver ready; notify sender
wait <- true
// accept a connection
c, err := tl.Accept()
if err != nil {
t.Fatal(err)
}
// accept the message and close the tunnel
// we do this to simulate loss of network connection
m := new(transport.Message)
if err := c.Recv(m); err != nil {
t.Fatal(err)
}
// close all the links
for _, link := range tun.Links() {
link.Close()
}
// receiver ready; notify sender
wait <- true
// accept the message
m = new(transport.Message)
if err := c.Recv(m); err != nil {
t.Fatal(err)
}
// notify the sender we have received
wait <- true
}
func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup, reconnect time.Duration) {
defer wg.Done()
// wait for the listener to get ready
<-wait
// dial a new session
c, err := tun.Dial("test-tunnel")
if err != nil {
t.Fatal(err)
}
defer c.Close()
m := transport.Message{
Header: map[string]string{
"test": "send",
},
}
// send the message
if err := c.Send(&m); err != nil {
t.Fatal(err)
}
// wait for the listener to get ready
<-wait
// give it time to reconnect
time.Sleep(reconnect)
// send the message
if err := c.Send(&m); err != nil {
t.Fatal(err)
}
// wait for the listener to receive the message
// c.Send merely enqueues the message to the link send queue and returns
// in order to verify it was received we wait for the listener to tell us
<-wait
}
// testAccept will accept connections on the transport, create a new link and tunnel on top // testAccept will accept connections on the transport, create a new link and tunnel on top
func testAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) { func testAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
@ -163,90 +247,6 @@ func TestLoopbackTunnel(t *testing.T) {
wg.Wait() wg.Wait()
} }
func testBrokenTunAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) {
defer wg.Done()
// listen on some virtual address
tl, err := tun.Listen("test-tunnel")
if err != nil {
t.Fatal(err)
}
// receiver ready; notify sender
wait <- true
// accept a connection
c, err := tl.Accept()
if err != nil {
t.Fatal(err)
}
// accept the message and close the tunnel
// we do this to simulate loss of network connection
m := new(transport.Message)
if err := c.Recv(m); err != nil {
t.Fatal(err)
}
// close all the links
for _, link := range tun.Links() {
link.Close()
}
// receiver ready; notify sender
wait <- true
// accept the message
m = new(transport.Message)
if err := c.Recv(m); err != nil {
t.Fatal(err)
}
// notify the sender we have received
wait <- true
}
func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup, reconnect time.Duration) {
defer wg.Done()
// wait for the listener to get ready
<-wait
// dial a new session
c, err := tun.Dial("test-tunnel")
if err != nil {
t.Fatal(err)
}
defer c.Close()
m := transport.Message{
Header: map[string]string{
"test": "send",
},
}
// send the message
if err := c.Send(&m); err != nil {
t.Fatal(err)
}
// wait for the listener to get ready
<-wait
// give it time to reconnect
time.Sleep(10 * reconnect)
// send the message
if err := c.Send(&m); err != nil {
t.Fatal(err)
}
// wait for the listener to receive the message
// c.Send merely enqueues the message to the link send queue and returns
// in order to verify it was received we wait for the listener to tell us
<-wait
}
func TestTunnelRTTRate(t *testing.T) { func TestTunnelRTTRate(t *testing.T) {
// create a new tunnel client // create a new tunnel client
tunA := NewTunnel( tunA := NewTunnel(
@ -296,3 +296,49 @@ func TestTunnelRTTRate(t *testing.T) {
t.Logf("Link %s length %v rate %v", link.Id(), link.Length(), link.Rate()) t.Logf("Link %s length %v rate %v", link.Id(), link.Length(), link.Rate())
} }
} }
func TestReconnectTunnel(t *testing.T) {
// we manually override the tunnel.ReconnectTime value here
// this is so that we make the reconnects faster than the default 5s
ReconnectTime = 200 * time.Millisecond
// create a new tunnel client
tunA := NewTunnel(
Address("127.0.0.1:9098"),
Nodes("127.0.0.1:9099"),
)
// create a new tunnel server
tunB := NewTunnel(
Address("127.0.0.1:9099"),
)
// start tunnel
err := tunB.Connect()
if err != nil {
t.Fatal(err)
}
defer tunB.Close()
// start tunnel
err = tunA.Connect()
if err != nil {
t.Fatal(err)
}
defer tunA.Close()
wait := make(chan bool)
var wg sync.WaitGroup
wg.Add(1)
// start tunnel listener
go testBrokenTunAccept(t, tunB, wait, &wg)
wg.Add(1)
// start tunnel sender
go testBrokenTunSend(t, tunA, wait, &wg, ReconnectTime*5)
// wait until done
wg.Wait()
}