1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-12-24 10:07:04 +02:00

Tunnel discover/announce/open/session/close

This commit is contained in:
Asim Aslam 2019-09-04 09:48:05 +01:00
parent eb4a709195
commit b9c437fbfe
7 changed files with 565 additions and 43 deletions

View File

@ -718,7 +718,7 @@ func (n *network) Connect() error {
)
// dial into ControlChannel to send route adverts
ctrlClient, err := n.Tunnel.Dial(ControlChannel)
ctrlClient, err := n.Tunnel.Dial(ControlChannel, tunnel.DialMulticast())
if err != nil {
return err
}
@ -732,7 +732,7 @@ func (n *network) Connect() error {
}
// dial into NetworkChannel to send network messages
netClient, err := n.Tunnel.Dial(NetworkChannel)
netClient, err := n.Tunnel.Dial(NetworkChannel, tunnel.DialMulticast())
if err != nil {
return err
}

View File

@ -87,6 +87,12 @@ func (t *tun) getSession(channel, session string) (*session, bool) {
return s, ok
}
func (t *tun) delSession(channel, session string) {
t.Lock()
delete(t.sessions, channel+session)
t.Unlock()
}
// newSession creates a new session and saves it
func (t *tun) newSession(channel, sessionId string) (*session, bool) {
// new session
@ -150,7 +156,9 @@ func (t *tun) monitor() {
log.Debugf("Tunnel failed to setup node link to %s: %v", node, err)
continue
}
// set the link id to the node
// TODO: hash it
link.id = node
// save the link
t.Lock()
t.links[node] = link
@ -169,11 +177,14 @@ func (t *tun) process() {
case msg := <-t.send:
newMsg := &transport.Message{
Header: make(map[string]string),
Body: msg.data.Body,
}
for k, v := range msg.data.Header {
newMsg.Header[k] = v
// set the data
if msg.data != nil {
for k, v := range msg.data.Header {
newMsg.Header[k] = v
}
newMsg.Body = msg.data.Body
}
// set message head
@ -195,7 +206,7 @@ func (t *tun) process() {
t.Lock()
if len(t.links) == 0 {
log.Debugf("No links to send to")
log.Debugf("No links to send message type: %s channel: %s", msg.typ, msg.channel)
}
var sent bool
@ -232,25 +243,55 @@ func (t *tun) process() {
continue
}
// check the multicast mappings
if msg.multicast {
link.RLock()
_, ok := link.channels[msg.channel]
link.RUnlock()
// channel mapping not found in link
if !ok {
continue
}
}
// send the message via the current link
log.Debugf("Sending %+v to %s", newMsg, node)
if errr := link.Send(newMsg); errr != nil {
log.Debugf("Tunnel error sending %+v to %s: %v", newMsg, node, errr)
err = errors.New(errr.Error())
// kill the link
link.Close()
// delete the link
delete(t.links, node)
continue
}
// is sent
sent = true
// keep sending broadcast messages
if msg.broadcast || msg.multicast {
continue
}
// break on unicast
break
}
t.Unlock()
// set the error if not sent
var gerr error
if !sent {
gerr = err
}
// skip if its not been set
if msg.errChan == nil {
continue
}
// return error non blocking
select {
case msg.errChan <- gerr:
@ -262,14 +303,25 @@ func (t *tun) process() {
}
}
func (t *tun) delLink(id string) {
t.Lock()
defer t.Unlock()
// get the link
link, ok := t.links[id]
if !ok {
return
}
// close and delete
link.Close()
delete(t.links, id)
}
// process incoming messages
func (t *tun) listen(link *link) {
// remove the link on exit
defer func() {
log.Debugf("Tunnel deleting connection from %s", link.Remote())
t.Lock()
delete(t.links, link.Remote())
t.Unlock()
t.delLink(link.Remote())
}()
// let us know if its a loopback
@ -301,6 +353,13 @@ func (t *tun) listen(link *link) {
// the session id
sessionId := msg.Header["Micro-Tunnel-Session"]
// if its not connected throw away the link
// the first message we process needs to be connect
if !link.connected && mtype != "connect" {
log.Debugf("Tunnel link %s not connected", link.id)
return
}
switch mtype {
case "connect":
log.Debugf("Tunnel link %s received connect message", link.Remote())
@ -311,6 +370,8 @@ func (t *tun) listen(link *link) {
loopback = true
}
// set to remote node
link.id = id
// set as connected
link.connected = true
@ -322,10 +383,31 @@ func (t *tun) listen(link *link) {
// nothing more to do
continue
case "close":
log.Debugf("Tunnel link %s closing connection", link.Remote())
// TODO: handle the close message
// maybe report io.EOF or kill the link
return
// close the link entirely
if len(channel) == 0 {
log.Debugf("Tunnel link %s received close message", link.Remote())
return
}
// the entire listener was closed so remove it from the mapping
if sessionId == "listener" {
link.Lock()
delete(link.channels, channel)
link.Unlock()
continue
}
// try get the dialing socket
s, exists := t.getSession(channel, sessionId)
if exists {
// close and continue
s.Close()
continue
}
// otherwise its a session mapping of sorts
case "keepalive":
log.Debugf("Tunnel link %s received keepalive", link.Remote())
t.Lock()
@ -333,20 +415,64 @@ func (t *tun) listen(link *link) {
link.lastKeepAlive = time.Now()
t.Unlock()
continue
// a new connection dialled outbound
case "open":
// we just let it pass through to be processed
// an accept returned by the listener
case "accept":
// a continued session
case "session":
// process message
log.Debugf("Received %+v from %s", msg, link.Remote())
// an announcement of a channel listener
case "announce":
// update mapping in the link
link.Lock()
link.channels[channel] = time.Now()
link.Unlock()
// get the session that asked for the discovery
s, exists := t.getSession(channel, sessionId)
if exists {
// don't bother it's already discovered
if s.discovered {
continue
}
// send the announce back to the caller
s.recv <- &message{
typ: "announce",
tunnel: id,
channel: channel,
session: sessionId,
link: link.id,
}
}
continue
case "discover":
// looking for existing mapping
_, exists := t.getSession(channel, "listener")
if exists {
log.Debugf("Tunnel sending announce for discovery of channel %s", channel)
// send back the announcement
link.Send(&transport.Message{
Header: map[string]string{
"Micro-Tunnel": "announce",
"Micro-Tunnel-Id": t.id,
"Micro-Tunnel-Channel": channel,
"Micro-Tunnel-Session": sessionId,
"Micro-Tunnel-Link": link.id,
"Micro-Tunnel-Token": t.token,
},
})
}
continue
default:
// blackhole it
continue
}
// if its not connected throw away the link
if !link.connected {
log.Debugf("Tunnel link %s not connected", link.id)
return
}
// strip tunnel message header
for k, _ := range msg.Header {
if strings.HasPrefix(k, "Micro-Tunnel") {
@ -368,8 +494,15 @@ func (t *tun) listen(link *link) {
// If its a loopback connection then we've enabled link direction
// listening side is used for listening, the dialling side for dialling
switch {
case loopback:
case loopback, mtype == "open":
s, exists = t.getSession(channel, "listener")
// only return accept to the session
case mtype == "accept":
log.Debugf("Received accept message for %s %s", channel, sessionId)
s, exists = t.getSession(channel, sessionId)
if exists && s.accepted {
continue
}
default:
// get the session based on the tunnel id and session
// this could be something we dialed in which case
@ -383,7 +516,7 @@ func (t *tun) listen(link *link) {
}
}
// bail if no session has been found
// bail if no session or listener has been found
if !exists {
log.Debugf("Tunnel skipping no session exists")
// drop it, we don't care about
@ -391,8 +524,6 @@ func (t *tun) listen(link *link) {
continue
}
log.Debugf("Tunnel using session %s %s", s.channel, s.session)
// is the session closed?
select {
case <-s.closed:
@ -403,6 +534,8 @@ func (t *tun) listen(link *link) {
// process
}
log.Debugf("Tunnel using channel %s session %s", s.channel, s.session)
// is the session new?
select {
// if its new the session is actually blocked waiting
@ -462,9 +595,7 @@ func (t *tun) keepalive(link *link) {
},
}); err != nil {
log.Debugf("Error sending keepalive to link %v: %v", link.Remote(), err)
t.Lock()
delete(t.links, link.Remote())
t.Unlock()
t.delLink(link.Remote())
return
}
}
@ -482,6 +613,7 @@ func (t *tun) setupLink(node string) (*link, error) {
}
log.Debugf("Tunnel connected to %s", node)
// send the first connect message
if err := c.Send(&transport.Message{
Header: map[string]string{
"Micro-Tunnel": "connect",
@ -494,9 +626,11 @@ func (t *tun) setupLink(node string) (*link, error) {
// create a new link
link := newLink(c)
link.connected = true
// set link id to remote side
link.id = c.Remote()
// we made the outbound connection
// and sent the connect message
link.connected = true
// process incoming messages
go t.listen(link)
@ -554,7 +688,7 @@ func (t *tun) connect() error {
}
// save the link
t.links[node] = link
t.links[link.Remote()] = link
}
// process outbound messages to be sent
@ -628,6 +762,8 @@ func (t *tun) Close() error {
return nil
}
log.Debug("Tunnel closing")
select {
case <-t.closed:
return nil
@ -651,7 +787,7 @@ func (t *tun) Close() error {
}
// Dial an address
func (t *tun) Dial(channel string) (Session, error) {
func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
log.Debugf("Tunnel dialing %s", channel)
c, ok := t.newSession(channel, t.newSessionId())
if !ok {
@ -664,18 +800,87 @@ func (t *tun) Dial(channel string) (Session, error) {
// outbound session
c.outbound = true
// get opts
options := DialOptions{
Timeout: DefaultDialTimeout,
}
for _, o := range opts {
o(&options)
}
// set the multicast option
c.multicast = options.Multicast
// set the dial timeout
c.timeout = options.Timeout
t.RLock()
for _, link := range t.links {
link.RLock()
_, ok := link.channels[channel]
link.RUnlock()
// we have at least one channel mapping
if ok {
c.discovered = true
break
}
}
t.RUnlock()
// shit fuck
if !c.discovered {
t.send <- &message{
typ: "discover",
tunnel: t.id,
channel: channel,
session: c.session,
broadcast: true,
outbound: true,
errChan: c.errChan,
}
select {
case err := <-c.errChan:
if err != nil {
return nil, err
}
}
// wait for announce
select {
case msg := <-c.recv:
if msg.typ != "announce" {
return nil, errors.New("failed to discover channel")
}
}
}
// try to open the session
err := c.Open()
if err != nil {
// delete the session
t.delSession(c.channel, c.session)
return nil, err
}
return c, nil
}
// Accept a connection on the address
func (t *tun) Listen(channel string) (Listener, error) {
log.Debugf("Tunnel listening on %s", channel)
// create a new session by hashing the address
c, ok := t.newSession(channel, "listener")
if !ok {
return nil, errors.New("already listening on " + channel)
}
delFunc := func() {
t.delSession(channel, "listener")
}
// set remote. it will be replaced by the first message received
c.remote = "remote"
// set local
@ -691,6 +896,8 @@ func (t *tun) Listen(channel string) (Listener, error) {
tunClosed: t.closed,
// the listener session
session: c,
// delete session
delFunc: delFunc,
}
// this kicks off the internal message processor
@ -699,6 +906,9 @@ func (t *tun) Listen(channel string) (Listener, error) {
// to the existign sessions
go tl.process()
// announces the listener channel to others
go tl.announce()
// return the listener
return tl, nil
}

View File

@ -9,9 +9,10 @@ import (
)
type link struct {
transport.Socket
sync.RWMutex
transport.Socket
// unique id of this link e.g uuid
// which we define for ourselves
id string
@ -27,11 +28,67 @@ type link struct {
// the last time we received a keepalive
// on this link from the remote side
lastKeepAlive time.Time
// channels keeps a mapping of channels and last seen
channels map[string]time.Time
// stop the link
closed chan bool
}
func newLink(s transport.Socket) *link {
return &link{
Socket: s,
id: uuid.New().String(),
l := &link{
Socket: s,
id: uuid.New().String(),
channels: make(map[string]time.Time),
closed: make(chan bool),
}
go l.run()
return l
}
func (l *link) run() {
t := time.NewTicker(time.Minute)
defer t.Stop()
for {
select {
case <-l.closed:
return
case <-t.C:
// drop any channel mappings older than 2 minutes
var kill []string
killTime := time.Minute * 2
l.RLock()
for ch, t := range l.channels {
if d := time.Since(t); d > killTime {
kill = append(kill, ch)
}
}
l.RUnlock()
// if nothing to kill don't both with a wasted lock
if len(kill) == 0 {
continue
}
// kill the channels!
l.Lock()
for _, ch := range kill {
delete(l.channels, ch)
}
l.Unlock()
}
}
}
func (l *link) Close() {
l.Lock()
defer l.Unlock()
select {
case <-l.closed:
return
default:
close(l.closed)
}
}

View File

@ -2,6 +2,7 @@ package tunnel
import (
"io"
"time"
"github.com/micro/go-micro/util/log"
)
@ -17,22 +18,77 @@ type tunListener struct {
tunClosed chan bool
// the listener session
session *session
// del func to kill listener
delFunc func()
}
// periodically announce self
func (t *tunListener) announce() {
tick := time.NewTicker(time.Minute)
defer tick.Stop()
announce := func() {
msg := &message{
typ: "announce",
tunnel: t.session.tunnel,
channel: t.session.channel,
session: t.session.session,
outbound: t.session.outbound,
loopback: t.session.loopback,
multicast: t.session.multicast,
}
select {
case t.session.send <- msg:
case <-t.session.closed:
return
case <-t.closed:
return
}
}
// first announcement
announce()
for {
select {
case <-tick.C:
announce()
case <-t.closed:
return
}
}
}
func (t *tunListener) process() {
// our connection map for session
conns := make(map[string]*session)
defer func() {
// close the sessions
for _, conn := range conns {
conn.Close()
}
}()
for {
select {
case <-t.closed:
return
case <-t.tunClosed:
t.Close()
return
// receive a new message
case m := <-t.session.recv:
// get a session
sess, ok := conns[m.session]
log.Debugf("Tunnel listener received channel %s session %s exists: %t", m.channel, m.session, ok)
if !ok {
// only create new sessions on open message
if m.typ != "open" {
continue
}
// create a new session session
sess = &session{
// the id of the remote side
@ -45,6 +101,8 @@ func (t *tunListener) process() {
loopback: m.loopback,
// the link the message was received on
link: m.link,
// set multicast
multicast: m.multicast,
// close chan
closed: make(chan bool),
// recv called by the acceptor
@ -60,12 +118,39 @@ func (t *tunListener) process() {
// save the session
conns[m.session] = sess
// send to accept chan
select {
case <-t.closed:
return
// send to accept chan
case t.accept <- sess:
}
// continue
continue
}
// an existing session was found
// received a close message
switch m.typ {
case "close":
select {
case <-sess.closed:
// no op
delete(conns, m.session)
default:
// close and delete session
close(sess.closed)
delete(conns, m.session)
}
// continue
continue
case "session":
// operate on this
default:
// non operational type
continue
}
// send this to the accept chan
@ -89,6 +174,9 @@ func (t *tunListener) Close() error {
case <-t.closed:
return nil
default:
// close and delete
t.delFunc()
t.session.Close()
close(t.closed)
}
return nil
@ -102,13 +190,17 @@ func (t *tunListener) Accept() (Session, error) {
return nil, io.EOF
case <-t.tunClosed:
// close the listener when the tunnel closes
t.Close()
return nil, io.EOF
// wait for a new connection
case c, ok := <-t.accept:
// check if the accept chan is closed
if !ok {
return nil, io.EOF
}
// send back the accept
if err := c.Accept(); err != nil {
return nil, err
}
return c, nil
}
return nil, nil

View File

@ -1,6 +1,8 @@
package tunnel
import (
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/transport"
"github.com/micro/go-micro/transport/quic"
@ -29,6 +31,15 @@ type Options struct {
Transport transport.Transport
}
type DialOption func(*DialOptions)
type DialOptions struct {
// specify a multicast connection
Multicast bool
// the dial timeout
Timeout time.Duration
}
// The tunnel id
func Id(id string) Option {
return func(o *Options) {
@ -73,3 +84,18 @@ func DefaultOptions() Options {
Transport: quic.NewTransport(),
}
}
// Dial options
// Dial multicast sets the multicast option to send only to those mapped
func DialMulticast() DialOption {
return func(o *DialOptions) {
o.Multicast = true
}
}
func DialTimeout(t time.Duration) DialOption {
return func(o *DialOptions) {
o.Timeout = t
}
}

View File

@ -3,6 +3,7 @@ package tunnel
import (
"errors"
"io"
"time"
"github.com/micro/go-micro/transport"
"github.com/micro/go-micro/util/log"
@ -28,10 +29,20 @@ type session struct {
recv chan *message
// wait until we have a connection
wait chan bool
// if the discovery worked
discovered bool
// if the session was accepted
accepted bool
// outbound marks the session as outbound dialled connection
outbound bool
// lookback marks the session as a loopback on the inbound
loopback bool
// if the session is multicast
multicast bool
// if the session is broadcast
broadcast bool
// the timeout
timeout time.Duration
// the link on which this message was received
link string
// the error response
@ -52,6 +63,10 @@ type message struct {
outbound bool
// loopback marks the message intended for loopback
loopback bool
// whether to send as multicast
multicast bool
// broadcast sets the broadcast type
broadcast bool
// the link to send the message on
link string
// transport data
@ -76,10 +91,97 @@ func (s *session) Channel() string {
return s.channel
}
// Open will fire the open message for the session
func (s *session) Open() error {
msg := &message{
typ: "open",
tunnel: s.tunnel,
channel: s.channel,
session: s.session,
outbound: s.outbound,
loopback: s.loopback,
multicast: s.multicast,
link: s.link,
errChan: s.errChan,
}
// send open message
s.send <- msg
// wait for an error response for send
select {
case err := <-msg.errChan:
if err != nil {
return err
}
case <-s.closed:
return io.EOF
}
// we don't wait on multicast
if s.multicast {
s.accepted = true
return nil
}
// now wait for the accept
select {
case msg = <-s.recv:
if msg.typ != "accept" {
log.Debugf("Received non accept message in Open %s", msg.typ)
return errors.New("failed to connect")
}
// set to accepted
s.accepted = true
// set link
s.link = msg.link
case <-time.After(s.timeout):
return ErrDialTimeout
case <-s.closed:
return io.EOF
}
return nil
}
func (s *session) Accept() error {
msg := &message{
typ: "accept",
tunnel: s.tunnel,
channel: s.channel,
session: s.session,
outbound: s.outbound,
loopback: s.loopback,
multicast: s.multicast,
link: s.link,
errChan: s.errChan,
}
// send the accept message
select {
case <-s.closed:
return io.EOF
case s.send <- msg:
return nil
}
// wait for send response
select {
case err := <-s.errChan:
if err != nil {
return err
}
case <-s.closed:
return io.EOF
}
return nil
}
func (s *session) Send(m *transport.Message) error {
select {
case <-s.closed:
return errors.New("session is closed")
return io.EOF
default:
// no op
}
@ -96,19 +198,26 @@ func (s *session) Send(m *transport.Message) error {
// append to backlog
msg := &message{
typ: "session",
tunnel: s.tunnel,
channel: s.channel,
session: s.session,
outbound: s.outbound,
loopback: s.loopback,
data: data,
typ: "session",
tunnel: s.tunnel,
channel: s.channel,
session: s.session,
outbound: s.outbound,
loopback: s.loopback,
multicast: s.multicast,
data: data,
// specify the link on which to send this
// it will be blank for dialled sessions
link: s.link,
// error chan
errChan: s.errChan,
}
// if not multicast then set link
if !s.multicast {
msg.link = s.link
}
log.Debugf("Appending %+v to send backlog", msg)
s.send <- msg
@ -154,6 +263,25 @@ func (s *session) Close() error {
// no op
default:
close(s.closed)
// append to backlog
msg := &message{
typ: "close",
tunnel: s.tunnel,
channel: s.channel,
session: s.session,
outbound: s.outbound,
loopback: s.loopback,
multicast: s.multicast,
link: s.link,
}
// send the close message
select {
case s.send <- msg:
default:
}
}
return nil
}

View File

@ -2,6 +2,9 @@
package tunnel
import (
"errors"
"time"
"github.com/micro/go-micro/transport"
)
@ -18,7 +21,7 @@ type Tunnel interface {
// Close closes the tunnel
Close() error
// Connect to a channel
Dial(channel string) (Session, error)
Dial(channel string, opts ...DialOption) (Session, error)
// Accept connections on a channel
Listen(channel string) (Listener, error)
// Name of the tunnel implementation
@ -42,6 +45,12 @@ type Session interface {
transport.Socket
}
var (
ErrDialTimeout = errors.New("dial timeout")
DefaultDialTimeout = time.Second * 5
)
// NewTunnel creates a new tunnel
func NewTunnel(opts ...Option) Tunnel {
return newTunnel(opts...)