From 117d8d9fdbf3d92b301f3e7adb4513ab2bade49b Mon Sep 17 00:00:00 2001
From: Nick Craig-Wood
Date: Tue, 22 Apr 2025 17:20:03 +0100
Subject: [PATCH] pool: fix deadlock with --max-memory and multipart transfers

Because multipart transfers can need more than one buffer to complete,
if --transfers was set very high it was possible for lots of multipart
transfers to start, grab fewer buffers than a whole chunk needs, and
then deadlock because no more memory was available.

This fixes the problem by introducing a reservation system which
multipart transfers use to reserve all the memory for one chunk before
starting.
---
 fs/operations/multithread.go |  2 +-
 lib/multipart/multipart.go   |  2 +-
 lib/pool/pool.go             | 55 ++++++++++++++++++++++++++++++++----
 lib/pool/reader_writer.go    | 27 +++++++++++++++++-
 4 files changed, 78 insertions(+), 8 deletions(-)

diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go
index d6753eb14..8eb9a1ad7 100644
--- a/fs/operations/multithread.go
+++ b/fs/operations/multithread.go
@@ -92,7 +92,7 @@ func (mc *multiThreadCopyState) copyChunk(ctx context.Context, chunk int, writer
 		rs = rc
 	} else {
 		// Read the chunk into buffered reader
-		rw := multipart.NewRW()
+		rw := multipart.NewRW().Reserve(size)
 		defer fs.CheckClose(rw, &err)
 		_, err = io.CopyN(rw, rc, size)
 		if err != nil {
diff --git a/lib/multipart/multipart.go b/lib/multipart/multipart.go
index f3b959ce5..b8d0a6a84 100644
--- a/lib/multipart/multipart.go
+++ b/lib/multipart/multipart.go
@@ -73,7 +73,7 @@ func UploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, opt U
 	for partNum := int64(0); !finished; partNum++ {
 		// Get a block of memory from the pool and token which limits concurrency.
 		tokens.Get()
-		rw := NewRW()
+		rw := NewRW().Reserve(chunkSize)
 		if acc != nil {
 			rw.SetAccounting(acc.AccountRead)
 		}
diff --git a/lib/pool/pool.go b/lib/pool/pool.go
index 6a94bb814..5491f992d 100644
--- a/lib/pool/pool.go
+++ b/lib/pool/pool.go
@@ -192,27 +192,62 @@ func (bp *Pool) release(mem int64) {
 	totalMemory.Release(mem)
 }
 
+// Reserve buffers for use. Blocks until they are free.
+//
+// Doesn't allocate any memory.
+//
+// Must be returned by calling GetReserved(), which consumes 1 reserved
+// buffer, or Release(), which returns any number of reserved buffers.
+func (bp *Pool) Reserve(buffers int) {
+	waitTime := time.Millisecond
+	for {
+		err := bp.acquire(int64(buffers) * int64(bp.bufferSize))
+		if err == nil {
+			break
+		}
+		fs.Logf(nil, "Failed to get reservation for buffer, waiting for %v: %v", waitTime, err)
+		time.Sleep(waitTime)
+		waitTime *= 2
+	}
+}
+
+// Release previously Reserved buffers.
+//
+// Doesn't free any memory.
+func (bp *Pool) Release(buffers int) {
+	bp.release(int64(buffers) * int64(bp.bufferSize))
+}
+
 // Get a buffer from the pool or allocate one
-func (bp *Pool) Get() []byte {
+func (bp *Pool) getBlock(reserved bool) []byte {
 	bp.mu.Lock()
 	var buf []byte
 	waitTime := time.Millisecond
 	for {
 		if len(bp.cache) > 0 {
 			buf = bp.get()
+			if reserved {
+				// If we got reserved memory from the cache we
+				// can release the reservation of one buffer.
+				bp.release(int64(bp.bufferSize))
+			}
 			break
 		} else {
 			var err error
-			bp.mu.Unlock()
-			err = bp.acquire(int64(bp.bufferSize))
-			bp.mu.Lock()
+			if !reserved {
+				bp.mu.Unlock()
+				err = bp.acquire(int64(bp.bufferSize))
+				bp.mu.Lock()
+			}
 			if err == nil {
 				buf, err = bp.alloc(bp.bufferSize)
 				if err == nil {
 					bp.alloced++
 					break
 				}
-				bp.release(int64(bp.bufferSize))
+				if !reserved {
+					bp.release(int64(bp.bufferSize))
+				}
 			}
 			fs.Logf(nil, "Failed to get memory for buffer, waiting for %v: %v", waitTime, err)
 			bp.mu.Unlock()
@@ -227,6 +262,16 @@
 	return buf
 }
 
+// Get a buffer from the pool or allocate one
+func (bp *Pool) Get() []byte {
+	return bp.getBlock(false)
+}
+
+// GetReserved gets a reserved buffer from the pool or allocates one.
+func (bp *Pool) GetReserved() []byte {
+	return bp.getBlock(true)
+}
+
 // freeBuffer returns mem to the os if required - call with lock held
 func (bp *Pool) freeBuffer(mem []byte) {
 	err := bp.free(mem)
diff --git a/lib/pool/reader_writer.go b/lib/pool/reader_writer.go
index f7c263bd1..ac266b37c 100644
--- a/lib/pool/reader_writer.go
+++ b/lib/pool/reader_writer.go
@@ -35,6 +35,8 @@ type RW struct {
 	// Read side Variables
 	out   int // offset we are reading from
 	reads int // count how many times the data has been read
+
+	reserved int // number of buffers reserved
 }
 
 var (
@@ -59,6 +61,20 @@ func NewRW(pool *Pool) *RW {
 	return rw
 }
 
+// Reserve n bytes of memory.
+//
+// The memory is reserved but not allocated.
+//
+// n is rounded up to a whole number of buffer pages.
+func (rw *RW) Reserve(n int64) *RW {
+	rw.mu.Lock()
+	defer rw.mu.Unlock()
+	buffers := int((n + int64(rw.pool.bufferSize) - 1) / int64(rw.pool.bufferSize))
+	rw.pool.Reserve(buffers)
+	rw.reserved += buffers
+	return rw
+}
+
 // SetAccounting should be provided with a function which will be
 // called after every read from the RW.
 //
@@ -200,7 +216,12 @@ func (rw *RW) writePage() (page []byte) {
 	if len(rw.pages) > 0 && rw.lastOffset < rw.pool.bufferSize {
 		return rw.pages[len(rw.pages)-1][rw.lastOffset:]
 	}
-	page = rw.pool.Get()
+	if rw.reserved > 0 {
+		page = rw.pool.GetReserved()
+		rw.reserved--
+	} else {
+		page = rw.pool.Get()
+	}
 	rw.pages = append(rw.pages, page)
 	rw.lastOffset = 0
 	return page
@@ -321,6 +342,10 @@ func (rw *RW) Close() error {
 		rw.pool.Put(page)
 	}
 	rw.pages = nil
+	if rw.reserved > 0 {
+		rw.pool.Release(rw.reserved)
+		rw.reserved = 0
+	}
 	return nil
 }
 
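
Illustration (not part of the patch): the sketch below is a minimal,
self-contained model of the reserve-before-use pattern the patch
introduces. The pool, newPool, budget and buffersPerChunk names are
invented for this example; rclone's real Pool does its accounting
through acquire/release and recycles buffers through a cache. The point
shown is that reserving a whole chunk's memory in one step means a
transfer either gets everything it needs or waits while holding
nothing, so two transfers can no longer each take half the budget and
then deadlock waiting for the rest.

package main

import (
	"fmt"
	"sync"
)

const bufferSize = 1 << 20 // 1 MiB per buffer, chosen for illustration

// pool tracks a global memory budget. Reservations count against the
// budget just like allocations, mirroring the patch's acquire/release.
type pool struct {
	mu     sync.Mutex
	cond   *sync.Cond
	budget int64 // bytes still available
}

func newPool(budget int64) *pool {
	p := &pool{budget: budget}
	p.cond = sync.NewCond(&p.mu)
	return p
}

// Reserve blocks until n buffers' worth of memory can be set aside in
// one step. The all-or-nothing reservation is what prevents the
// deadlock: a transfer either gets its whole chunk or waits while
// holding nothing.
func (p *pool) Reserve(buffers int) {
	need := int64(buffers) * bufferSize
	p.mu.Lock()
	defer p.mu.Unlock()
	for p.budget < need {
		p.cond.Wait()
	}
	p.budget -= need
}

// Release returns previously reserved buffers to the budget and wakes
// any transfers waiting in Reserve.
func (p *pool) Release(buffers int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.budget += int64(buffers) * bufferSize
	p.cond.Broadcast()
}

// GetReserved hands out one buffer against an existing reservation, so
// it never has to wait and never touches the budget.
func (p *pool) GetReserved() []byte {
	return make([]byte, bufferSize)
}

func main() {
	// Budget of 4 buffers, two transfers needing 3 buffers per chunk.
	// Without up-front reservation both transfers could take 2 buffers
	// each and stall forever; with it, one runs while the other waits.
	p := newPool(4 * bufferSize)
	var wg sync.WaitGroup
	for t := 0; t < 2; t++ {
		wg.Add(1)
		go func(t int) {
			defer wg.Done()
			const buffersPerChunk = 3
			p.Reserve(buffersPerChunk) // all-or-nothing
			var bufs [][]byte
			for i := 0; i < buffersPerChunk; i++ {
				bufs = append(bufs, p.GetReserved())
			}
			fmt.Printf("transfer %d got %d buffers\n", t, len(bufs))
			p.Release(buffersPerChunk) // chunk finished
		}(t)
	}
	wg.Wait()
}

With the patch applied, RW.Reserve plays the part of the all-or-nothing
step: it rounds the requested size up to whole buffer pages and calls
Pool.Reserve, writePage then draws pages via GetReserved until the
reservation is used up, and Close returns any unused reservation with
Release.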