1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-12-24 10:07:04 +02:00

Update tunnel to use id+session for the key

This commit is contained in:
Asim Aslam 2019-07-10 19:01:24 +01:00
parent 0f16eb2858
commit 0a39fe39c3

View File

@ -31,7 +31,7 @@ type tun struct {
sockets map[string]*socket
}
// create new tunnel
// create new tunnel on top of a link
func newTunnel(link link.Link) *tun {
return &tun{
link: link,
@ -41,22 +41,18 @@ func newTunnel(link link.Link) *tun {
}
}
// getSocket returns a socket from the internal socket map
func (t *tun) getSocket(id string) (*socket, bool) {
// getSocket returns a socket from the internal socket map.
// It does this based on the Micro-Tunnel-Id and Micro-Tunnel-Session
func (t *tun) getSocket(id, session string) (*socket, bool) {
// get the socket
t.RLock()
s, ok := t.sockets[id]
s, ok := t.sockets[id+session]
t.RUnlock()
return s, ok
}
// newSocket creates a new socket and saves it
func (t *tun) newSocket(id string) *socket {
// new id if it doesn't exist
if len(id) == 0 {
id = uuid.New().String()
}
func (t *tun) newSocket(id, session string) (*socket, bool) {
// hash the id
h := sha256.New()
h.Write([]byte(id))
@ -65,7 +61,7 @@ func (t *tun) newSocket(id string) *socket {
// new socket
s := &socket{
id: id,
session: t.newSession(),
session: session,
closed: make(chan bool),
recv: make(chan *message, 128),
send: t.send,
@ -73,11 +69,17 @@ func (t *tun) newSocket(id string) *socket {
// save socket
t.Lock()
t.sockets[id] = s
_, ok := t.sockets[id+session]
if ok {
// socket already exists
t.Unlock()
return nil, false
}
t.sockets[id+session] = s
t.Unlock()
// return socket
return s
return s, true
}
// TODO: use tunnel id as part of the session
@ -130,10 +132,19 @@ func (t *tun) listen() {
// the session id
session := msg.Header["Micro-Tunnel-Session"]
// get the socket
s, exists := t.getSocket(id)
// try get it based on just the tunnel id
// the assumption here is that a listener
// has no session but its set a listener session
if len(session) == 0 {
session = "listener"
}
// get the socket based on the tunnel id and session
// this could be something we dialed in which case
// we have a session for it otherwise its a listener
s, exists := t.getSocket(id, session)
if !exists {
// drop it, we don't care about
// drop it, we don't care about
// messages we don't know about
continue
}
@ -168,9 +179,9 @@ func (t *tun) listen() {
// construct the internal message
imsg := &message{
id: id,
id: id,
session: session,
data: tmsg,
data: tmsg,
}
// append to recv backlog
@ -232,7 +243,10 @@ func (t *tun) Connect() error {
// Dial an address
func (t *tun) Dial(addr string) (Conn, error) {
c := t.newSocket(addr)
c, ok := t.newSocket(addr, t.newSession())
if !ok {
return nil, errors.New("error dialing " + addr)
}
// set remote
c.remote = addr
// set local
@ -244,19 +258,16 @@ func (t *tun) Dial(addr string) (Conn, error) {
// Accept a connection on the address
func (t *tun) Listen(addr string) (Listener, error) {
// create a new socket by hashing the address
c := t.newSocket(addr)
c, ok := t.newSocket(addr, "listener")
if !ok {
return nil, errors.New("already listening on " + addr)
}
// set remote. it will be replaced by the first message received
c.remote = t.link.Remote()
// set local
c.local = addr
select {
case <-c.closed:
return nil, errors.New("error creating socket")
// wait for the first message
case <-c.wait:
}
tl := &tunListener{
addr: addr,
// the accept channel