diff --git a/tunnel/default.go b/tunnel/default.go index d0eecf51..eb26eaa1 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -1,79 +1,436 @@ package tunnel import ( + "crypto/sha256" + "errors" + "fmt" "sync" + "github.com/google/uuid" "github.com/micro/go-micro/transport" + "github.com/micro/go-micro/util/log" ) +// tun represents a network tunnel type tun struct { + options Options + sync.RWMutex - tr transport.Transport - options Options + + // to indicate if we're connected or not connected bool - closed chan bool + + // the send channel for all messages + send chan *message + + // close channel + closed chan bool + + // a map of sockets based on Micro-Tunnel-Id + sockets map[string]*socket + + // outbound links + links map[string]*link + + // listener + listener transport.Listener } -func newTunnel(opts ...Option) Tunnel { - // initialize default options - options := DefaultOptions() +type link struct { + transport.Socket + id string +} +// create new tunnel on top of a link +func newTunnel(opts ...Option) *tun { + options := DefaultOptions() for _, o := range opts { o(&options) } - // tunnel transport - tr := newTransport() - - t := &tun{ - tr: tr, + return &tun{ options: options, + send: make(chan *message, 128), closed: make(chan bool), + sockets: make(map[string]*socket), + links: make(map[string]*link), + } +} + +// getSocket returns a socket from the internal socket map. +// It does this based on the Micro-Tunnel-Id and Micro-Tunnel-Session +func (t *tun) getSocket(id, session string) (*socket, bool) { + // get the socket + t.RLock() + s, ok := t.sockets[id+session] + t.RUnlock() + return s, ok +} + +// newSocket creates a new socket and saves it +func (t *tun) newSocket(id, session string) (*socket, bool) { + // hash the id + h := sha256.New() + h.Write([]byte(id)) + id = fmt.Sprintf("%x", h.Sum(nil)) + + // new socket + s := &socket{ + id: id, + session: session, + closed: make(chan bool), + recv: make(chan *message, 128), + send: t.send, + wait: make(chan bool), } - return t + // save socket + t.Lock() + _, ok := t.sockets[id+session] + if ok { + // socket already exists + t.Unlock() + return nil, false + } + t.sockets[id+session] = s + t.Unlock() + + // return socket + return s, true } -// Id returns tunnel id -func (t *tun) Id() string { - return t.options.Id +// TODO: use tunnel id as part of the session +func (t *tun) newSession() string { + return uuid.New().String() } -// Options returns tunnel options -func (t *tun) Options() Options { - return t.options +// process outgoing messages sent by all local sockets +func (t *tun) process() { + // manage the send buffer + // all pseudo sockets throw everything down this + for { + select { + case msg := <-t.send: + nmsg := &transport.Message{ + Header: msg.data.Header, + Body: msg.data.Body, + } + + // set the tunnel id on the outgoing message + nmsg.Header["Micro-Tunnel-Id"] = msg.id + + // set the session id + nmsg.Header["Micro-Tunnel-Session"] = msg.session + + // send the message via the interface + t.RLock() + for _, link := range t.links { + link.Send(nmsg) + } + t.RUnlock() + case <-t.closed: + return + } + } } -// Address returns tunnel listen address -func (t *tun) Address() string { - return t.options.Address +// process incoming messages +func (t *tun) listen(link transport.Socket, listener bool) { + for { + // process anything via the net interface + msg := new(transport.Message) + err := link.Recv(msg) + if err != nil { + return + } + + // first check Micro-Tunnel + switch msg.Header["Micro-Tunnel"] { + case "connect": + // assuming new connection + // TODO: do something with this + continue + case "close": + // assuming connection closed + // TODO: do something with this + continue + } + + // the tunnel id + id := msg.Header["Micro-Tunnel-Id"] + + // the session id + session := msg.Header["Micro-Tunnel-Session"] + + // if the session id is blank there's nothing we can do + // TODO: check this is the case, is there any reason + // why we'd have a blank session? Is the tunnel + // used for some other purpose? + if len(id) == 0 || len(session) == 0 { + continue + } + + var s *socket + var exists bool + + // if its a local listener then we use that as the session id + // e.g we're using a loopback connecting to ourselves + if listener { + s, exists = t.getSocket(id, "listener") + } else { + // get the socket based on the tunnel id and session + // this could be something we dialed in which case + // we have a session for it otherwise its a listener + s, exists = t.getSocket(id, session) + if !exists { + // try get it based on just the tunnel id + // the assumption here is that a listener + // has no session but its set a listener session + s, exists = t.getSocket(id, "listener") + } + } + + // no socket in existence + if !exists { + // drop it, we don't care about + // messages we don't know about + continue + } + + // is the socket closed? + select { + case <-s.closed: + // closed + delete(t.sockets, id) + continue + default: + // process + } + + // is the socket new? + select { + // if its new the socket is actually blocked waiting + // for a connection. so we check if its waiting. + case <-s.wait: + // if its waiting e.g its new then we close it + default: + // set remote address of the socket + s.remote = msg.Header["Remote"] + close(s.wait) + } + + // construct a new transport message + tmsg := &transport.Message{ + Header: msg.Header, + Body: msg.Body, + } + + // construct the internal message + imsg := &message{ + id: id, + session: session, + data: tmsg, + } + + // append to recv backlog + // we don't block if we can't pass it on + select { + case s.recv <- imsg: + default: + } + } } -// Transport returns tunnel client transport -func (t *tun) Transport() transport.Transport { - return t.tr -} +func (t *tun) connect() error { + l, err := t.options.Transport.Listen(t.options.Address) + if err != nil { + return err + } + + // save the listener + t.listener = l + + go func() { + // accept inbound connections + err := l.Accept(func(sock transport.Socket) { + // save the link + id := uuid.New().String() + t.Lock() + t.links[id] = &link{ + Socket: sock, + id: id, + } + t.Unlock() + + // delete the link + defer func() { + t.Lock() + delete(t.links, id) + t.Unlock() + }() + + // listen for inbound messages + t.listen(sock, true) + }) + + t.Lock() + defer t.Unlock() + + // still connected but the tunnel died + if err != nil && t.connected { + log.Logf("Tunnel listener died: %v", err) + } + }() + + for _, node := range t.options.Nodes { + c, err := t.options.Transport.Dial(node) + if err != nil { + log.Debugf("Tunnel failed to connect to %s: %v", node, err) + continue + } + + err = c.Send(&transport.Message{ + Header: map[string]string{ + "Micro-Tunnel": "connect", + }, + }) + + if err != nil { + continue + } + + // process incoming messages + go t.listen(c, false) + + // save the link + id := uuid.New().String() + t.links[id] = &link{ + Socket: c, + id: id, + } + } + + // process outbound messages to be sent + // process sends to all links + go t.process() -// Connect connects establishes point to point tunnel -func (t *tun) Connect() error { return nil } -// Close closes the tunnel +func (t *tun) close() error { + // close all the links + for id, link := range t.links { + link.Send(&transport.Message{ + Header: map[string]string{ + "Micro-Tunnel": "close", + }, + }) + link.Close() + delete(t.links, id) + } + + // close the listener + return t.listener.Close() +} + +// Close the tunnel func (t *tun) Close() error { - return nil -} + t.Lock() + defer t.Unlock() + + if !t.connected { + return nil + } -// Status returns tunnel status -func (t *tun) Status() Status { select { case <-t.closed: - return Closed + return nil default: - return Connected + // close all the sockets + for _, s := range t.sockets { + s.Close() + } + // close the connection + close(t.closed) + t.connected = false + + // send a close message + // we don't close the link + // just the tunnel + return t.close() } + + return nil } -func (t *tun) String() string { - return "micro" +// Connect the tunnel +func (t *tun) Connect() error { + t.Lock() + defer t.Unlock() + + // already connected + if t.connected { + return nil + } + + // send the connect message + if err := t.connect(); err != nil { + return err + } + + // set as connected + t.connected = true + // create new close channel + t.closed = make(chan bool) + + return nil +} + +// Dial an address +func (t *tun) Dial(addr string) (Conn, error) { + c, ok := t.newSocket(addr, t.newSession()) + if !ok { + return nil, errors.New("error dialing " + addr) + } + // set remote + c.remote = addr + // set local + c.local = "local" + + return c, nil +} + +// Accept a connection on the address +func (t *tun) Listen(addr string) (Listener, error) { + // create a new socket by hashing the address + c, ok := t.newSocket(addr, "listener") + if !ok { + return nil, errors.New("already listening on " + addr) + } + + // set remote. it will be replaced by the first message received + c.remote = "remote" + // set local + c.local = addr + + tl := &tunListener{ + addr: addr, + // the accept channel + accept: make(chan *socket, 128), + // the channel to close + closed: make(chan bool), + // the connection + conn: c, + // the listener socket + socket: c, + } + + // this kicks off the internal message processor + // for the listener so it can create pseudo sockets + // per session if they do not exist or pass messages + // to the existign sessions + go tl.process() + + // return the listener + return tl, nil } diff --git a/tunnel/listener.go b/tunnel/listener.go new file mode 100644 index 00000000..6c803eb9 --- /dev/null +++ b/tunnel/listener.go @@ -0,0 +1,101 @@ +package tunnel + +import ( + "io" +) + +type tunListener struct { + // address of the listener + addr string + // the accept channel + accept chan *socket + // the channel to close + closed chan bool + // the connection + conn Conn + // the listener socket + socket *socket +} + +func (t *tunListener) process() { + // our connection map for session + conns := make(map[string]*socket) + + for { + select { + case <-t.closed: + return + // receive a new message + case m := <-t.socket.recv: + // get a socket + sock, ok := conns[m.session] + if !ok { + // create a new socket session + sock = &socket{ + // our tunnel id + id: m.id, + // the session id + session: m.session, + // close chan + closed: make(chan bool), + // recv called by the acceptor + recv: make(chan *message, 128), + // use the internal send buffer + send: t.socket.send, + // wait + wait: make(chan bool), + } + + // first message + sock.recv <- m + + // save the socket + conns[m.session] = sock + + // send to accept chan + select { + case <-t.closed: + return + case t.accept <- sock: + } + } + + // send this to the accept chan + select { + case <-sock.closed: + delete(conns, m.session) + case sock.recv <- m: + } + } + } +} + +func (t *tunListener) Addr() string { + return t.addr +} + +func (t *tunListener) Close() error { + select { + case <-t.closed: + return nil + default: + close(t.closed) + } + return nil +} + +// Everytime accept is called we essentially block till we get a new connection +func (t *tunListener) Accept() (Conn, error) { + select { + // if the socket is closed return + case <-t.closed: + return nil, io.EOF + // wait for a new connection + case c, ok := <-t.accept: + if !ok { + return nil, io.EOF + } + return c, nil + } + return nil, nil +} diff --git a/tunnel/options.go b/tunnel/options.go index 86db722e..f152c8ac 100644 --- a/tunnel/options.go +++ b/tunnel/options.go @@ -3,6 +3,7 @@ package tunnel import ( "github.com/google/uuid" "github.com/micro/go-micro/transport" + "github.com/micro/go-micro/transport/quic" ) var ( @@ -39,7 +40,7 @@ func Address(a string) Option { } // Nodes specify remote network nodes -func Nodes(n []string) Option { +func Nodes(n ...string) Option { return func(o *Options) { o.Nodes = n } @@ -57,7 +58,6 @@ func DefaultOptions() Options { return Options{ Id: uuid.New().String(), Address: DefaultAddress, - Nodes: make([]string, 0), - Transport: transport.DefaultTransport, + Transport: quic.NewTransport(), } } diff --git a/tunnel/socket.go b/tunnel/socket.go index ad98287f..b1c55797 100644 --- a/tunnel/socket.go +++ b/tunnel/socket.go @@ -1,25 +1,90 @@ package tunnel -import "github.com/micro/go-micro/transport" +import ( + "errors" -type tunSocket struct{} + "github.com/micro/go-micro/transport" +) -func (s *tunSocket) Recv(m *transport.Message) error { +// socket is our pseudo socket for transport.Socket +type socket struct { + // socket id based on Micro-Tunnel + id string + // the session id based on Micro.Tunnel-Session + session string + // closed + closed chan bool + // remote addr + remote string + // local addr + local string + // send chan + send chan *message + // recv chan + recv chan *message + // wait until we have a connection + wait chan bool +} + +// message is sent over the send channel +type message struct { + // tunnel id + id string + // the session id + session string + // transport data + data *transport.Message +} + +func (s *socket) Remote() string { + return s.remote +} + +func (s *socket) Local() string { + return s.local +} + +func (s *socket) Id() string { + return s.id +} + +func (s *socket) Session() string { + return s.session +} + +func (s *socket) Send(m *transport.Message) error { + select { + case <-s.closed: + return errors.New("socket is closed") + default: + // no op + } + // append to backlog + s.send <- &message{id: s.id, session: s.session, data: m} return nil } -func (s *tunSocket) Send(m *transport.Message) error { +func (s *socket) Recv(m *transport.Message) error { + select { + case <-s.closed: + return errors.New("socket is closed") + default: + // no op + } + // recv from backlog + msg := <-s.recv + // set message + *m = *msg.data + // return nil return nil } -func (s *tunSocket) Close() error { +func (s *socket) Close() error { + select { + case <-s.closed: + // no op + default: + close(s.closed) + } return nil } - -func (s *tunSocket) Local() string { - return "" -} - -func (s *tunSocket) Remote() string { - return "" -} diff --git a/tunnel/transport.go b/tunnel/transport.go deleted file mode 100644 index fc03398a..00000000 --- a/tunnel/transport.go +++ /dev/null @@ -1,51 +0,0 @@ -package tunnel - -import "github.com/micro/go-micro/transport" - -type tunTransport struct { - options transport.Options -} - -type tunClient struct { - *tunSocket - options transport.DialOptions -} - -type tunListener struct { - conn chan *tunSocket -} - -func newTransport(opts ...transport.Option) transport.Transport { - var options transport.Options - - for _, o := range opts { - o(&options) - } - - return &tunTransport{ - options: options, - } -} - -func (t *tunTransport) Init(opts ...transport.Option) error { - for _, o := range opts { - o(&t.options) - } - return nil -} - -func (t *tunTransport) Options() transport.Options { - return t.options -} - -func (t *tunTransport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) { - return nil, nil -} - -func (t *tunTransport) Listen(addr string, opts ...transport.ListenOption) (transport.Listener, error) { - return nil, nil -} - -func (t *tunTransport) String() string { - return "micro" -} diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index 55bcd809..bcb54fd1 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -1,39 +1,43 @@ -// Package tunnel provides micro network tunnelling +// Package tunnel provides gre network tunnelling package tunnel import ( "github.com/micro/go-micro/transport" ) -// Status is tunnel status -type Status int - -const ( - // Connected means the tunnel is alive - Connected Status = iota - // Closed meands the tunnel has been disconnected - Closed -) - -// Tunnel creates a p2p network tunnel. +// Tunnel creates a gre network tunnel on top of a link. +// It establishes multiple streams using the Micro-Tunnel-Id header +// and Micro-Tunnel-Session header. The tunnel id is a hash of +// the address being requested. type Tunnel interface { - // Id returns tunnel id - Id() string - // Options returns the tunnel options - Options() Options - // Address returns tunnel address - Address() string - // Transport to use by tunne clients - Transport() transport.Transport // Connect connects the tunnel Connect() error // Close closes the tunnel Close() error - // Status returns tunnel status - Status() Status + // Dial an endpoint + Dial(addr string) (Conn, error) + // Accept connections + Listen(addr string) (Listener, error) } -// NewTunnel creates a new tunnel on top of a link +// The listener provides similar constructs to the transport.Listener +type Listener interface { + Addr() string + Close() error + Accept() (Conn, error) +} + +// Conn is a connection dialed or accepted which includes the tunnel id and session +type Conn interface { + // Specifies the tunnel id + Id() string + // The session + Session() string + // a transport socket + transport.Socket +} + +// NewTunnel creates a new tunnel func NewTunnel(opts ...Option) Tunnel { return newTunnel(opts...) } diff --git a/tunnel/tunnel_test.go b/tunnel/tunnel_test.go new file mode 100644 index 00000000..5dd527c5 --- /dev/null +++ b/tunnel/tunnel_test.go @@ -0,0 +1,73 @@ +package tunnel + +import ( + "testing" + + "github.com/micro/go-micro/transport" +) + +// testAccept will accept connections on the transport, create a new link and tunnel on top +func testAccept(t *testing.T, tun Tunnel, wait chan bool) { + // listen on some virtual address + tl, err := tun.Listen("test-tunnel") + if err != nil { + t.Fatal(err) + } + + // accept a connection + c, err := tl.Accept() + if err != nil { + t.Fatal(err) + } + + // get a message + for { + m := new(transport.Message) + if err := c.Recv(m); err != nil { + t.Fatal(err) + } + close(wait) + return + } +} + +// testSend will create a new link to an address and then a tunnel on top +func testSend(t *testing.T, tun Tunnel) { + // dial a new session + c, err := tun.Dial("test-tunnel") + if err != nil { + t.Fatal(err) + } + //defer c.Close() + + m := transport.Message{ + Header: map[string]string{ + "test": "header", + }, + } + + if err := c.Send(&m); err != nil { + t.Fatal(err) + } +} + +func TestTunnel(t *testing.T) { + // create a new listener + tun := NewTunnel(Nodes(":9096")) + err := tun.Connect() + if err != nil { + t.Fatal(err) + } + //defer tun.Close() + + wait := make(chan bool) + + // start accepting connections + go testAccept(t, tun, wait) + + // send a message + testSend(t, tun) + + // wait until message is received + <-wait +}