diff --git a/backend/azureblob/azureblob.go b/backend/azureblob/azureblob.go
index d1c3619eb..ec18ede00 100644
--- a/backend/azureblob/azureblob.go
+++ b/backend/azureblob/azureblob.go
@@ -35,6 +35,7 @@ import (
 	"github.com/rclone/rclone/lib/bucket"
 	"github.com/rclone/rclone/lib/encoder"
 	"github.com/rclone/rclone/lib/pacer"
+	"github.com/rclone/rclone/lib/pool"
 )
 
 const (
@@ -59,6 +60,8 @@ const (
 	emulatorAccount      = "devstoreaccount1"
 	emulatorAccountKey   = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
 	emulatorBlobEndpoint = "http://127.0.0.1:10000/devstoreaccount1"
+	memoryPoolFlushTime  = fs.Duration(time.Minute) // flush the cached buffers after this long
+	memoryPoolUseMmap    = false
 )
 
 // Register with Fs
@@ -135,6 +138,18 @@ for data integrity checking but can cause long delays for large files
 to start uploading.`,
 			Default:  false,
 			Advanced: true,
+		}, {
+			Name:     "memory_pool_flush_time",
+			Default:  memoryPoolFlushTime,
+			Advanced: true,
+			Help: `How often internal memory buffer pools will be flushed.
+Uploads which require additional buffers (e.g. multipart) will use the memory pool for allocations.
+This option controls how often unused buffers will be removed from the pool.`,
+		}, {
+			Name:     "memory_pool_use_mmap",
+			Default:  memoryPoolUseMmap,
+			Advanced: true,
+			Help:     `Whether to use mmap buffers in the internal memory pool.`,
 		}, {
 			Name:     config.ConfigEncoding,
 			Help:     config.ConfigEncodingHelp,
@@ -151,17 +166,19 @@ to start uploading.`,
 
 // Options defines the configuration for this backend
 type Options struct {
-	Account         string               `config:"account"`
-	Key             string               `config:"key"`
-	Endpoint        string               `config:"endpoint"`
-	SASURL          string               `config:"sas_url"`
-	UploadCutoff    fs.SizeSuffix        `config:"upload_cutoff"`
-	ChunkSize       fs.SizeSuffix        `config:"chunk_size"`
-	ListChunkSize   uint                 `config:"list_chunk"`
-	AccessTier      string               `config:"access_tier"`
-	UseEmulator     bool                 `config:"use_emulator"`
-	DisableCheckSum bool                 `config:"disable_checksum"`
-	Enc             encoder.MultiEncoder `config:"encoding"`
+	Account             string               `config:"account"`
+	Key                 string               `config:"key"`
+	Endpoint            string               `config:"endpoint"`
+	SASURL              string               `config:"sas_url"`
+	UploadCutoff        fs.SizeSuffix        `config:"upload_cutoff"`
+	ChunkSize           fs.SizeSuffix        `config:"chunk_size"`
+	ListChunkSize       uint                 `config:"list_chunk"`
+	AccessTier          string               `config:"access_tier"`
+	UseEmulator         bool                 `config:"use_emulator"`
+	DisableCheckSum     bool                 `config:"disable_checksum"`
+	MemoryPoolFlushTime fs.Duration          `config:"memory_pool_flush_time"`
+	MemoryPoolUseMmap   bool                 `config:"memory_pool_use_mmap"`
+	Enc                 encoder.MultiEncoder `config:"encoding"`
 }
 
 // Fs represents a remote azure server
@@ -180,6 +197,7 @@ type Fs struct {
 	cache         *bucket.Cache         // cache for container creation status
 	pacer         *fs.Pacer             // To pace and retry the API calls
 	uploadToken   *pacer.TokenDispenser // control concurrency
+	pool          *pool.Pool            // memory pool
 }
 
 // Object describes a azure object
@@ -399,6 +417,12 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 		client:      fshttp.NewClient(fs.Config),
 		cache:       bucket.NewCache(),
 		cntURLcache: make(map[string]*azblob.ContainerURL, 1),
+		pool: pool.New(
+			time.Duration(opt.MemoryPoolFlushTime),
+			int(opt.ChunkSize),
+			fs.Config.Transfers,
+			opt.MemoryPoolUseMmap,
+		),
 	}
 	f.setRoot(root)
 	f.features = (&fs.Features{
@@ -995,6 +1019,19 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
 	return f.NewObject(ctx, remote)
 }
 
+func (f *Fs) getMemoryPool(size int64) *pool.Pool {
+	if size == int64(f.opt.ChunkSize) {
+		return f.pool
+	}
+
+	return pool.New(
+		time.Duration(f.opt.MemoryPoolFlushTime),
+		int(size),
+		fs.Config.Transfers,
+		f.opt.MemoryPoolUseMmap,
+	)
+}
+
 // ------------------------------------------------------------
 
 // Fs returns the parent Fs
@@ -1303,6 +1340,7 @@ func (o *Object) uploadMultipart(in io.Reader, size int64, blob *azblob.BlobURL,
 	position := int64(0)
 	errs := make(chan error, 1)
 	var wg sync.WaitGroup
+	memPool := o.fs.getMemoryPool(chunkSize)
 outer:
 	for part := 0; part < int(totalParts); part++ {
 		// Check any errors
@@ -1317,23 +1355,27 @@ outer:
 			reqSize = chunkSize
 		}
 
-		// Make a block of memory
-		buf := make([]byte, reqSize)
+		// Get a block of memory from the pool and a token which limits concurrency
+		o.fs.uploadToken.Get()
+		buf := memPool.Get()
+		buf = buf[:reqSize]
 
 		// Read the chunk
 		_, err = io.ReadFull(in, buf)
 		if err != nil {
 			err = errors.Wrap(err, "multipart upload failed to read source")
+			memPool.Put(buf)       // return the buf
+			o.fs.uploadToken.Put() // return the token
 			break outer
 		}
 
 		// Transfer the chunk
 		nextID()
 		wg.Add(1)
-		o.fs.uploadToken.Get()
 		go func(part int, position int64, blockID string) {
 			defer wg.Done()
 			defer o.fs.uploadToken.Put()
+			defer memPool.Put(buf)
 			fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, totalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
 
 			// Upload the block, with MD5 for check
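
If I read the hunks right, the change replaces the per-part `make([]byte, reqSize)` with buffers recycled through `lib/pool`, so a long multipart upload reuses a bounded set of chunk-sized buffers instead of allocating one per part. Below is a minimal sketch of that Get / re-slice / Put lifecycle, assuming only the pool API visible in the diff (`pool.New`, `Pool.Get`, `Pool.Put`); the chunk size, pool size, and `reqSize` values are illustrative, not rclone's defaults.

```go
// A self-contained sketch of the buffer lifecycle this diff introduces
// in uploadMultipart. Assumption: the only lib/pool API is what the
// diff shows (pool.New, Pool.Get, Pool.Put); all sizes are made up.
package main

import (
	"fmt"
	"time"

	"github.com/rclone/rclone/lib/pool"
)

func main() {
	const chunkSize = 4 * 1024 * 1024 // hypothetical 4 MiB chunk size

	// One long-lived pool per chunk size: buffers of chunkSize bytes,
	// at most 4 cached at a time, flushed after a minute unused.
	p := pool.New(time.Minute, chunkSize, 4, false)

	// Get hands back a full chunkSize buffer; the final part of an
	// upload is usually shorter, so it is re-sliced to the actual
	// request size, exactly as the diff does with buf = buf[:reqSize].
	buf := p.Get()
	reqSize := 1024 // hypothetical short final part
	buf = buf[:reqSize]

	// ... io.ReadFull(in, buf) and the block upload would go here ...
	fmt.Printf("using %d of %d pooled bytes\n", len(buf), cap(buf))

	// Put recycles the buffer; its full capacity remains available to
	// the next Get even though the slice was shortened.
	p.Put(buf)
}
```

Two details of the diff worth noting: the concurrency token is now taken before `memPool.Get()`, so the number of live buffers is bounded by the same token dispenser that bounds in-flight uploads, and `getMemoryPool` only reuses the long-lived `f.pool` when the requested size matches the configured `chunk_size`, falling back to a fresh single-use pool for any other size.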