diff --git a/fs/accounting/accounting.go b/fs/accounting/accounting.go index 1820b925f..4e69681ef 100644 --- a/fs/accounting/accounting.go +++ b/fs/accounting/accounting.go @@ -32,22 +32,28 @@ type Account struct { // in http transport calls Read() after Do() returns on // CancelRequest so this race can happen when it apparently // shouldn't. - mu sync.Mutex + mu sync.Mutex // mutex protects these values in io.Reader origIn io.ReadCloser close io.Closer size int64 name string - statmu sync.Mutex // Separate mutex for stat values. - bytes int64 // Total number of bytes read - max int64 // if >=0 the max number of bytes to transfer - start time.Time // Start time of first read - lpTime time.Time // Time of last average measurement - lpBytes int // Number of bytes read since last measurement - avg float64 // Moving average of last few measurements in bytes/s closed bool // set if the file is closed exit chan struct{} // channel that will be closed when transfer is finished withBuf bool // is using a buffered in + + values accountValues +} + +// accountValues holds statistics for this Account +type accountValues struct { + mu sync.Mutex // Mutex for stat values. + bytes int64 // Total number of bytes read + max int64 // if >=0 the max number of bytes to transfer + start time.Time // Start time of first read + lpTime time.Time // Time of last average measurement + lpBytes int // Number of bytes read since last measurement + avg float64 // Moving average of last few measurements in bytes/s } const averagePeriod = 16 // period to do exponentially weighted averages over @@ -63,12 +69,14 @@ func newAccountSizeName(stats *StatsInfo, in io.ReadCloser, size int64, name str size: size, name: name, exit: make(chan struct{}), - avg: 0, - lpTime: time.Now(), - max: -1, + values: accountValues{ + avg: 0, + lpTime: time.Now(), + max: -1, + }, } if fs.Config.CutoffMode == fs.CutoffModeHard { - acc.max = int64((fs.Config.MaxTransfer)) + acc.values.max = int64((fs.Config.MaxTransfer)) } go acc.averageLoop() stats.inProgress.set(acc.name, acc) @@ -154,19 +162,19 @@ func (acc *Account) averageLoop() { for { select { case now := <-tick.C: - acc.statmu.Lock() + acc.values.mu.Lock() // Add average of last second. - elapsed := now.Sub(acc.lpTime).Seconds() - avg := float64(acc.lpBytes) / elapsed + elapsed := now.Sub(acc.values.lpTime).Seconds() + avg := float64(acc.values.lpBytes) / elapsed // Soft start the moving average if period < averagePeriod { period++ } - acc.avg = (avg + (period-1)*acc.avg) / period - acc.lpBytes = 0 - acc.lpTime = now + acc.values.avg = (avg + (period-1)*acc.values.avg) / period + acc.values.lpBytes = 0 + acc.values.lpTime = now // Unlock stats - acc.statmu.Unlock() + acc.values.mu.Unlock() case <-acc.exit: return } @@ -176,21 +184,21 @@ func (acc *Account) averageLoop() { // Check the read before it has happened is valid returning the number // of bytes remaining to read. func (acc *Account) checkReadBefore() (bytesUntilLimit int64, err error) { - acc.statmu.Lock() - if acc.max >= 0 { - bytesUntilLimit = acc.max - acc.stats.GetBytes() + acc.values.mu.Lock() + if acc.values.max >= 0 { + bytesUntilLimit = acc.values.max - acc.stats.GetBytes() if bytesUntilLimit < 0 { - acc.statmu.Unlock() + acc.values.mu.Unlock() return bytesUntilLimit, ErrorMaxTransferLimitReachedFatal } } else { bytesUntilLimit = 1 << 62 } // Set start time. - if acc.start.IsZero() { - acc.start = time.Now() + if acc.values.start.IsZero() { + acc.values.start = time.Now() } - acc.statmu.Unlock() + acc.values.mu.Unlock() return bytesUntilLimit, nil } @@ -212,20 +220,20 @@ func checkReadAfter(bytesUntilLimit int64, n int, err error) (outN int, outErr e // // This pretends a transfer has started func (acc *Account) ServerSideCopyStart() { - acc.statmu.Lock() + acc.values.mu.Lock() // Set start time. - if acc.start.IsZero() { - acc.start = time.Now() + if acc.values.start.IsZero() { + acc.values.start = time.Now() } - acc.statmu.Unlock() + acc.values.mu.Unlock() } // ServerSideCopyEnd accounts for a read of n bytes in a sever side copy func (acc *Account) ServerSideCopyEnd(n int64) { // Update Stats - acc.statmu.Lock() - acc.bytes += n - acc.statmu.Unlock() + acc.values.mu.Lock() + acc.values.bytes += n + acc.values.mu.Unlock() acc.stats.Bytes(n) } @@ -233,10 +241,10 @@ func (acc *Account) ServerSideCopyEnd(n int64) { // Account the read and limit bandwidth func (acc *Account) accountRead(n int) { // Update Stats - acc.statmu.Lock() - acc.lpBytes += n - acc.bytes += int64(n) - acc.statmu.Unlock() + acc.values.mu.Lock() + acc.values.lpBytes += n + acc.values.bytes += int64(n) + acc.values.mu.Unlock() acc.stats.Bytes(int64(n)) @@ -340,9 +348,9 @@ func (acc *Account) progress() (bytes, size int64) { if acc == nil { return 0, 0 } - acc.statmu.Lock() - bytes, size = acc.bytes, acc.size - acc.statmu.Unlock() + acc.values.mu.Lock() + bytes, size = acc.values.bytes, acc.size + acc.values.mu.Unlock() return bytes, size } @@ -353,15 +361,15 @@ func (acc *Account) speed() (bps, current float64) { if acc == nil { return 0, 0 } - acc.statmu.Lock() - defer acc.statmu.Unlock() - if acc.bytes == 0 { + acc.values.mu.Lock() + defer acc.values.mu.Unlock() + if acc.values.bytes == 0 { return 0, 0 } // Calculate speed from first read. - total := float64(time.Now().Sub(acc.start)) / float64(time.Second) - bps = float64(acc.bytes) / total - current = acc.avg + total := float64(time.Now().Sub(acc.values.start)) / float64(time.Second) + bps = float64(acc.values.bytes) / total + current = acc.values.avg return } @@ -372,9 +380,9 @@ func (acc *Account) eta() (etaDuration time.Duration, ok bool) { if acc == nil { return 0, false } - acc.statmu.Lock() - defer acc.statmu.Unlock() - return eta(acc.bytes, acc.size, acc.avg) + acc.values.mu.Lock() + defer acc.values.mu.Unlock() + return eta(acc.values.bytes, acc.size, acc.values.avg) } // shortenName shortens in to size runes long diff --git a/fs/accounting/accounting_test.go b/fs/accounting/accounting_test.go index d476ff88b..bd6c101ac 100644 --- a/fs/accounting/accounting_test.go +++ b/fs/accounting/accounting_test.go @@ -91,11 +91,11 @@ func TestAccountRead(t *testing.T) { stats := NewStats() acc := newAccountSizeName(stats, in, 1, "test") - assert.True(t, acc.start.IsZero()) - acc.statmu.Lock() - assert.Equal(t, 0, acc.lpBytes) - assert.Equal(t, int64(0), acc.bytes) - acc.statmu.Unlock() + assert.True(t, acc.values.start.IsZero()) + acc.values.mu.Lock() + assert.Equal(t, 0, acc.values.lpBytes) + assert.Equal(t, int64(0), acc.values.bytes) + acc.values.mu.Unlock() assert.Equal(t, int64(0), stats.bytes) var buf = make([]byte, 2) @@ -104,11 +104,11 @@ func TestAccountRead(t *testing.T) { assert.Equal(t, 2, n) assert.Equal(t, []byte{1, 2}, buf[:n]) - assert.False(t, acc.start.IsZero()) - acc.statmu.Lock() - assert.Equal(t, 2, acc.lpBytes) - assert.Equal(t, int64(2), acc.bytes) - acc.statmu.Unlock() + assert.False(t, acc.values.start.IsZero()) + acc.values.mu.Lock() + assert.Equal(t, 2, acc.values.lpBytes) + assert.Equal(t, int64(2), acc.values.bytes) + acc.values.mu.Unlock() assert.Equal(t, int64(2), stats.bytes) n, err = acc.Read(buf) @@ -135,11 +135,11 @@ func testAccountWriteTo(t *testing.T, withBuffer bool) { acc = acc.WithBuffer() } - assert.True(t, acc.start.IsZero()) - acc.statmu.Lock() - assert.Equal(t, 0, acc.lpBytes) - assert.Equal(t, int64(0), acc.bytes) - acc.statmu.Unlock() + assert.True(t, acc.values.start.IsZero()) + acc.values.mu.Lock() + assert.Equal(t, 0, acc.values.lpBytes) + assert.Equal(t, int64(0), acc.values.bytes) + acc.values.mu.Unlock() assert.Equal(t, int64(0), stats.bytes) var out bytes.Buffer @@ -149,11 +149,11 @@ func testAccountWriteTo(t *testing.T, withBuffer bool) { assert.Equal(t, int64(len(buf)), n) assert.Equal(t, buf, out.Bytes()) - assert.False(t, acc.start.IsZero()) - acc.statmu.Lock() - assert.Equal(t, len(buf), acc.lpBytes) - assert.Equal(t, int64(len(buf)), acc.bytes) - acc.statmu.Unlock() + assert.False(t, acc.values.start.IsZero()) + acc.values.mu.Lock() + assert.Equal(t, len(buf), acc.values.lpBytes) + assert.Equal(t, int64(len(buf)), acc.values.bytes) + acc.values.mu.Unlock() assert.Equal(t, int64(len(buf)), stats.bytes) assert.NoError(t, acc.Close())