diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index 9cc5ce0fe..24725b355 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -131,6 +131,7 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, openChunkWriter := f.Features().OpenChunkWriter ci := fs.GetConfig(ctx) noBuffering := false + usingOpenWriterAt := false if openChunkWriter == nil { openWriterAt := f.Features().OpenWriterAt if openWriterAt == nil { @@ -140,6 +141,7 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, // If we are using OpenWriterAt we don't seek the chunks so don't need to buffer fs.Debugf(src, "multi-thread copy: disabling buffering because destination uses OpenWriterAt") noBuffering = true + usingOpenWriterAt = true } else if src.Fs().Features().IsLocal { // If the source fs is local we don't need to buffer fs.Debugf(src, "multi-thread copy: disabling buffering because source is local disk") @@ -241,12 +243,32 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, return nil, fmt.Errorf("multi-thread copy: failed to find object after copy: %w", err) } - if f.Features().PartialUploads { - err = obj.SetModTime(ctx, src.ModTime(ctx)) - switch err { - case nil, fs.ErrorCantSetModTime, fs.ErrorCantSetModTimeWithoutDelete: - default: - return nil, fmt.Errorf("multi-thread copy: failed to set modification time: %w", err) + // OpenWriterAt doesn't set metadata so we need to set it on completion + if usingOpenWriterAt { + setModTime := true + if ci.Metadata { + do, ok := obj.(fs.SetMetadataer) + if ok { + meta, err := fs.GetMetadataOptions(ctx, f, src, options) + if err != nil { + return nil, fmt.Errorf("multi-thread copy: failed to read metadata from source object: %w", err) + } + err = do.SetMetadata(ctx, meta) + if err != nil { + return nil, fmt.Errorf("multi-thread copy: failed to set metadata: %w", err) + } + setModTime = false + } else { + fs.Errorf(obj, "multi-thread copy: can't set metadata as SetMetadata isn't implemented in: %v", f) + } + } + if setModTime { + err = obj.SetModTime(ctx, src.ModTime(ctx)) + switch err { + case nil, fs.ErrorCantSetModTime, fs.ErrorCantSetModTimeWithoutDelete: + default: + return nil, fmt.Errorf("multi-thread copy: failed to set modification time: %w", err) + } } } diff --git a/fs/operations/multithread_test.go b/fs/operations/multithread_test.go index 59ff59d9d..6b4c86f7d 100644 --- a/fs/operations/multithread_test.go +++ b/fs/operations/multithread_test.go @@ -146,6 +146,9 @@ func TestMultithreadCopy(t *testing.T) { r := fstest.NewRun(t) ctx := context.Background() chunkSize := skipIfNotMultithread(ctx, t, r) + // Check every other transfer for metadata + checkMetadata := false + ctx, ci := fs.AddConfig(ctx) for _, upload := range []bool{false, true} { for _, test := range []struct { @@ -156,32 +159,54 @@ func TestMultithreadCopy(t *testing.T) { {size: chunkSize * 2, streams: 2}, {size: chunkSize*2 + 1, streams: 2}, } { + checkMetadata = !checkMetadata + ci.Metadata = checkMetadata fileName := fmt.Sprintf("test-multithread-copy-%v-%d-%d", upload, test.size, test.streams) t.Run(fmt.Sprintf("upload=%v,size=%v,streams=%v", upload, test.size, test.streams), func(t *testing.T) { if *fstest.SizeLimit > 0 && int64(test.size) > *fstest.SizeLimit { t.Skipf("exceeded file size limit %d > %d", test.size, *fstest.SizeLimit) } var ( - contents = random.String(test.size) - t1 = fstest.Time("2001-02-03T04:05:06.499999999Z") - file1 fstest.Item - src, dst fs.Object - err error + contents = random.String(test.size) + t1 = fstest.Time("2001-02-03T04:05:06.499999999Z") + file1 fstest.Item + src, dst fs.Object + err error + testMetadata = fs.Metadata{ + // System metadata supported by all backends + "mtime": t1.Format(time.RFC3339Nano), + // User metadata + "potato": "jersey", + } ) + var fSrc, fDst fs.Fs if upload { file1 = r.WriteFile(fileName, contents, t1) r.CheckRemoteItems(t) r.CheckLocalItems(t, file1) - src, err = r.Flocal.NewObject(ctx, fileName) + fDst, fSrc = r.Fremote, r.Flocal } else { file1 = r.WriteObject(ctx, fileName, contents, t1) r.CheckRemoteItems(t, file1) r.CheckLocalItems(t) - src, err = r.Fremote.NewObject(ctx, fileName) + fDst, fSrc = r.Flocal, r.Fremote } + src, err = fSrc.NewObject(ctx, fileName) require.NoError(t, err) + do, canSetMetadata := src.(fs.SetMetadataer) + if checkMetadata && canSetMetadata { + // Set metadata on the source if required + err := do.SetMetadata(ctx, testMetadata) + if err == fs.ErrorNotImplemented { + canSetMetadata = false + } else { + require.NoError(t, err) + fstest.CheckEntryMetadata(ctx, t, r.Flocal, src, testMetadata) + } + } + accounting.GlobalStats().ResetCounters() tr := accounting.GlobalStats().NewTransfer(src, nil) @@ -189,19 +214,21 @@ func TestMultithreadCopy(t *testing.T) { tr.Done(ctx, err) }() - if upload { - dst, err = multiThreadCopy(ctx, r.Fremote, fileName, src, test.streams, tr) - } else { - dst, err = multiThreadCopy(ctx, r.Flocal, fileName, src, test.streams, tr) - } - + dst, err = multiThreadCopy(ctx, fDst, fileName, src, test.streams, tr) require.NoError(t, err) + assert.Equal(t, src.Size(), dst.Size()) assert.Equal(t, fileName, dst.Remote()) - fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, r.Flocal, r.Fremote)) - fstest.CheckListingWithPrecision(t, r.Flocal, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, r.Flocal, r.Fremote)) + fstest.CheckListingWithPrecision(t, fSrc, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, fDst, fSrc)) + fstest.CheckListingWithPrecision(t, fDst, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, fDst, fSrc)) + + if checkMetadata && canSetMetadata && fDst.Features().ReadMetadata { + fstest.CheckEntryMetadata(ctx, t, fDst, dst, testMetadata) + } + require.NoError(t, dst.Remove(ctx)) require.NoError(t, src.Remove(ctx)) + }) } }