From 750267b30805c1ea5e7179c2e7e6e78db3ad81a6 Mon Sep 17 00:00:00 2001 From: Milos Gajdos <milosgajdos83@gmail.com> Date: Tue, 13 Aug 2019 20:11:23 +0100 Subject: [PATCH 1/4] first commit to draft up a way for Sending messages to loopback --- tunnel/default.go | 55 +++++++++++++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/tunnel/default.go b/tunnel/default.go index 6ba7f878..26a4a4d0 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -17,6 +17,9 @@ type tun struct { sync.RWMutex + // tunnel token + token string + // to indicate if we're connected or not connected bool @@ -50,6 +53,7 @@ func newTunnel(opts ...Option) *tun { return &tun{ options: options, + token: uuid.New().String(), send: make(chan *message, 128), closed: make(chan bool), sockets: make(map[string]*socket), @@ -144,7 +148,7 @@ func (t *tun) process() { } // process incoming messages -func (t *tun) listen(link transport.Socket, listener bool) { +func (t *tun) listen(link transport.Socket) { for { // process anything via the net interface msg := new(transport.Message) @@ -154,11 +158,24 @@ func (t *tun) listen(link transport.Socket, listener bool) { return } + var loopback bool + switch msg.Header["Micro-Tunnel"] { - case "connect", "close": - // TODO: handle the connect/close message - // maybe used to create the dial/listen sockets - // or report io.EOF or maybe to kill the link + case "connect": + // TODO: handle the connect message + // check the Micro-Tunnel-Token + token, ok := msg.Header["Micro-Tunnel-Token"] + if !ok { + // no token found; bailing + continue + } + // are we connecting to ourselves? + if token == t.token { + loopback = true + } + case "close": + // TODO: handle the close message + // maybe report io.EOF or kill the link continue } @@ -182,18 +199,17 @@ func (t *tun) listen(link transport.Socket, listener bool) { var exists bool log.Debugf("Received %+v from %s", msg, link.Remote()) - // get the socket based on the tunnel id and session - // this could be something we dialed in which case - // we have a session for it otherwise its a listener - s, exists = t.getSocket(id, session) - if !exists { - // try get it based on just the tunnel id - // the assumption here is that a listener - // has no session but its set a listener session - s, exists = t.getSocket(id, "listener") - } - // no socket in existence + switch { + case loopback: + s, exists = t.getSocket(id, "listener") + default: + // get the socket based on the tunnel id and session + // this could be something we dialed in which case + // we have a session for it otherwise its a listener + s, exists = t.getSocket(id, session) + } + // bail if no socket has been found if !exists { log.Debugf("Tunnel skipping no socket exists") // drop it, we don't care about @@ -277,7 +293,7 @@ func (t *tun) connect() error { }() // listen for inbound messages - t.listen(sock, true) + t.listen(sock) }) t.Lock() @@ -306,14 +322,15 @@ func (t *tun) connect() error { if err := c.Send(&transport.Message{ Header: map[string]string{ - "Micro-Tunnel": "connect", + "Micro-Tunnel": "connect", + "Micro-Tunnel-Token": t.token, }, }); err != nil { continue } // process incoming messages - go t.listen(c, false) + go t.listen(c) // save the link id := uuid.New().String() From e607485c6b0dad0fd58d056881b2995c8e041303 Mon Sep 17 00:00:00 2001 From: Milos Gajdos <milosgajdos83@gmail.com> Date: Wed, 14 Aug 2019 01:23:03 +0100 Subject: [PATCH 2/4] Check for token in every received message. --- tunnel/default.go | 32 +++++++++++++++++++++++--------- tunnel/tunnel_test.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 9 deletions(-) diff --git a/tunnel/default.go b/tunnel/default.go index 26a4a4d0..ebb50365 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -96,6 +96,7 @@ func (t *tun) newSocket(id, session string) (*socket, bool) { t.Unlock() return nil, false } + t.sockets[id+session] = s t.Unlock() @@ -158,21 +159,26 @@ func (t *tun) listen(link transport.Socket) { return } + // loopback flag var loopback bool - switch msg.Header["Micro-Tunnel"] { - case "connect": - // TODO: handle the connect message - // check the Micro-Tunnel-Token - token, ok := msg.Header["Micro-Tunnel-Token"] - if !ok { - // no token found; bailing - continue - } + // TODO: figure out the way how to populate Micro-Tunnel-Token for every message + + // check the Micro-Tunnel-Token + token, ok := msg.Header["Micro-Tunnel-Token"] + if ok { // are we connecting to ourselves? if token == t.token { loopback = true } + } + + switch msg.Header["Micro-Tunnel"] { + case "connect": + // connecting without token is not allowed + if token == "" { + continue + } case "close": // TODO: handle the close message // maybe report io.EOF or kill the link @@ -187,6 +193,8 @@ func (t *tun) listen(link transport.Socket) { session := msg.Header["Micro-Tunnel-Session"] delete(msg.Header, "Micro-Tunnel-Session") + // TODO: should we delete Micro-Tunnel-Token header, too? + // if the session id is blank there's nothing we can do // TODO: check this is the case, is there any reason // why we'd have a blank session? Is the tunnel @@ -208,6 +216,12 @@ func (t *tun) listen(link transport.Socket) { // this could be something we dialed in which case // we have a session for it otherwise its a listener s, exists = t.getSocket(id, session) + if !exists { + // try get it based on just the tunnel id + // the assumption here is that a listener + // has no session but its set a listener session + s, exists = t.getSocket(id, "listener") + } } // bail if no socket has been found if !exists { diff --git a/tunnel/tunnel_test.go b/tunnel/tunnel_test.go index 721479bb..19ddce2b 100644 --- a/tunnel/tunnel_test.go +++ b/tunnel/tunnel_test.go @@ -98,3 +98,35 @@ func TestTunnel(t *testing.T) { // wait until done wg.Wait() } + +//func TestLoopbackTunnel(t *testing.T) { +// // create a new tunnel client +// tun := NewTunnel( +// Address("127.0.0.1:9096"), +// Nodes("127.0.0.1:9096"), +// ) +// +// // start tunB +// err := tun.Connect() +// if err != nil { +// t.Fatal(err) +// } +// defer tun.Close() +// +// time.Sleep(time.Millisecond * 50) +// +// var wg sync.WaitGroup +// +// // start accepting connections +// // on tunnel A +// wg.Add(1) +// go testAccept(t, tun, &wg) +// +// time.Sleep(time.Millisecond * 50) +// +// // dial and send via B +// testSend(t, tun) +// +// // wait until done +// wg.Wait() +//} From 151bcf0ea1f303257a9e166fa05e152b211ea323 Mon Sep 17 00:00:00 2001 From: Milos Gajdos <milosgajdos83@gmail.com> Date: Wed, 14 Aug 2019 13:00:10 +0100 Subject: [PATCH 3/4] Send and receive on loopback tunnel interface --- tunnel/default.go | 35 +++++++++++------------- tunnel/tunnel_test.go | 62 +++++++++++++++++++++---------------------- 2 files changed, 47 insertions(+), 50 deletions(-) diff --git a/tunnel/default.go b/tunnel/default.go index ebb50365..7400e0ae 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -131,6 +131,9 @@ func (t *tun) process() { // set the session id newMsg.Header["Micro-Tunnel-Session"] = msg.session + // set the tunnel token + newMsg.Header["Micro-Tunnel-Token"] = t.token + // send the message via the interface t.RLock() if len(t.links) == 0 { @@ -150,6 +153,9 @@ func (t *tun) process() { // process incoming messages func (t *tun) listen(link transport.Socket) { + // loopback flag + var loopback bool + for { // process anything via the net interface msg := new(transport.Message) @@ -159,26 +165,19 @@ func (t *tun) listen(link transport.Socket) { return } - // loopback flag - var loopback bool + switch msg.Header["Micro-Tunnel"] { + case "connect": + // check the Micro-Tunnel-Token + token, ok := msg.Header["Micro-Tunnel-Token"] + if !ok { + continue + } - // TODO: figure out the way how to populate Micro-Tunnel-Token for every message - - // check the Micro-Tunnel-Token - token, ok := msg.Header["Micro-Tunnel-Token"] - if ok { // are we connecting to ourselves? if token == t.token { loopback = true } - } - - switch msg.Header["Micro-Tunnel"] { - case "connect": - // connecting without token is not allowed - if token == "" { - continue - } + continue case "close": // TODO: handle the close message // maybe report io.EOF or kill the link @@ -193,8 +192,6 @@ func (t *tun) listen(link transport.Socket) { session := msg.Header["Micro-Tunnel-Session"] delete(msg.Header, "Micro-Tunnel-Session") - // TODO: should we delete Micro-Tunnel-Token header, too? - // if the session id is blank there's nothing we can do // TODO: check this is the case, is there any reason // why we'd have a blank session? Is the tunnel @@ -366,7 +363,8 @@ func (t *tun) close() error { for id, link := range t.links { link.Send(&transport.Message{ Header: map[string]string{ - "Micro-Tunnel": "close", + "Micro-Tunnel": "close", + "Micro-Tunnel-Token": t.token, }, }) link.Close() @@ -444,7 +442,6 @@ func (t *tun) Dial(addr string) (Conn, error) { if !ok { return nil, errors.New("error dialing " + addr) } - // set remote c.remote = addr // set local diff --git a/tunnel/tunnel_test.go b/tunnel/tunnel_test.go index 19ddce2b..3bf17ef1 100644 --- a/tunnel/tunnel_test.go +++ b/tunnel/tunnel_test.go @@ -99,34 +99,34 @@ func TestTunnel(t *testing.T) { wg.Wait() } -//func TestLoopbackTunnel(t *testing.T) { -// // create a new tunnel client -// tun := NewTunnel( -// Address("127.0.0.1:9096"), -// Nodes("127.0.0.1:9096"), -// ) -// -// // start tunB -// err := tun.Connect() -// if err != nil { -// t.Fatal(err) -// } -// defer tun.Close() -// -// time.Sleep(time.Millisecond * 50) -// -// var wg sync.WaitGroup -// -// // start accepting connections -// // on tunnel A -// wg.Add(1) -// go testAccept(t, tun, &wg) -// -// time.Sleep(time.Millisecond * 50) -// -// // dial and send via B -// testSend(t, tun) -// -// // wait until done -// wg.Wait() -//} +func TestLoopbackTunnel(t *testing.T) { + // create a new tunnel client + tun := NewTunnel( + Address("127.0.0.1:9096"), + Nodes("127.0.0.1:9096"), + ) + + // start tunB + err := tun.Connect() + if err != nil { + t.Fatal(err) + } + defer tun.Close() + + time.Sleep(time.Millisecond * 50) + + var wg sync.WaitGroup + + // start accepting connections + // on tunnel A + wg.Add(1) + go testAccept(t, tun, &wg) + + time.Sleep(time.Millisecond * 50) + + // dial and send via B + testSend(t, tun) + + // wait until done + wg.Wait() +} From 9f2f0e3ceae77f3e0a642907c73912f17499b093 Mon Sep 17 00:00:00 2001 From: Milos Gajdos <milosgajdos83@gmail.com> Date: Wed, 14 Aug 2019 13:26:23 +0100 Subject: [PATCH 4/4] Moved Close method to the bottom --- tunnel/default.go | 62 ++++++++++++++++++++++++----------------------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/tunnel/default.go b/tunnel/default.go index 7400e0ae..90a641f0 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -61,6 +61,14 @@ func newTunnel(opts ...Option) *tun { } } +// Init initializes tunnel options +func (t *tun) Init(opts ...Option) error { + for _, o := range opts { + o(&t.options) + } + return nil +} + // getSocket returns a socket from the internal socket map. // It does this based on the Micro-Tunnel-Id and Micro-Tunnel-Session func (t *tun) getSocket(id, session string) (*socket, bool) { @@ -273,6 +281,7 @@ func (t *tun) listen(link transport.Socket) { } } +// connect the tunnel to all the nodes and listen for incoming tunnel connections func (t *tun) connect() error { l, err := t.options.Transport.Listen(t.options.Address) if err != nil { @@ -358,6 +367,29 @@ func (t *tun) connect() error { return nil } +// Connect the tunnel +func (t *tun) Connect() error { + t.Lock() + defer t.Unlock() + + // already connected + if t.connected { + return nil + } + + // send the connect message + if err := t.connect(); err != nil { + return err + } + + // set as connected + t.connected = true + // create new close channel + t.closed = make(chan bool) + + return nil +} + func (t *tun) close() error { // close all the links for id, link := range t.links { @@ -405,36 +437,6 @@ func (t *tun) Close() error { return nil } -// Connect the tunnel -func (t *tun) Connect() error { - t.Lock() - defer t.Unlock() - - // already connected - if t.connected { - return nil - } - - // send the connect message - if err := t.connect(); err != nil { - return err - } - - // set as connected - t.connected = true - // create new close channel - t.closed = make(chan bool) - - return nil -} - -func (t *tun) Init(opts ...Option) error { - for _, o := range opts { - o(&t.options) - } - return nil -} - // Dial an address func (t *tun) Dial(addr string) (Conn, error) { log.Debugf("Tunnel dialing %s", addr)