diff --git a/docs/content/docs.md b/docs/content/docs.md index b855809ab..0887b0530 100644 --- a/docs/content/docs.md +++ b/docs/content/docs.md @@ -743,6 +743,10 @@ above. **NB** that this **only** works for a local destination but will work with any source. +**NB** that multi thread copies are disabled for local to local copies +as they are faster without unless `--multi-thread-streams` is set +explicitly. + ### --multi-thread-streams=N ### When using multi thread downloads (see above `--multi-thread-cutoff`) diff --git a/fs/config.go b/fs/config.go index b08dcc2c9..fdd9b7e45 100644 --- a/fs/config.go +++ b/fs/config.go @@ -100,6 +100,7 @@ type ConfigInfo struct { ClientKey string // Client Side Key MultiThreadCutoff SizeSuffix MultiThreadStreams int + MultiThreadSet bool // whether MultiThreadStreams was set (set in fs/config/configflags) } // NewConfig creates a new config with everything set to the default diff --git a/fs/config/configflags/configflags.go b/fs/config/configflags/configflags.go index e1670a892..58c70b4d5 100644 --- a/fs/config/configflags/configflags.go +++ b/fs/config/configflags/configflags.go @@ -213,4 +213,9 @@ func SetFlags() { if err == nil { config.ConfigPath = configPath } + + // Set whether multi-thread-streams was set + multiThreadStreamsFlag := pflag.Lookup("multi-thread-streams") + fs.Config.MultiThreadSet = multiThreadStreamsFlag != nil && multiThreadStreamsFlag.Changed + } diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index bac7435eb..1e8f5ec82 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -16,6 +16,32 @@ const ( multithreadBufferSize = 32 * 1024 ) +// Return a boolean as to whether we should use multi thread copy for +// this transfer +func doMultiThreadCopy(f fs.Fs, src fs.Object) bool { + // Disable multi thread if... + + // ...it isn't configured + if fs.Config.MultiThreadStreams <= 1 { + return false + } + // ...size of object is less than cutoff + if src.Size() < int64(fs.Config.MultiThreadCutoff) { + return false + } + // ...source doesn't support it + dstFeatures := f.Features() + if dstFeatures.OpenWriterAt == nil { + return false + } + // ...if --multi-thread-streams not in use and source and + // destination are both local + if !fs.Config.MultiThreadSet && dstFeatures.IsLocal && src.Fs().Features().IsLocal { + return false + } + return true +} + // state for a multi-thread copy type multiThreadCopyState struct { ctx context.Context diff --git a/fs/operations/multithread_test.go b/fs/operations/multithread_test.go index 727b1b7f5..a54b19f5e 100644 --- a/fs/operations/multithread_test.go +++ b/fs/operations/multithread_test.go @@ -6,6 +6,8 @@ import ( "testing" "github.com/rclone/rclone/fs/accounting" + "github.com/rclone/rclone/fstest/mockfs" + "github.com/rclone/rclone/fstest/mockobject" "github.com/rclone/rclone/lib/random" "github.com/rclone/rclone/fs" @@ -14,6 +16,67 @@ import ( "github.com/stretchr/testify/require" ) +func TestDoMultiThreadCopy(t *testing.T) { + f := mockfs.NewFs("potato", "") + src := mockobject.New("file.txt").WithContent([]byte(random.String(100)), mockobject.SeekModeNone) + srcFs := mockfs.NewFs("sausage", "") + src.SetFs(srcFs) + + oldStreams := fs.Config.MultiThreadStreams + oldCutoff := fs.Config.MultiThreadCutoff + oldIsSet := fs.Config.MultiThreadSet + defer func() { + fs.Config.MultiThreadStreams = oldStreams + fs.Config.MultiThreadCutoff = oldCutoff + fs.Config.MultiThreadSet = oldIsSet + }() + + fs.Config.MultiThreadStreams, fs.Config.MultiThreadCutoff = 4, 50 + fs.Config.MultiThreadSet = false + + nullWriterAt := func(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) { + panic("don't call me") + } + f.Features().OpenWriterAt = nullWriterAt + + assert.True(t, doMultiThreadCopy(f, src)) + + fs.Config.MultiThreadStreams = 0 + assert.False(t, doMultiThreadCopy(f, src)) + fs.Config.MultiThreadStreams = 1 + assert.False(t, doMultiThreadCopy(f, src)) + fs.Config.MultiThreadStreams = 2 + assert.True(t, doMultiThreadCopy(f, src)) + + fs.Config.MultiThreadCutoff = 200 + assert.False(t, doMultiThreadCopy(f, src)) + fs.Config.MultiThreadCutoff = 101 + assert.False(t, doMultiThreadCopy(f, src)) + fs.Config.MultiThreadCutoff = 100 + assert.True(t, doMultiThreadCopy(f, src)) + + f.Features().OpenWriterAt = nil + assert.False(t, doMultiThreadCopy(f, src)) + f.Features().OpenWriterAt = nullWriterAt + assert.True(t, doMultiThreadCopy(f, src)) + + f.Features().IsLocal = true + srcFs.Features().IsLocal = true + assert.False(t, doMultiThreadCopy(f, src)) + fs.Config.MultiThreadSet = true + assert.True(t, doMultiThreadCopy(f, src)) + fs.Config.MultiThreadSet = false + assert.False(t, doMultiThreadCopy(f, src)) + srcFs.Features().IsLocal = false + assert.True(t, doMultiThreadCopy(f, src)) + srcFs.Features().IsLocal = true + assert.False(t, doMultiThreadCopy(f, src)) + f.Features().IsLocal = false + assert.True(t, doMultiThreadCopy(f, src)) + srcFs.Features().IsLocal = false + assert.True(t, doMultiThreadCopy(f, src)) +} + func TestMultithreadCalculateChunks(t *testing.T) { for _, test := range []struct { size int64 diff --git a/fs/operations/operations.go b/fs/operations/operations.go index f67ef087d..62c40a036 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -310,7 +310,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj } // If can't server side copy, do it manually if err == fs.ErrorCantCopy { - if doOpenWriterAt := f.Features().OpenWriterAt; doOpenWriterAt != nil && src.Size() >= int64(fs.Config.MultiThreadCutoff) && fs.Config.MultiThreadStreams > 1 { + if doMultiThreadCopy(f, src) { // Number of streams proportional to size streams := src.Size() / int64(fs.Config.MultiThreadCutoff) // With maximum