diff --git a/cmd/rcat/rcat.go b/cmd/rcat/rcat.go index 937c90cfa..0d6be6aee 100644 --- a/cmd/rcat/rcat.go +++ b/cmd/rcat/rcat.go @@ -22,13 +22,24 @@ rclone rcat reads from standard input (stdin) and copies it to a single remote file. echo "hello world" | rclone rcat remote:path/to/file + ffmpeg - | rclone rcat --checksum remote:path/to/file -Note that since the size is not known in advance, chunking options -will likely be ignored. The upload can also not be retried because -the data is not kept around until the upload succeeds. If you need -to transfer a lot of data, you're better off caching locally and -then ` + "`rclone move`" + ` it to the destination. -`, +If the remote file already exists, it will be overwritten. + +rcat will try to upload small files in a single request, which is +usually more efficient than the streaming/chunked upload endpoints, +which use multiple requests. Exact behaviour depends on the remote. +What is considered a small file may be set through +` + "`--streaming-upload-cutoff`" + `. Uploading only starts after +the cutoff is reached or if the file ends before that. The data +must fit into RAM. The cutoff needs to be small enough to adhere +to the limits of your remote, please see there. Generally speaking, +setting this cutoff too high will decrease your performance. + +Note that the upload can also not be retried because the data is +not kept around until the upload succeeds. 
If you need to transfer +a lot of data, you're better off caching locally and then +` + "`rclone move`" + ` it to the destination.`, Run: func(command *cobra.Command, args []string) { cmd.CheckArgs(1, 1, command, args) diff --git a/fs/config.go b/fs/config.go index 0ec998274..da8b51b67 100644 --- a/fs/config.go +++ b/fs/config.go @@ -64,48 +64,49 @@ var ( // Config is the global config Config = &ConfigInfo{} // Flags - verbose = CountP("verbose", "v", "Print lots more stuff (repeat for more)") - quiet = BoolP("quiet", "q", false, "Print as little stuff as possible") - modifyWindow = DurationP("modify-window", "", time.Nanosecond, "Max time diff to be considered the same") - checkers = IntP("checkers", "", 8, "Number of checkers to run in parallel.") - transfers = IntP("transfers", "", 4, "Number of file transfers to run in parallel.") - configFile = StringP("config", "", ConfigPath, "Config file.") - checkSum = BoolP("checksum", "c", false, "Skip based on checksum & size, not mod-time & size") - sizeOnly = BoolP("size-only", "", false, "Skip based on size only, not mod-time or checksum") - ignoreTimes = BoolP("ignore-times", "I", false, "Don't skip files that match size and time - transfer all files") - ignoreExisting = BoolP("ignore-existing", "", false, "Skip all files that exist on destination") - dryRun = BoolP("dry-run", "n", false, "Do a trial run with no permanent changes") - connectTimeout = DurationP("contimeout", "", 60*time.Second, "Connect timeout") - timeout = DurationP("timeout", "", 5*60*time.Second, "IO idle timeout") - dumpHeaders = BoolP("dump-headers", "", false, "Dump HTTP headers - may contain sensitive info") - dumpBodies = BoolP("dump-bodies", "", false, "Dump HTTP headers and bodies - may contain sensitive info") - dumpAuth = BoolP("dump-auth", "", false, "Dump HTTP headers with auth info") - skipVerify = BoolP("no-check-certificate", "", false, "Do not verify the server SSL certificate. 
Insecure.") - AskPassword = BoolP("ask-password", "", true, "Allow prompt for password for encrypted configuration.") - deleteBefore = BoolP("delete-before", "", false, "When synchronizing, delete files on destination before transfering") - deleteDuring = BoolP("delete-during", "", false, "When synchronizing, delete files during transfer (default)") - deleteAfter = BoolP("delete-after", "", false, "When synchronizing, delete files on destination after transfering") - trackRenames = BoolP("track-renames", "", false, "When synchronizing, track file renames and do a server side move if possible") - lowLevelRetries = IntP("low-level-retries", "", 10, "Number of low level retries to do.") - updateOlder = BoolP("update", "u", false, "Skip files that are newer on the destination.") - noGzip = BoolP("no-gzip-encoding", "", false, "Don't set Accept-Encoding: gzip.") - maxDepth = IntP("max-depth", "", -1, "If set limits the recursion depth to this.") - ignoreSize = BoolP("ignore-size", "", false, "Ignore size when skipping use mod-time or checksum.") - ignoreChecksum = BoolP("ignore-checksum", "", false, "Skip post copy check of checksums.") - noTraverse = BoolP("no-traverse", "", false, "Don't traverse destination file system on copy.") - noUpdateModTime = BoolP("no-update-modtime", "", false, "Don't update destination mod-time if files identical.") - backupDir = StringP("backup-dir", "", "", "Make backups into hierarchy based in DIR.") - suffix = StringP("suffix", "", "", "Suffix for use with --backup-dir.") - useListR = BoolP("fast-list", "", false, "Use recursive list if available. 
Uses more memory but fewer transactions.") - tpsLimit = Float64P("tpslimit", "", 0, "Limit HTTP transactions per second to this.") - tpsLimitBurst = IntP("tpslimit-burst", "", 1, "Max burst of transactions for --tpslimit.") - bindAddr = StringP("bind", "", "", "Local address to bind to for outgoing connections, IPv4, IPv6 or name.") - disableFeatures = StringP("disable", "", "", "Disable a comma separated list of features. Use help to see a list.") - userAgent = StringP("user-agent", "", "rclone/"+Version, "Set the user-agent to a specified string. The default is rclone/ version") - logLevel = LogLevelNotice - statsLogLevel = LogLevelInfo - bwLimit BwTimetable - bufferSize SizeSuffix = 16 << 20 + verbose = CountP("verbose", "v", "Print lots more stuff (repeat for more)") + quiet = BoolP("quiet", "q", false, "Print as little stuff as possible") + modifyWindow = DurationP("modify-window", "", time.Nanosecond, "Max time diff to be considered the same") + checkers = IntP("checkers", "", 8, "Number of checkers to run in parallel.") + transfers = IntP("transfers", "", 4, "Number of file transfers to run in parallel.") + configFile = StringP("config", "", ConfigPath, "Config file.") + checkSum = BoolP("checksum", "c", false, "Skip based on checksum & size, not mod-time & size") + sizeOnly = BoolP("size-only", "", false, "Skip based on size only, not mod-time or checksum") + ignoreTimes = BoolP("ignore-times", "I", false, "Don't skip files that match size and time - transfer all files") + ignoreExisting = BoolP("ignore-existing", "", false, "Skip all files that exist on destination") + dryRun = BoolP("dry-run", "n", false, "Do a trial run with no permanent changes") + connectTimeout = DurationP("contimeout", "", 60*time.Second, "Connect timeout") + timeout = DurationP("timeout", "", 5*60*time.Second, "IO idle timeout") + dumpHeaders = BoolP("dump-headers", "", false, "Dump HTTP headers - may contain sensitive info") + dumpBodies = BoolP("dump-bodies", "", false, "Dump HTTP 
headers and bodies - may contain sensitive info") + dumpAuth = BoolP("dump-auth", "", false, "Dump HTTP headers with auth info") + skipVerify = BoolP("no-check-certificate", "", false, "Do not verify the server SSL certificate. Insecure.") + AskPassword = BoolP("ask-password", "", true, "Allow prompt for password for encrypted configuration.") + deleteBefore = BoolP("delete-before", "", false, "When synchronizing, delete files on destination before transfering") + deleteDuring = BoolP("delete-during", "", false, "When synchronizing, delete files during transfer (default)") + deleteAfter = BoolP("delete-after", "", false, "When synchronizing, delete files on destination after transfering") + trackRenames = BoolP("track-renames", "", false, "When synchronizing, track file renames and do a server side move if possible") + lowLevelRetries = IntP("low-level-retries", "", 10, "Number of low level retries to do.") + updateOlder = BoolP("update", "u", false, "Skip files that are newer on the destination.") + noGzip = BoolP("no-gzip-encoding", "", false, "Don't set Accept-Encoding: gzip.") + maxDepth = IntP("max-depth", "", -1, "If set limits the recursion depth to this.") + ignoreSize = BoolP("ignore-size", "", false, "Ignore size when skipping use mod-time or checksum.") + ignoreChecksum = BoolP("ignore-checksum", "", false, "Skip post copy check of checksums.") + noTraverse = BoolP("no-traverse", "", false, "Don't traverse destination file system on copy.") + noUpdateModTime = BoolP("no-update-modtime", "", false, "Don't update destination mod-time if files identical.") + backupDir = StringP("backup-dir", "", "", "Make backups into hierarchy based in DIR.") + suffix = StringP("suffix", "", "", "Suffix for use with --backup-dir.") + useListR = BoolP("fast-list", "", false, "Use recursive list if available. 
Uses more memory but fewer transactions.") + tpsLimit = Float64P("tpslimit", "", 0, "Limit HTTP transactions per second to this.") + tpsLimitBurst = IntP("tpslimit-burst", "", 1, "Max burst of transactions for --tpslimit.") + bindAddr = StringP("bind", "", "", "Local address to bind to for outgoing connections, IPv4, IPv6 or name.") + disableFeatures = StringP("disable", "", "", "Disable a comma separated list of features. Use help to see a list.") + userAgent = StringP("user-agent", "", "rclone/"+Version, "Set the user-agent to a specified string. The default is rclone/ version") + streamingUploadCutoff = SizeSuffix(100 * 1024) + logLevel = LogLevelNotice + statsLogLevel = LogLevelInfo + bwLimit BwTimetable + bufferSize SizeSuffix = 16 << 20 // Key to use for password en/decryption. // When nil, no encryption will be used for saving. @@ -117,6 +118,7 @@ func init() { VarP(&statsLogLevel, "stats-log-level", "", "Log level to show --stats output DEBUG|INFO|NOTICE|ERROR") VarP(&bwLimit, "bwlimit", "", "Bandwidth limit in kBytes/s, or use suffix b|k|M|G or a full timetable.") VarP(&bufferSize, "buffer-size", "", "Buffer size when copying files.") + VarP(&streamingUploadCutoff, "streaming-upload-cutoff", "", "Cutoff for switching to chunked upload if file size is unknown. Upload starts after reaching cutoff or when file ends.") } // crypt internals @@ -202,42 +204,43 @@ func MustReveal(x string) string { // ConfigInfo is filesystem config options type ConfigInfo struct { - LogLevel LogLevel - StatsLogLevel LogLevel - DryRun bool - CheckSum bool - SizeOnly bool - IgnoreTimes bool - IgnoreExisting bool - ModifyWindow time.Duration - Checkers int - Transfers int - ConnectTimeout time.Duration // Connect timeout - Timeout time.Duration // Data channel timeout - DumpHeaders bool - DumpBodies bool - DumpAuth bool - Filter *Filter - InsecureSkipVerify bool // Skip server certificate verification - DeleteMode DeleteMode - TrackRenames bool // Track file renames. 
- LowLevelRetries int - UpdateOlder bool // Skip files that are newer on the destination - NoGzip bool // Disable compression - MaxDepth int - IgnoreSize bool - IgnoreChecksum bool - NoTraverse bool - NoUpdateModTime bool - DataRateUnit string - BackupDir string - Suffix string - UseListR bool - BufferSize SizeSuffix - TPSLimit float64 - TPSLimitBurst int - BindAddr net.IP - DisableFeatures []string + LogLevel LogLevel + StatsLogLevel LogLevel + DryRun bool + CheckSum bool + SizeOnly bool + IgnoreTimes bool + IgnoreExisting bool + ModifyWindow time.Duration + Checkers int + Transfers int + ConnectTimeout time.Duration // Connect timeout + Timeout time.Duration // Data channel timeout + DumpHeaders bool + DumpBodies bool + DumpAuth bool + Filter *Filter + InsecureSkipVerify bool // Skip server certificate verification + DeleteMode DeleteMode + TrackRenames bool // Track file renames. + LowLevelRetries int + UpdateOlder bool // Skip files that are newer on the destination + NoGzip bool // Disable compression + MaxDepth int + IgnoreSize bool + IgnoreChecksum bool + NoTraverse bool + NoUpdateModTime bool + DataRateUnit string + BackupDir string + Suffix string + UseListR bool + BufferSize SizeSuffix + TPSLimit float64 + TPSLimitBurst int + BindAddr net.IP + DisableFeatures []string + StreamingUploadCutoff SizeSuffix } // Return the path to the configuration file @@ -377,6 +380,7 @@ func LoadConfig() { Config.TPSLimit = *tpsLimit Config.TPSLimitBurst = *tpsLimitBurst Config.BufferSize = bufferSize + Config.StreamingUploadCutoff = streamingUploadCutoff Config.TrackRenames = *trackRenames diff --git a/fs/operations.go b/fs/operations.go index 4b2c204bd..ebfe8c049 100644 --- a/fs/operations.go +++ b/fs/operations.go @@ -65,7 +65,7 @@ func HashEquals(src, dst string) bool { // err - may return an error which will already have been logged // // If an error is returned it will return equal as false -func CheckHashes(src, dst Object) (equal bool, hash HashType, err error) { 
+func CheckHashes(src ObjectInfo, dst Object) (equal bool, hash HashType, err error) { common := src.Fs().Hashes().Overlap(dst.Fs().Hashes()) // Debugf(nil, "Shared hashes: %v", common) if common.Count() == 0 { @@ -115,11 +115,11 @@ func CheckHashes(src, dst Object) (equal bool, hash HashType, err error) { // // Otherwise the file is considered to be not equal including if there // were errors reading info. -func Equal(src, dst Object) bool { +func Equal(src ObjectInfo, dst Object) bool { return equal(src, dst, Config.SizeOnly, Config.CheckSum) } -func equal(src, dst Object, sizeOnly, checkSum bool) bool { +func equal(src ObjectInfo, dst Object, sizeOnly, checkSum bool) bool { if !Config.IgnoreSize { if src.Size() != dst.Size() { Debugf(src, "Sizes differ") @@ -1587,27 +1587,44 @@ func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (er Stats.Transferring(dstFileName) defer func() { Stats.DoneTransferring(dstFileName, err == nil) - if err := in0.Close(); err != nil { + if err = in0.Close(); err != nil { Debugf(fdst, "Rcat: failed to close source: %v", err) } }() - hashOption := &HashesOption{Hashes: NewHashSet()} - - in := in0 - buf := make([]byte, 100*1024) - if n, err := io.ReadFull(in0, buf); err != nil { - Debugf(fdst, "File to upload is small, uploading instead of streaming") - in = ioutil.NopCloser(bytes.NewReader(buf[:n])) - in = NewAccountSizeName(in, int64(n), dstFileName).WithBuffer() - if !Config.SizeOnly { - hashOption = &HashesOption{Hashes: HashSet(fdst.Hashes().GetOne())} - } - objInfo := NewStaticObjectInfo(dstFileName, modTime, int64(n), false, nil, nil) - _, err := fdst.Put(in, objInfo, hashOption) + hashOption := &HashesOption{Hashes: fdst.Hashes()} + hash, err := NewMultiHasherTypes(fdst.Hashes()) + if err != nil { return err } - in = ioutil.NopCloser(io.MultiReader(bytes.NewReader(buf), in0)) + readCounter := NewCountingReader(in0) + trackingIn := io.TeeReader(readCounter, hash) + + compare := func(dst Object) error { + 
src := NewStaticObjectInfo(dstFileName, modTime, int64(readCounter.BytesRead()), false, hash.Sums(), fdst) + if !Equal(src, dst) { + Stats.Error() + err = errors.Errorf("corrupted on transfer") + Errorf(dst, "%v", err) + return err + } + return nil + } + + // check if file small enough for direct upload + buf := make([]byte, Config.StreamingUploadCutoff) + if n, err := io.ReadFull(trackingIn, buf); err == io.EOF || err == io.ErrUnexpectedEOF { + Debugf(fdst, "File to upload is small (%d bytes), uploading instead of streaming", n) + in := ioutil.NopCloser(bytes.NewReader(buf[:n])) + in = NewAccountSizeName(in, int64(n), dstFileName).WithBuffer() + objInfo := NewStaticObjectInfo(dstFileName, modTime, int64(n), false, nil, nil) + dst, err := fdst.Put(in, objInfo, hashOption) + if err != nil { + return err + } + return compare(dst) + } + in := ioutil.NopCloser(io.MultiReader(bytes.NewReader(buf), trackingIn)) fStreamTo := fdst canStream := fdst.Features().PutStream != nil @@ -1626,10 +1643,6 @@ func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (er fStreamTo = tmpLocalFs } - if !Config.SizeOnly { - hashOption = &HashesOption{Hashes: HashSet(fStreamTo.Hashes().GetOne())} - } - in = NewAccountSizeName(in, -1, dstFileName).WithBuffer() if Config.DryRun { @@ -1641,10 +1654,16 @@ func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (er objInfo := NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, nil) tmpObj, err := fStreamTo.Features().PutStream(in, objInfo, hashOption) - if err == nil && !canStream { - err = Copy(fdst, nil, dstFileName, tmpObj) + if err != nil { + return err } - return err + if err = compare(tmpObj); err != nil { + return err + } + if !canStream { + return Copy(fdst, nil, dstFileName, tmpObj) + } + return nil } // Rmdirs removes any empty directories (or directories only diff --git a/fs/operations_test.go b/fs/operations_test.go index 2dbbef5eb..1ac25457a 100644 --- a/fs/operations_test.go +++ 
b/fs/operations_test.go @@ -732,28 +732,39 @@ func TestCat(t *testing.T) { } func TestRcat(t *testing.T) { - r := NewRun(t) - defer r.Finalise() + checkSumBefore := fs.Config.CheckSum + defer func() { fs.Config.CheckSum = checkSumBefore }() - fstest.CheckListing(t, r.fremote, []fstest.Item{}) + check := func() { + r := NewRun(t) + defer r.Finalise() - data1 := "this is some really nice test data" - path1 := "small_file_from_pipe" + fstest.CheckListing(t, r.fremote, []fstest.Item{}) - data2 := string(make([]byte, 100*1024+1)) - path2 := "big_file_from_pipe" + data1 := "this is some really nice test data" + path1 := "small_file_from_pipe" - in := ioutil.NopCloser(strings.NewReader(data1)) - err := fs.Rcat(r.fremote, path1, in, t1) - require.NoError(t, err) + data2 := string(make([]byte, fs.Config.StreamingUploadCutoff+1)) + path2 := "big_file_from_pipe" - in = ioutil.NopCloser(strings.NewReader(data2)) - err = fs.Rcat(r.fremote, path2, in, t2) - require.NoError(t, err) + in := ioutil.NopCloser(strings.NewReader(data1)) + err := fs.Rcat(r.fremote, path1, in, t1) + require.NoError(t, err) - file1 := fstest.NewItem(path1, data1, t1) - file2 := fstest.NewItem(path2, data2, t2) - fstest.CheckItems(t, r.fremote, file1, file2) + in = ioutil.NopCloser(strings.NewReader(data2)) + err = fs.Rcat(r.fremote, path2, in, t2) + require.NoError(t, err) + + file1 := fstest.NewItem(path1, data1, t1) + file2 := fstest.NewItem(path2, data2, t2) + fstest.CheckItems(t, r.fremote, file1, file2) + } + + fs.Config.CheckSum = true + check() + + fs.Config.CheckSum = false + check() } func TestRmdirs(t *testing.T) {