1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-11 17:18:28 +02:00

Remove dead link code

This commit is contained in:
Asim Aslam 2019-09-11 11:57:41 -07:00
parent 9ca7d90f11
commit 6819386e05
4 changed files with 27 additions and 119 deletions

View File

@ -932,6 +932,11 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
// non multicast so we need to find the link // non multicast so we need to find the link
t.RLock() t.RLock()
for _, link := range t.links { for _, link := range t.links {
// use the link specified it its available
if id := options.Link; len(id) > 0 && link.id != id {
continue
}
link.RLock() link.RLock()
_, ok := link.channels[channel] _, ok := link.channels[channel]
link.RUnlock() link.RUnlock()
@ -944,6 +949,11 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
} }
t.RUnlock() t.RUnlock()
// link not found
if len(links) == 0 && len(options.Link) > 0 {
return nil, ErrLinkNotFound
}
// discovered so set the link if not multicast // discovered so set the link if not multicast
// TODO: pick the link efficiently based // TODO: pick the link efficiently based
// on link status and saturation. // on link status and saturation.

View File

@ -1,7 +1,6 @@
package tunnel package tunnel
import ( import (
"io"
"sync" "sync"
"time" "time"
@ -32,98 +31,19 @@ type link struct {
lastKeepAlive time.Time lastKeepAlive time.Time
// channels keeps a mapping of channels and last seen // channels keeps a mapping of channels and last seen
channels map[string]time.Time channels map[string]time.Time
// the send queue to the socket
sendQueue chan *transport.Message
// the recv queue to the socket
recvQueue chan *transport.Message
// determines the cost of the link
// based on queue length and roundtrip
length int
weight int
} }
func newLink(s transport.Socket) *link { func newLink(s transport.Socket) *link {
l := &link{ l := &link{
Socket: s, Socket: s,
id: uuid.New().String(), id: uuid.New().String(),
channels: make(map[string]time.Time), channels: make(map[string]time.Time),
closed: make(chan bool), closed: make(chan bool),
sendQueue: make(chan *transport.Message, 128),
recvQueue: make(chan *transport.Message, 128),
} }
go l.expiry() go l.expiry()
go l.process()
return l return l
} }
// process processes messages on the send and receive queues.
func (l *link) process() {
go func() {
for {
m := new(transport.Message)
if err := l.Socket.Recv(m); err != nil {
return
}
select {
case l.recvQueue <- m:
case <-l.closed:
return
}
}
}()
// messages sent
i := 0
length := 0
for {
select {
case m := <-l.sendQueue:
t := time.Now()
// send the message
if err := l.Socket.Send(m); err != nil {
return
}
// get header size, body size and time taken
hl := len(m.Header)
bl := len(m.Body)
d := time.Since(t)
// don't calculate on empty messages
if hl == 0 && bl == 0 {
continue
}
// increment sent
i++
// time take to send some bits and bytes
td := float64(hl+bl) / float64(d.Nanoseconds())
// increase the scale
td += 1
// judge the length
length = int(td) / (length + int(td))
// every 10 messages update length
if (i % 10) == 1 {
// cost average the length
// save it
l.Lock()
l.length = length
l.Unlock()
}
case <-l.closed:
return
}
}
}
// watches the channel expiry // watches the channel expiry
func (l *link) expiry() { func (l *link) expiry() {
t := time.NewTicker(time.Minute) t := time.NewTicker(time.Minute)
@ -180,41 +100,6 @@ func (l *link) Close() error {
return nil return nil
} }
// length/rate of the link
func (l *link) Length() int {
l.RLock()
defer l.RUnlock()
return l.length
}
// weight checks the size of the queues
func (l *link) Weight() int {
return len(l.sendQueue) + len(l.recvQueue)
}
// Accept accepts a message on the socket
func (l *link) Recv(m *transport.Message) error {
select {
case <-l.closed:
return io.EOF
case rm := <-l.recvQueue:
*m = *rm
return nil
}
// never reach
return nil
}
// Send sends a message on the socket immediately
func (l *link) Send(m *transport.Message) error {
select {
case <-l.closed:
return io.EOF
case l.sendQueue <- m:
}
return nil
}
func (l *link) Status() string { func (l *link) Status() string {
select { select {
case <-l.closed: case <-l.closed:

View File

@ -34,6 +34,8 @@ type Options struct {
type DialOption func(*DialOptions) type DialOption func(*DialOptions)
type DialOptions struct { type DialOptions struct {
// Link specifies the link to use
Link string
// specify a multicast connection // specify a multicast connection
Multicast bool Multicast bool
// the dial timeout // the dial timeout
@ -94,8 +96,17 @@ func DialMulticast() DialOption {
} }
} }
// DialTimeout sets the dial timeout of the connection
func DialTimeout(t time.Duration) DialOption { func DialTimeout(t time.Duration) DialOption {
return func(o *DialOptions) { return func(o *DialOptions) {
o.Timeout = t o.Timeout = t
} }
} }
// DialLink specifies the link to pin this connection to.
// This is not applicable if the multicast option is set.
func DialLink(id string) DialOption {
return func(o *DialOptions) {
o.Link = id
}
}

View File

@ -15,6 +15,8 @@ var (
ErrDialTimeout = errors.New("dial timeout") ErrDialTimeout = errors.New("dial timeout")
// ErrDiscoverChan is returned when we failed to receive the "announce" back from a discovery // ErrDiscoverChan is returned when we failed to receive the "announce" back from a discovery
ErrDiscoverChan = errors.New("failed to discover channel") ErrDiscoverChan = errors.New("failed to discover channel")
// ErrLinkNotFound is returned when a link is specified at dial time and does not exist
ErrLinkNotFound = errors.New("link not found")
) )
// Tunnel creates a gre tunnel on top of the go-micro/transport. // Tunnel creates a gre tunnel on top of the go-micro/transport.