1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-11 17:18:28 +02:00

Send and receive on loopback tunnel interface

This commit is contained in:
Milos Gajdos 2019-08-14 13:00:10 +01:00
parent e607485c6b
commit 151bcf0ea1
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
2 changed files with 47 additions and 50 deletions

View File

@ -131,6 +131,9 @@ func (t *tun) process() {
// set the session id // set the session id
newMsg.Header["Micro-Tunnel-Session"] = msg.session newMsg.Header["Micro-Tunnel-Session"] = msg.session
// set the tunnel token
newMsg.Header["Micro-Tunnel-Token"] = t.token
// send the message via the interface // send the message via the interface
t.RLock() t.RLock()
if len(t.links) == 0 { if len(t.links) == 0 {
@ -150,6 +153,9 @@ func (t *tun) process() {
// process incoming messages // process incoming messages
func (t *tun) listen(link transport.Socket) { func (t *tun) listen(link transport.Socket) {
// loopback flag
var loopback bool
for { for {
// process anything via the net interface // process anything via the net interface
msg := new(transport.Message) msg := new(transport.Message)
@ -159,26 +165,19 @@ func (t *tun) listen(link transport.Socket) {
return return
} }
// loopback flag switch msg.Header["Micro-Tunnel"] {
var loopback bool case "connect":
// check the Micro-Tunnel-Token
token, ok := msg.Header["Micro-Tunnel-Token"]
if !ok {
continue
}
// TODO: figure out the way how to populate Micro-Tunnel-Token for every message
// check the Micro-Tunnel-Token
token, ok := msg.Header["Micro-Tunnel-Token"]
if ok {
// are we connecting to ourselves? // are we connecting to ourselves?
if token == t.token { if token == t.token {
loopback = true loopback = true
} }
} continue
switch msg.Header["Micro-Tunnel"] {
case "connect":
// connecting without token is not allowed
if token == "" {
continue
}
case "close": case "close":
// TODO: handle the close message // TODO: handle the close message
// maybe report io.EOF or kill the link // maybe report io.EOF or kill the link
@ -193,8 +192,6 @@ func (t *tun) listen(link transport.Socket) {
session := msg.Header["Micro-Tunnel-Session"] session := msg.Header["Micro-Tunnel-Session"]
delete(msg.Header, "Micro-Tunnel-Session") delete(msg.Header, "Micro-Tunnel-Session")
// TODO: should we delete Micro-Tunnel-Token header, too?
// if the session id is blank there's nothing we can do // if the session id is blank there's nothing we can do
// TODO: check this is the case, is there any reason // TODO: check this is the case, is there any reason
// why we'd have a blank session? Is the tunnel // why we'd have a blank session? Is the tunnel
@ -366,7 +363,8 @@ func (t *tun) close() error {
for id, link := range t.links { for id, link := range t.links {
link.Send(&transport.Message{ link.Send(&transport.Message{
Header: map[string]string{ Header: map[string]string{
"Micro-Tunnel": "close", "Micro-Tunnel": "close",
"Micro-Tunnel-Token": t.token,
}, },
}) })
link.Close() link.Close()
@ -444,7 +442,6 @@ func (t *tun) Dial(addr string) (Conn, error) {
if !ok { if !ok {
return nil, errors.New("error dialing " + addr) return nil, errors.New("error dialing " + addr)
} }
// set remote // set remote
c.remote = addr c.remote = addr
// set local // set local

View File

@ -99,34 +99,34 @@ func TestTunnel(t *testing.T) {
wg.Wait() wg.Wait()
} }
//func TestLoopbackTunnel(t *testing.T) { func TestLoopbackTunnel(t *testing.T) {
// // create a new tunnel client // create a new tunnel client
// tun := NewTunnel( tun := NewTunnel(
// Address("127.0.0.1:9096"), Address("127.0.0.1:9096"),
// Nodes("127.0.0.1:9096"), Nodes("127.0.0.1:9096"),
// ) )
//
// // start tunB // start tunB
// err := tun.Connect() err := tun.Connect()
// if err != nil { if err != nil {
// t.Fatal(err) t.Fatal(err)
// } }
// defer tun.Close() defer tun.Close()
//
// time.Sleep(time.Millisecond * 50) time.Sleep(time.Millisecond * 50)
//
// var wg sync.WaitGroup var wg sync.WaitGroup
//
// // start accepting connections // start accepting connections
// // on tunnel A // on tunnel A
// wg.Add(1) wg.Add(1)
// go testAccept(t, tun, &wg) go testAccept(t, tun, &wg)
//
// time.Sleep(time.Millisecond * 50) time.Sleep(time.Millisecond * 50)
//
// // dial and send via B // dial and send via B
// testSend(t, tun) testSend(t, tun)
//
// // wait until done // wait until done
// wg.Wait() wg.Wait()
//} }