1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-12-30 10:10:44 +02:00

Merge pull request #875 from micro/tun-measure

Measure roundtrip times on link
This commit is contained in:
Asim Aslam 2019-10-22 21:20:57 +01:00 committed by GitHub
commit f07a6ac29b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 148 additions and 4 deletions

View File

@ -966,7 +966,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
c.mode = options.Mode
// set the dial timeout
c.timeout = options.Timeout
// get the current time
now := time.Now()
after := func() time.Duration {
@ -980,6 +980,8 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
}
var links []string
// did we measure the rtt
var measured bool
// non multicast so we need to find the link
if id := options.Link; id != "" {
@ -1021,6 +1023,9 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
// shit fuck
if !c.discovered {
// piggy back roundtrip
nowRTT := time.Now()
// create a new discovery message for this channel
msg := c.newMessage("discover")
msg.mode = Broadcast
@ -1075,12 +1080,30 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
return nil, err
}
// set roundtrip
d := time.Since(nowRTT)
// set the link time
t.RLock()
link, ok := t.links[c.link]
t.RUnlock()
if ok {
// set the rountrip time
link.setRTT(d)
// set measured to true
measured = true
}
// set discovered to true
c.discovered = true
}
// a unicast session so we call "open" and wait for an "accept"
// reset now in case we use it
now = time.Now()
// try to open the session
err := c.Open()
if err != nil {
@ -1089,6 +1112,18 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
return nil, err
}
// if we haven't measured the roundtrip do it now
if !measured && c.mode == Unicast {
// set the link time
t.RLock()
link, ok := t.links[c.link]
t.RUnlock()
if ok {
// set the rountrip time
link.setRTT(time.Since(now))
}
}
return c, nil
}

View File

@ -32,6 +32,12 @@ type link struct {
// channels keeps a mapping of channels and last seen
channels map[string]time.Time
// the weighted moving average roundtrip
rtt int64
// weighted moving average of bits flowing
rate float64
// keep an error count on the link
errCount int
}
@ -48,6 +54,21 @@ func newLink(s transport.Socket) *link {
return l
}
func (l *link) setRTT(d time.Duration) {
l.Lock()
defer l.Unlock()
if l.rtt <= 0 {
l.rtt = d.Nanoseconds()
return
}
// https://fishi.devtail.io/weblog/2015/04/12/measuring-bandwidth-and-round-trip-time-tcp-connection-inside-application-layer/
rtt := 0.8*float64(l.rtt) + 0.2*float64(d.Nanoseconds())
// set new rtt
l.rtt = int64(rtt)
}
// watches the channel expiry
func (l *link) expiry() {
t := time.NewTicker(time.Minute)
@ -92,12 +113,18 @@ func (l *link) Delay() int64 {
// Current transfer rate as bits per second (lower is better)
func (l *link) Rate() float64 {
return float64(10e8)
l.RLock()
defer l.RUnlock()
return l.rate
}
// Length returns the roundtrip time as nanoseconds (lower is better)
// Length returns the roundtrip time as nanoseconds (lower is better).
// Returns 0 where no measurement has been taken.
func (l *link) Length() int64 {
return time.Second.Nanoseconds()
l.RLock()
defer l.RUnlock()
return l.rtt
}
func (l *link) Id() string {
@ -119,11 +146,43 @@ func (l *link) Close() error {
}
func (l *link) Send(m *transport.Message) error {
dataSent := len(m.Body)
// set header length
for k, v := range m.Header {
dataSent += (len(k) + len(v))
}
// get time now
now := time.Now()
// send the message
err := l.Socket.Send(m)
l.Lock()
defer l.Unlock()
// calculate based on data
if dataSent > 0 {
// measure time taken
delta := time.Since(now)
// bit sent
bits := dataSent * 1024
// rate of send in bits per nanosecond
rate := float64(bits) / float64(delta.Nanoseconds())
//
if l.rate == 0 {
// rate per second
l.rate = rate * 1e9
} else {
// set new rate per second
l.rate = 0.8*l.rate + 0.2*(rate*1e9)
}
}
// if theres no error reset the counter
if err == nil {
l.errCount = 0

View File

@ -292,3 +292,53 @@ func TestReconnectTunnel(t *testing.T) {
// wait until done
wg.Wait()
}
func TestTunnelRTTRate(t *testing.T) {
// create a new tunnel client
tunA := NewTunnel(
Address("127.0.0.1:9096"),
Nodes("127.0.0.1:9097"),
)
// create a new tunnel server
tunB := NewTunnel(
Address("127.0.0.1:9097"),
)
// start tunB
err := tunB.Connect()
if err != nil {
t.Fatal(err)
}
defer tunB.Close()
// start tunA
err = tunA.Connect()
if err != nil {
t.Fatal(err)
}
defer tunA.Close()
wait := make(chan bool)
var wg sync.WaitGroup
wg.Add(1)
// start the listener
go testAccept(t, tunB, wait, &wg)
wg.Add(1)
// start the client
go testSend(t, tunA, wait, &wg)
// wait until done
wg.Wait()
for _, link := range tunA.Links() {
t.Logf("Link %s length %v rate %v", link.Id(), link.Length(), link.Rate())
}
for _, link := range tunB.Links() {
t.Logf("Link %s length %v rate %v", link.Id(), link.Length(), link.Rate())
}
}