mirror of
				https://github.com/go-micro/go-micro.git
				synced 2025-10-30 23:27:41 +02:00 
			
		
		
		
	Fix loopback cruft
This commit is contained in:
		| @@ -53,6 +53,8 @@ type link struct { | |||||||
| 	// which we define for ourselves | 	// which we define for ourselves | ||||||
| 	id string | 	id string | ||||||
| 	// whether its a loopback connection | 	// whether its a loopback connection | ||||||
|  | 	// this flag is used by the transport listener | ||||||
|  | 	// which accepts inbound quic connections | ||||||
| 	loopback bool | 	loopback bool | ||||||
| 	// whether its actually connected | 	// whether its actually connected | ||||||
| 	// dialled side sets it to connected | 	// dialled side sets it to connected | ||||||
| @@ -183,7 +185,7 @@ func (t *tun) process() { | |||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			// set message head | 			// set message head | ||||||
| 			newMsg.Header["Micro-Tunnel"] = "message" | 			newMsg.Header["Micro-Tunnel"] = msg.typ | ||||||
|  |  | ||||||
| 			// set the tunnel id on the outgoing message | 			// set the tunnel id on the outgoing message | ||||||
| 			newMsg.Header["Micro-Tunnel-Id"] = msg.id | 			newMsg.Header["Micro-Tunnel-Id"] = msg.id | ||||||
| @@ -196,9 +198,11 @@ func (t *tun) process() { | |||||||
|  |  | ||||||
| 			// send the message via the interface | 			// send the message via the interface | ||||||
| 			t.Lock() | 			t.Lock() | ||||||
|  |  | ||||||
| 			if len(t.links) == 0 { | 			if len(t.links) == 0 { | ||||||
| 				log.Debugf("No links to send to") | 				log.Debugf("No links to send to") | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			for node, link := range t.links { | 			for node, link := range t.links { | ||||||
| 				// if the link is not connected skip it | 				// if the link is not connected skip it | ||||||
| 				if !link.connected { | 				if !link.connected { | ||||||
| @@ -206,6 +210,13 @@ func (t *tun) process() { | |||||||
| 					continue | 					continue | ||||||
| 				} | 				} | ||||||
|  |  | ||||||
|  | 				// if we're picking the link check the id | ||||||
|  | 				// this is where we explicitly set the link | ||||||
|  | 				// in a message received via the listen method | ||||||
|  | 				if len(msg.link) > 0 && link.id != msg.link { | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  |  | ||||||
| 				// if the link was a loopback accepted connection | 				// if the link was a loopback accepted connection | ||||||
| 				// and the message is being sent outbound via | 				// and the message is being sent outbound via | ||||||
| 				// a dialled connection don't use this link | 				// a dialled connection don't use this link | ||||||
| @@ -219,6 +230,7 @@ func (t *tun) process() { | |||||||
| 					continue | 					continue | ||||||
| 				} | 				} | ||||||
|  |  | ||||||
|  | 				// send the message via the current link | ||||||
| 				log.Debugf("Sending %+v to %s", newMsg, node) | 				log.Debugf("Sending %+v to %s", newMsg, node) | ||||||
| 				if err := link.Send(newMsg); err != nil { | 				if err := link.Send(newMsg); err != nil { | ||||||
| 					log.Debugf("Tunnel error sending %+v to %s: %v", newMsg, node, err) | 					log.Debugf("Tunnel error sending %+v to %s: %v", newMsg, node, err) | ||||||
| @@ -226,6 +238,7 @@ func (t *tun) process() { | |||||||
| 					continue | 					continue | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			t.Unlock() | 			t.Unlock() | ||||||
| 		case <-t.closed: | 		case <-t.closed: | ||||||
| 			return | 			return | ||||||
| @@ -283,10 +296,11 @@ func (t *tun) listen(link *link) { | |||||||
| 			log.Debugf("Tunnel link %s closing connection", link.Remote()) | 			log.Debugf("Tunnel link %s closing connection", link.Remote()) | ||||||
| 			// TODO: handle the close message | 			// TODO: handle the close message | ||||||
| 			// maybe report io.EOF or kill the link | 			// maybe report io.EOF or kill the link | ||||||
| 			continue | 			return | ||||||
| 		case "keepalive": | 		case "keepalive": | ||||||
| 			log.Debugf("Tunnel link %s received keepalive", link.Remote()) | 			log.Debugf("Tunnel link %s received keepalive", link.Remote()) | ||||||
| 			t.Lock() | 			t.Lock() | ||||||
|  | 			// save the keepalive | ||||||
| 			link.lastKeepAlive = time.Now() | 			link.lastKeepAlive = time.Now() | ||||||
| 			t.Unlock() | 			t.Unlock() | ||||||
| 			continue | 			continue | ||||||
| @@ -300,6 +314,7 @@ func (t *tun) listen(link *link) { | |||||||
|  |  | ||||||
| 		// if its not connected throw away the link | 		// if its not connected throw away the link | ||||||
| 		if !link.connected { | 		if !link.connected { | ||||||
|  | 			log.Debugf("Tunnel link %s not connected", link.id) | ||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| @@ -388,6 +403,7 @@ func (t *tun) listen(link *link) { | |||||||
| 			id:       id, | 			id:       id, | ||||||
| 			session:  session, | 			session:  session, | ||||||
| 			data:     tmsg, | 			data:     tmsg, | ||||||
|  | 			link:     link.id, | ||||||
| 			loopback: loopback, | 			loopback: loopback, | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| @@ -449,13 +465,10 @@ func (t *tun) setupLink(node string) (*link, error) { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// create a new link | 	// create a new link | ||||||
| 	link := &link{ | 	link := newLink(c) | ||||||
| 		Socket: c, | 	link.connected = true | ||||||
| 		id:     uuid.New().String(), | 	// we made the outbound connection | ||||||
| 		// we made the outbound connection | 	// and sent the connect message | ||||||
| 		// and sent the connect message |  | ||||||
| 		connected: true, |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// process incoming messages | 	// process incoming messages | ||||||
| 	go t.listen(link) | 	go t.listen(link) | ||||||
| @@ -482,10 +495,7 @@ func (t *tun) connect() error { | |||||||
| 			log.Debugf("Tunnel accepted connection from %s", sock.Remote()) | 			log.Debugf("Tunnel accepted connection from %s", sock.Remote()) | ||||||
|  |  | ||||||
| 			// create a new link | 			// create a new link | ||||||
| 			link := &link{ | 			link := newLink(sock) | ||||||
| 				Socket: sock, |  | ||||||
| 				id:     uuid.New().String(), |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			// listen for inbound messages. | 			// listen for inbound messages. | ||||||
| 			// only save the link once connected. | 			// only save the link once connected. | ||||||
| @@ -493,8 +503,8 @@ func (t *tun) connect() error { | |||||||
| 			t.listen(link) | 			t.listen(link) | ||||||
| 		}) | 		}) | ||||||
|  |  | ||||||
| 		t.Lock() | 		t.RLock() | ||||||
| 		defer t.Unlock() | 		defer t.RUnlock() | ||||||
|  |  | ||||||
| 		// still connected but the tunnel died | 		// still connected but the tunnel died | ||||||
| 		if err != nil && t.connected { | 		if err != nil && t.connected { | ||||||
|   | |||||||
							
								
								
									
										13
									
								
								tunnel/link.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										13
									
								
								tunnel/link.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,13 @@ | |||||||
|  | package tunnel | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"github.com/google/uuid" | ||||||
|  | 	"github.com/micro/go-micro/transport" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func newLink(s transport.Socket) *link { | ||||||
|  | 	return &link{ | ||||||
|  | 		Socket: s, | ||||||
|  | 		id:     uuid.New().String(), | ||||||
|  | 	} | ||||||
|  | } | ||||||
| @@ -43,6 +43,8 @@ func (t *tunListener) process() { | |||||||
| 					session: m.session, | 					session: m.session, | ||||||
| 					// is loopback conn | 					// is loopback conn | ||||||
| 					loopback: m.loopback, | 					loopback: m.loopback, | ||||||
|  | 					// the link the message was received on | ||||||
|  | 					link: m.link, | ||||||
| 					// close chan | 					// close chan | ||||||
| 					closed: make(chan bool), | 					closed: make(chan bool), | ||||||
| 					// recv called by the acceptor | 					// recv called by the acceptor | ||||||
|   | |||||||
| @@ -29,10 +29,14 @@ type socket struct { | |||||||
| 	outbound bool | 	outbound bool | ||||||
| 	// lookback marks the socket as a loopback on the inbound | 	// lookback marks the socket as a loopback on the inbound | ||||||
| 	loopback bool | 	loopback bool | ||||||
|  | 	// the link on which this message was received | ||||||
|  | 	link string | ||||||
| } | } | ||||||
|  |  | ||||||
| // message is sent over the send channel | // message is sent over the send channel | ||||||
| type message struct { | type message struct { | ||||||
|  | 	// type of message | ||||||
|  | 	typ string | ||||||
| 	// tunnel id | 	// tunnel id | ||||||
| 	id string | 	id string | ||||||
| 	// the session id | 	// the session id | ||||||
| @@ -41,6 +45,8 @@ type message struct { | |||||||
| 	outbound bool | 	outbound bool | ||||||
| 	// loopback marks the message intended for loopback | 	// loopback marks the message intended for loopback | ||||||
| 	loopback bool | 	loopback bool | ||||||
|  | 	// the link to send the message on | ||||||
|  | 	link string | ||||||
| 	// transport data | 	// transport data | ||||||
| 	data *transport.Message | 	data *transport.Message | ||||||
| } | } | ||||||
| @@ -81,11 +87,15 @@ func (s *socket) Send(m *transport.Message) error { | |||||||
|  |  | ||||||
| 	// append to backlog | 	// append to backlog | ||||||
| 	msg := &message{ | 	msg := &message{ | ||||||
|  | 		typ:      "message", | ||||||
| 		id:       s.id, | 		id:       s.id, | ||||||
| 		session:  s.session, | 		session:  s.session, | ||||||
| 		outbound: s.outbound, | 		outbound: s.outbound, | ||||||
| 		loopback: s.loopback, | 		loopback: s.loopback, | ||||||
| 		data:     data, | 		data:     data, | ||||||
|  | 		// specify the link on which to send this | ||||||
|  | 		// it will be blank for dialled sockets | ||||||
|  | 		link: s.link, | ||||||
| 	} | 	} | ||||||
| 	log.Debugf("Appending %+v to send backlog", msg) | 	log.Debugf("Appending %+v to send backlog", msg) | ||||||
| 	s.send <- msg | 	s.send <- msg | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user