diff --git a/backend/s3/s3.go b/backend/s3/s3.go
index 94462b4bc..c76c21f89 100644
--- a/backend/s3/s3.go
+++ b/backend/s3/s3.go
@@ -56,6 +56,7 @@ import (
 	"github.com/rclone/rclone/lib/bucket"
 	"github.com/rclone/rclone/lib/encoder"
 	"github.com/rclone/rclone/lib/pacer"
+	"github.com/rclone/rclone/lib/pool"
 	"github.com/rclone/rclone/lib/readers"
 	"github.com/rclone/rclone/lib/rest"
 	"golang.org/x/sync/errgroup"
@@ -841,11 +842,24 @@ In Ceph, this can be increased with the "rgw list buckets max chunk" option.
 			// - doubled / encoding
 			// - trailing / encoding
 			// so that AWS keys are always valid file names
-			Default: (encoder.EncodeInvalidUtf8 |
+			Default: encoder.EncodeInvalidUtf8 |
 				encoder.EncodeSlash |
-				encoder.EncodeDot),
-		}},
-	})
+				encoder.EncodeDot,
+		}, {
+			Name:     "memory_pool_flush_time",
+			Default:  memoryPoolFlushTime,
+			Advanced: true,
+			Help: `How often internal memory buffer pools will be flushed.
+
+Uploads which require additional buffers (e.g. multipart) will use the memory pool for allocations.
+This option controls how often unused buffers will be removed from the pool.`,
+		}, {
+			Name:     "memory_pool_use_mmap",
+			Default:  memoryPoolUseMmap,
+			Advanced: true,
+			Help:     `Whether to use mmap buffers in the internal memory pool.`,
+		},
+	}})
 }
 
 // Constants
@@ -859,6 +873,9 @@ const (
 	defaultUploadCutoff = fs.SizeSuffix(200 * 1024 * 1024)
 	maxUploadCutoff     = fs.SizeSuffix(5 * 1024 * 1024 * 1024)
 	minSleep            = 10 * time.Millisecond // In case of error, start at 10ms sleep.
+
+	memoryPoolFlushTime = fs.Duration(time.Minute) // flush the cached buffers after this long
+	memoryPoolUseMmap   = false
 )
 
 // Options defines the configuration for this backend
@@ -887,21 +904,25 @@ type Options struct {
 	LeavePartsOnError     bool                 `config:"leave_parts_on_error"`
 	ListChunk             int64                `config:"list_chunk"`
 	Enc                   encoder.MultiEncoder `config:"encoding"`
+	MemoryPoolFlushTime   fs.Duration          `config:"memory_pool_flush_time"`
+	MemoryPoolUseMmap     bool                 `config:"memory_pool_use_mmap"`
 }
 
 // Fs represents a remote s3 server
 type Fs struct {
-	name          string           // the name of the remote
-	root          string           // root of the bucket - ignore all objects above this
-	opt           Options          // parsed options
-	features      *fs.Features     // optional features
-	c             *s3.S3           // the connection to the s3 server
-	ses           *session.Session // the s3 session
-	rootBucket    string           // bucket part of root (if any)
-	rootDirectory string           // directory part of root (if any)
-	cache         *bucket.Cache    // cache for bucket creation status
-	pacer         *fs.Pacer        // To pace the API calls
-	srv           *http.Client     // a plain http client
+	name          string               // the name of the remote
+	root          string               // root of the bucket - ignore all objects above this
+	opt           Options              // parsed options
+	features      *fs.Features         // optional features
+	c             *s3.S3               // the connection to the s3 server
+	ses           *session.Session     // the s3 session
+	rootBucket    string               // bucket part of root (if any)
+	rootDirectory string               // directory part of root (if any)
+	cache         *bucket.Cache        // cache for bucket creation status
+	pacer         *fs.Pacer            // To pace the API calls
+	srv           *http.Client         // a plain http client
+	poolMu        sync.Mutex           // mutex protecting memory pools map
+	pools         map[int64]*pool.Pool // memory pools
 }
 
 // Object describes a s3 object
@@ -1180,6 +1201,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 	if err != nil {
 		return nil, err
 	}
+
 	f := &Fs{
 		name:  name,
 		opt:   *opt,
@@ -1188,7 +1210,9 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 		pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep))),
 		cache: bucket.NewCache(),
 		srv:   fshttp.NewClient(fs.Config),
+		pools: make(map[int64]*pool.Pool),
 	}
+
 	f.setRoot(root)
 	f.features = (&fs.Features{
 		ReadMimeType: true,
@@ -1875,6 +1899,22 @@ func (f *Fs) Hashes() hash.Set {
 	return hash.Set(hash.MD5)
 }
 
+func (f *Fs) getMemoryPool(size int64) *pool.Pool {
+	f.poolMu.Lock()
+	defer f.poolMu.Unlock()
+
+	_, ok := f.pools[size]
+	if !ok {
+		f.pools[size] = pool.New(
+			time.Duration(f.opt.MemoryPoolFlushTime),
+			int(size),
+			f.opt.UploadConcurrency*fs.Config.Transfers,
+			f.opt.MemoryPoolUseMmap,
+		)
+	}
+	return f.pools[size]
+}
+
 // ------------------------------------------------------------
 
 // Fs returns the parent Fs
@@ -2078,16 +2118,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
 	if concurrency < 1 {
 		concurrency = 1
 	}
-	bufs := make(chan []byte, concurrency)
-	defer func() {
-		// empty the channel on exit
-		close(bufs)
-		for range bufs {
-		}
-	}()
-	for i := 0; i < concurrency; i++ {
-		bufs <- nil
-	}
+	tokens := pacer.NewTokenDispenser(concurrency)
 
 	// calculate size of parts
 	partSize := int(f.opt.ChunkSize)
@@ -2108,6 +2139,8 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
 		}
 	}
 
+	memPool := f.getMemoryPool(int64(partSize))
+
 	var cout *s3.CreateMultipartUploadOutput
 	err = f.pacer.Call(func() (bool, error) {
 		var err error
@@ -2159,11 +2192,9 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
 	)
 
 	for partNum := int64(1); !finished; partNum++ {
-		// Get a block of memory from the channel (which limits concurrency)
-		buf := <-bufs
-		if buf == nil {
-			buf = make([]byte, partSize)
-		}
+		// Get a block of memory from the pool and a token which limits concurrency.
+		tokens.Get()
+		buf := memPool.Get()
 
 		// Fail fast, in case an errgroup managed function returns an error
 		// gCtx is cancelled. There is no point in uploading all the other parts.
@@ -2226,8 +2257,9 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
 				return false, nil
 			})
 
-			// return the memory
-			bufs <- buf[:partSize]
+			// return the memory and token
+			memPool.Put(buf[:partSize])
+			tokens.Put()
 
 			if err != nil {
 				return errors.Wrap(err, "multipart upload failed to upload part")
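
The hunks above replace uploadMultipart's hand-rolled scheme (a buffered channel pre-filled with nil byte slices) with two cooperating pieces: a pacer.TokenDispenser that caps how many part uploads run at once, and a lib/pool.Pool, created lazily per buffer size in getMemoryPool, which hands out reusable buffers, drops idle ones after memory_pool_flush_time, and can back them with anonymous mmap when memory_pool_use_mmap is set. The sketch below shows the same Get/Put pattern in isolation. It is a minimal illustration, not rclone code: the concurrency, the buffer size and the fake part-upload body are invented for the example, and it relies only on the pool.New, Pool.Get/Put, NewTokenDispenser, Get and Put calls that appear in the diff.

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/rclone/rclone/lib/pacer"
	"github.com/rclone/rclone/lib/pool"
)

func main() {
	const (
		concurrency = 4           // stand-in for opt.UploadConcurrency
		bufferSize  = 1024 * 1024 // stand-in for the calculated partSize
	)

	// One pool per buffer size, as getMemoryPool does: idle buffers are
	// released after a minute and mmap-backed allocation is switched off.
	memPool := pool.New(time.Minute, bufferSize, concurrency, false)

	// The token dispenser replaces the old buffered channel of []byte: it
	// only limits how many parts are in flight; the pool owns the memory.
	tokens := pacer.NewTokenDispenser(concurrency)

	var wg sync.WaitGroup
	for part := 1; part <= 8; part++ {
		tokens.Get()         // blocks while concurrency parts are already in flight
		buf := memPool.Get() // reusable bufferSize-byte buffer

		wg.Add(1)
		go func(part int, buf []byte) {
			defer wg.Done()
			// Stand-in for filling buf from the input and calling UploadPart.
			fmt.Printf("uploading part %d with a %d byte buffer\n", part, len(buf))
			time.Sleep(10 * time.Millisecond)

			// Return the memory and the token, as the upload loop in the diff does.
			memPool.Put(buf)
			tokens.Put()
		}(part, buf)
	}
	wg.Wait()
}

Because the Fs.pools map is keyed by buffer size, uploads that compute different part sizes get separate pools, while repeated uploads with the same part size reuse buffers that are already allocated.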