mirror of
https://github.com/rclone/rclone.git
synced 2025-08-10 06:09:44 +02:00
pool: add --max-buffer-memory to limit total buffer memory usage
This commit is contained in:
@@ -3,12 +3,14 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/lib/mmap"
|
||||
"golang.org/x/sync/semaphore"
|
||||
)
|
||||
|
||||
// Pool of internal buffers
|
||||
@@ -33,6 +35,14 @@ type Pool struct {
|
||||
free func([]byte) error
|
||||
}
|
||||
|
||||
// totalMemory is a semaphore used to control total buffer usage of
|
||||
// all Pools. It it may be nil in which case the total buffer usage
|
||||
// will not be controlled.
|
||||
var totalMemory *semaphore.Weighted
|
||||
|
||||
// Make sure we initialise the totalMemory semaphore once
|
||||
var totalMemoryInit sync.Once
|
||||
|
||||
// New makes a buffer pool
|
||||
//
|
||||
// flushTime is the interval the buffer pools is flushed
|
||||
@@ -145,6 +155,33 @@ func (bp *Pool) updateMinFill() {
|
||||
}
|
||||
}
|
||||
|
||||
// acquire mem bytes of memory
|
||||
func (bp *Pool) acquire(mem int64) error {
|
||||
ctx := context.Background()
|
||||
|
||||
totalMemoryInit.Do(func() {
|
||||
ci := fs.GetConfig(ctx)
|
||||
|
||||
// Set max buffer memory limiter
|
||||
if ci.MaxBufferMemory > 0 {
|
||||
totalMemory = semaphore.NewWeighted(int64(ci.MaxBufferMemory))
|
||||
}
|
||||
})
|
||||
|
||||
if totalMemory == nil {
|
||||
return nil
|
||||
}
|
||||
return totalMemory.Acquire(ctx, mem)
|
||||
}
|
||||
|
||||
// release mem bytes of memory
|
||||
func (bp *Pool) release(mem int64) {
|
||||
if totalMemory == nil {
|
||||
return
|
||||
}
|
||||
totalMemory.Release(mem)
|
||||
}
|
||||
|
||||
// Get a buffer from the pool or allocate one
|
||||
func (bp *Pool) Get() []byte {
|
||||
bp.mu.Lock()
|
||||
@@ -156,10 +193,16 @@ func (bp *Pool) Get() []byte {
|
||||
break
|
||||
} else {
|
||||
var err error
|
||||
buf, err = bp.alloc(bp.bufferSize)
|
||||
bp.mu.Unlock()
|
||||
err = bp.acquire(int64(bp.bufferSize))
|
||||
bp.mu.Lock()
|
||||
if err == nil {
|
||||
bp.alloced++
|
||||
break
|
||||
buf, err = bp.alloc(bp.bufferSize)
|
||||
if err == nil {
|
||||
bp.alloced++
|
||||
break
|
||||
}
|
||||
bp.release(int64(bp.bufferSize))
|
||||
}
|
||||
fs.Logf(nil, "Failed to get memory for buffer, waiting for %v: %v", waitTime, err)
|
||||
bp.mu.Unlock()
|
||||
@@ -179,6 +222,8 @@ func (bp *Pool) freeBuffer(mem []byte) {
|
||||
err := bp.free(mem)
|
||||
if err != nil {
|
||||
fs.Logf(nil, "Failed to free memory: %v", err)
|
||||
} else {
|
||||
bp.release(int64(bp.bufferSize))
|
||||
}
|
||||
bp.alloced--
|
||||
}
|
||||
|
@@ -1,12 +1,15 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fstest/testy"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
@@ -225,3 +228,55 @@ func TestPool(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPoolMaxBufferMemory(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ci := fs.GetConfig(ctx)
|
||||
ci.MaxBufferMemory = 4 * 4096
|
||||
defer func() {
|
||||
ci.MaxBufferMemory = 0
|
||||
totalMemory = nil
|
||||
}()
|
||||
totalMemoryInit = sync.Once{} // reset the sync.Once as it likely has been used
|
||||
totalMemory = nil
|
||||
bp := New(60*time.Second, 4096, 2, true)
|
||||
|
||||
assert.Equal(t, bp.alloced, 0)
|
||||
assert.Nil(t, totalMemory)
|
||||
buf := bp.Get()
|
||||
assert.NotNil(t, totalMemory)
|
||||
bp.Put(buf)
|
||||
assert.Equal(t, bp.alloced, 1)
|
||||
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
mu sync.Mutex
|
||||
bufs int
|
||||
maxBufs int
|
||||
countBuf = func(i int) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
bufs += i
|
||||
if bufs > maxBufs {
|
||||
maxBufs = bufs
|
||||
}
|
||||
}
|
||||
)
|
||||
for i := 0; i < 20; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
buf := bp.Get()
|
||||
countBuf(1)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
bp.Put(buf)
|
||||
countBuf(-1)
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
assert.Equal(t, bufs, 0)
|
||||
assert.Equal(t, maxBufs, 4)
|
||||
assert.Equal(t, bp.alloced, 2)
|
||||
}
|
||||
|
Reference in New Issue
Block a user