mirror of
https://github.com/rclone/rclone.git
synced 2025-01-13 20:38:12 +02:00
operations: disable multi thread copy for local to local copies #3419
...unless --multi-thread-streams has been set explicitly
This commit is contained in:
parent
ec8e0a6c58
commit
eb087a3b04
@ -743,6 +743,10 @@ above.
|
|||||||
**NB** that this **only** works for a local destination but will work
|
**NB** that this **only** works for a local destination but will work
|
||||||
with any source.
|
with any source.
|
||||||
|
|
||||||
|
**NB** that multi thread copies are disabled for local to local copies
|
||||||
|
as they are faster without unless `--multi-thread-streams` is set
|
||||||
|
explicitly.
|
||||||
|
|
||||||
### --multi-thread-streams=N ###
|
### --multi-thread-streams=N ###
|
||||||
|
|
||||||
When using multi thread downloads (see above `--multi-thread-cutoff`)
|
When using multi thread downloads (see above `--multi-thread-cutoff`)
|
||||||
|
@ -100,6 +100,7 @@ type ConfigInfo struct {
|
|||||||
ClientKey string // Client Side Key
|
ClientKey string // Client Side Key
|
||||||
MultiThreadCutoff SizeSuffix
|
MultiThreadCutoff SizeSuffix
|
||||||
MultiThreadStreams int
|
MultiThreadStreams int
|
||||||
|
MultiThreadSet bool // whether MultiThreadStreams was set (set in fs/config/configflags)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConfig creates a new config with everything set to the default
|
// NewConfig creates a new config with everything set to the default
|
||||||
|
@ -213,4 +213,9 @@ func SetFlags() {
|
|||||||
if err == nil {
|
if err == nil {
|
||||||
config.ConfigPath = configPath
|
config.ConfigPath = configPath
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set whether multi-thread-streams was set
|
||||||
|
multiThreadStreamsFlag := pflag.Lookup("multi-thread-streams")
|
||||||
|
fs.Config.MultiThreadSet = multiThreadStreamsFlag != nil && multiThreadStreamsFlag.Changed
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,32 @@ const (
|
|||||||
multithreadBufferSize = 32 * 1024
|
multithreadBufferSize = 32 * 1024
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Return a boolean as to whether we should use multi thread copy for
|
||||||
|
// this transfer
|
||||||
|
func doMultiThreadCopy(f fs.Fs, src fs.Object) bool {
|
||||||
|
// Disable multi thread if...
|
||||||
|
|
||||||
|
// ...it isn't configured
|
||||||
|
if fs.Config.MultiThreadStreams <= 1 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// ...size of object is less than cutoff
|
||||||
|
if src.Size() < int64(fs.Config.MultiThreadCutoff) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// ...source doesn't support it
|
||||||
|
dstFeatures := f.Features()
|
||||||
|
if dstFeatures.OpenWriterAt == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// ...if --multi-thread-streams not in use and source and
|
||||||
|
// destination are both local
|
||||||
|
if !fs.Config.MultiThreadSet && dstFeatures.IsLocal && src.Fs().Features().IsLocal {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
// state for a multi-thread copy
|
// state for a multi-thread copy
|
||||||
type multiThreadCopyState struct {
|
type multiThreadCopyState struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
@ -6,6 +6,8 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/rclone/rclone/fs/accounting"
|
"github.com/rclone/rclone/fs/accounting"
|
||||||
|
"github.com/rclone/rclone/fstest/mockfs"
|
||||||
|
"github.com/rclone/rclone/fstest/mockobject"
|
||||||
"github.com/rclone/rclone/lib/random"
|
"github.com/rclone/rclone/lib/random"
|
||||||
|
|
||||||
"github.com/rclone/rclone/fs"
|
"github.com/rclone/rclone/fs"
|
||||||
@ -14,6 +16,67 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestDoMultiThreadCopy(t *testing.T) {
|
||||||
|
f := mockfs.NewFs("potato", "")
|
||||||
|
src := mockobject.New("file.txt").WithContent([]byte(random.String(100)), mockobject.SeekModeNone)
|
||||||
|
srcFs := mockfs.NewFs("sausage", "")
|
||||||
|
src.SetFs(srcFs)
|
||||||
|
|
||||||
|
oldStreams := fs.Config.MultiThreadStreams
|
||||||
|
oldCutoff := fs.Config.MultiThreadCutoff
|
||||||
|
oldIsSet := fs.Config.MultiThreadSet
|
||||||
|
defer func() {
|
||||||
|
fs.Config.MultiThreadStreams = oldStreams
|
||||||
|
fs.Config.MultiThreadCutoff = oldCutoff
|
||||||
|
fs.Config.MultiThreadSet = oldIsSet
|
||||||
|
}()
|
||||||
|
|
||||||
|
fs.Config.MultiThreadStreams, fs.Config.MultiThreadCutoff = 4, 50
|
||||||
|
fs.Config.MultiThreadSet = false
|
||||||
|
|
||||||
|
nullWriterAt := func(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) {
|
||||||
|
panic("don't call me")
|
||||||
|
}
|
||||||
|
f.Features().OpenWriterAt = nullWriterAt
|
||||||
|
|
||||||
|
assert.True(t, doMultiThreadCopy(f, src))
|
||||||
|
|
||||||
|
fs.Config.MultiThreadStreams = 0
|
||||||
|
assert.False(t, doMultiThreadCopy(f, src))
|
||||||
|
fs.Config.MultiThreadStreams = 1
|
||||||
|
assert.False(t, doMultiThreadCopy(f, src))
|
||||||
|
fs.Config.MultiThreadStreams = 2
|
||||||
|
assert.True(t, doMultiThreadCopy(f, src))
|
||||||
|
|
||||||
|
fs.Config.MultiThreadCutoff = 200
|
||||||
|
assert.False(t, doMultiThreadCopy(f, src))
|
||||||
|
fs.Config.MultiThreadCutoff = 101
|
||||||
|
assert.False(t, doMultiThreadCopy(f, src))
|
||||||
|
fs.Config.MultiThreadCutoff = 100
|
||||||
|
assert.True(t, doMultiThreadCopy(f, src))
|
||||||
|
|
||||||
|
f.Features().OpenWriterAt = nil
|
||||||
|
assert.False(t, doMultiThreadCopy(f, src))
|
||||||
|
f.Features().OpenWriterAt = nullWriterAt
|
||||||
|
assert.True(t, doMultiThreadCopy(f, src))
|
||||||
|
|
||||||
|
f.Features().IsLocal = true
|
||||||
|
srcFs.Features().IsLocal = true
|
||||||
|
assert.False(t, doMultiThreadCopy(f, src))
|
||||||
|
fs.Config.MultiThreadSet = true
|
||||||
|
assert.True(t, doMultiThreadCopy(f, src))
|
||||||
|
fs.Config.MultiThreadSet = false
|
||||||
|
assert.False(t, doMultiThreadCopy(f, src))
|
||||||
|
srcFs.Features().IsLocal = false
|
||||||
|
assert.True(t, doMultiThreadCopy(f, src))
|
||||||
|
srcFs.Features().IsLocal = true
|
||||||
|
assert.False(t, doMultiThreadCopy(f, src))
|
||||||
|
f.Features().IsLocal = false
|
||||||
|
assert.True(t, doMultiThreadCopy(f, src))
|
||||||
|
srcFs.Features().IsLocal = false
|
||||||
|
assert.True(t, doMultiThreadCopy(f, src))
|
||||||
|
}
|
||||||
|
|
||||||
func TestMultithreadCalculateChunks(t *testing.T) {
|
func TestMultithreadCalculateChunks(t *testing.T) {
|
||||||
for _, test := range []struct {
|
for _, test := range []struct {
|
||||||
size int64
|
size int64
|
||||||
|
@ -310,7 +310,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
|
|||||||
}
|
}
|
||||||
// If can't server side copy, do it manually
|
// If can't server side copy, do it manually
|
||||||
if err == fs.ErrorCantCopy {
|
if err == fs.ErrorCantCopy {
|
||||||
if doOpenWriterAt := f.Features().OpenWriterAt; doOpenWriterAt != nil && src.Size() >= int64(fs.Config.MultiThreadCutoff) && fs.Config.MultiThreadStreams > 1 {
|
if doMultiThreadCopy(f, src) {
|
||||||
// Number of streams proportional to size
|
// Number of streams proportional to size
|
||||||
streams := src.Size() / int64(fs.Config.MultiThreadCutoff)
|
streams := src.Size() / int64(fs.Config.MultiThreadCutoff)
|
||||||
// With maximum
|
// With maximum
|
||||||
|
Loading…
Reference in New Issue
Block a user