diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index d6753eb14..8eb9a1ad7 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -92,7 +92,7 @@ func (mc *multiThreadCopyState) copyChunk(ctx context.Context, chunk int, writer rs = rc } else { // Read the chunk into buffered reader - rw := multipart.NewRW() + rw := multipart.NewRW().Reserve(size) defer fs.CheckClose(rw, &err) _, err = io.CopyN(rw, rc, size) if err != nil { diff --git a/lib/multipart/multipart.go b/lib/multipart/multipart.go index f3b959ce5..b8d0a6a84 100644 --- a/lib/multipart/multipart.go +++ b/lib/multipart/multipart.go @@ -73,7 +73,7 @@ func UploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, opt U for partNum := int64(0); !finished; partNum++ { // Get a block of memory from the pool and token which limits concurrency. tokens.Get() - rw := NewRW() + rw := NewRW().Reserve(chunkSize) if acc != nil { rw.SetAccounting(acc.AccountRead) } diff --git a/lib/pool/pool.go b/lib/pool/pool.go index 6a94bb814..5491f992d 100644 --- a/lib/pool/pool.go +++ b/lib/pool/pool.go @@ -192,27 +192,62 @@ func (bp *Pool) release(mem int64) { totalMemory.Release(mem) } +// Reserve buffers for use. Blocks until they are free. +// +// Doesn't allocate any memory. +// +// Must be released by calling GetReserved() which releases 1 buffer or +// Release() to release any number of buffers. +func (bp *Pool) Reserve(buffers int) { + waitTime := time.Millisecond + for { + err := bp.acquire(int64(buffers) * int64(bp.bufferSize)) + if err == nil { + break + } + fs.Logf(nil, "Failed to get reservation for buffer, waiting for %v: %v", waitTime, err) + time.Sleep(waitTime) + waitTime *= 2 + } +} + +// Release previously Reserved buffers. +// +// Doesn't free any memory. +func (bp *Pool) Release(buffers int) { + bp.release(int64(buffers) * int64(bp.bufferSize)) +} + // Get a buffer from the pool or allocate one -func (bp *Pool) Get() []byte { +func (bp *Pool) getBlock(reserved bool) []byte { bp.mu.Lock() var buf []byte waitTime := time.Millisecond for { if len(bp.cache) > 0 { buf = bp.get() + if reserved { + // If got reserved memory from the cache we + // can release the reservation of one buffer. + bp.release(int64(bp.bufferSize)) + } break } else { var err error - bp.mu.Unlock() - err = bp.acquire(int64(bp.bufferSize)) - bp.mu.Lock() + if !reserved { + bp.mu.Unlock() + err = bp.acquire(int64(bp.bufferSize)) + bp.mu.Lock() + } if err == nil { buf, err = bp.alloc(bp.bufferSize) if err == nil { bp.alloced++ break } - bp.release(int64(bp.bufferSize)) + if !reserved { + bp.release(int64(bp.bufferSize)) + } } fs.Logf(nil, "Failed to get memory for buffer, waiting for %v: %v", waitTime, err) bp.mu.Unlock() @@ -227,6 +262,16 @@ func (bp *Pool) Get() []byte { return buf } +// Get a buffer from the pool or allocate one +func (bp *Pool) Get() []byte { + return bp.getBlock(false) +} + +// GetReserved gets a reserved buffer from the pool or allocates one. +func (bp *Pool) GetReserved() []byte { + return bp.getBlock(true) +} + // freeBuffer returns mem to the os if required - call with lock held func (bp *Pool) freeBuffer(mem []byte) { err := bp.free(mem) diff --git a/lib/pool/reader_writer.go b/lib/pool/reader_writer.go index f7c263bd1..ac266b37c 100644 --- a/lib/pool/reader_writer.go +++ b/lib/pool/reader_writer.go @@ -35,6 +35,8 @@ type RW struct { // Read side Variables out int // offset we are reading from reads int // count how many times the data has been read + + reserved int // number of buffers reserved } var ( @@ -59,6 +61,20 @@ func NewRW(pool *Pool) *RW { return rw } +// Reserve bytes of memory. +// +// Reserve, but don't allocate n bytes of memory. +// +// This is rounded up to the nearest buffer page size. +func (rw *RW) Reserve(n int64) *RW { + rw.mu.Lock() + defer rw.mu.Unlock() + buffers := int((n + int64(rw.pool.bufferSize) - 1) / int64(rw.pool.bufferSize)) + rw.pool.Reserve(buffers) + rw.reserved += buffers + return rw +} + // SetAccounting should be provided with a function which will be // called after every read from the RW. // @@ -200,7 +216,12 @@ func (rw *RW) writePage() (page []byte) { if len(rw.pages) > 0 && rw.lastOffset < rw.pool.bufferSize { return rw.pages[len(rw.pages)-1][rw.lastOffset:] } - page = rw.pool.Get() + if rw.reserved > 0 { + page = rw.pool.GetReserved() + rw.reserved-- + } else { + page = rw.pool.Get() + } rw.pages = append(rw.pages, page) rw.lastOffset = 0 return page @@ -321,6 +342,10 @@ func (rw *RW) Close() error { rw.pool.Put(page) } rw.pages = nil + if rw.reserved > 0 { + rw.pool.Release(rw.reserved) + rw.reserved = 0 + } return nil }