From 6f61da5c75b6c112e6d832f8dde80a8549f164c8 Mon Sep 17 00:00:00 2001
From: Nick Craig-Wood <nick@craig-wood.com>
Date: Wed, 8 Nov 2017 09:18:16 +0000
Subject: [PATCH] dropbox: buffer the chunks when uploading large files so they
 can be retried

We use fs.RepeatableReader to buffer each chunk so that it can be
retried, which plays nicely with the accounting.  The default chunk
size is 128M, which may be too large to buffer in memory.
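
For illustration, a minimal sketch of the pattern each hunk below
applies - not part of the change, and assuming rclone's fs and pacer
packages with the semantics used here. uploadOneChunk and uploadChunk
are hypothetical names for this sketch, and shouldRetry is the helper
already in this file:

    // uploadOneChunk buffers up to chunkSize bytes from in and retries
    // the upload, rewinding the buffered data before each attempt so
    // the server always receives the whole chunk.
    func uploadOneChunk(p *pacer.Pacer, in io.Reader, chunkSize int64,
            uploadChunk func(io.Reader) error) error {
        chunk := fs.NewRepeatableReader(&io.LimitedReader{R: in, N: chunkSize})
        return p.Call(func() (bool, error) {
            // seek to the start in case this is a retry
            if _, err := chunk.Seek(0, io.SeekStart); err != nil {
                return false, err
            }
            return shouldRetry(uploadChunk(chunk))
        })
    }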

Fixes #1806
---
 dropbox/dropbox.go | 32 ++++++++++++++++++++++----------
 1 file changed, 22 insertions(+), 10 deletions(-)

diff --git a/dropbox/dropbox.go b/dropbox/dropbox.go
index aa7bd775b..51a128447 100644
--- a/dropbox/dropbox.go
+++ b/dropbox/dropbox.go
@@ -1,7 +1,6 @@
 // Package dropbox provides an interface to Dropbox object storage
 package dropbox
 
-// FIXME buffer chunks for retries in upload
 // FIXME dropbox for business would be quite easy to add
 
 /*
@@ -805,8 +804,6 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
 // Will work optimally if size is >= uploadChunkSize. If the size is either
 // unknown (i.e. -1) or smaller than uploadChunkSize, the method incurs an
 // avoidable request to the Dropbox API that does not carry payload.
-//
-// FIXME buffer chunks to improve upload retries
 func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size int64) (entry *files.FileMetadata, err error) {
 	chunkSize := int64(uploadChunkSize)
 	chunks := 0
@@ -828,8 +825,13 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
 	// write the first chunk
 	fmtChunk(1, false)
 	var res *files.UploadSessionStartResult
-	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
-		res, err = o.fs.srv.UploadSessionStart(&files.UploadSessionStartArg{}, &io.LimitedReader{R: in, N: chunkSize})
+	chunk := fs.NewRepeatableReader(&io.LimitedReader{R: in, N: chunkSize})
+	err = o.fs.pacer.Call(func() (bool, error) {
+		// seek to the start in case this is a retry
+		if _, err = chunk.Seek(0, io.SeekStart); err != nil {
+			return false, err
+		}
+		res, err = o.fs.srv.UploadSessionStart(&files.UploadSessionStartArg{}, chunk)
 		return shouldRetry(err)
 	})
 	if err != nil {
@@ -859,8 +861,13 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
 		}
 		cursor.Offset = in.BytesRead()
 		fmtChunk(currentChunk, false)
-		err = o.fs.pacer.CallNoRetry(func() (bool, error) {
-			err = o.fs.srv.UploadSessionAppendV2(&appendArg, &io.LimitedReader{R: in, N: chunkSize})
+		chunk = fs.NewRepeatableReader(&io.LimitedReader{R: in, N: chunkSize})
+		err = o.fs.pacer.Call(func() (bool, error) {
+			// seek to the start in case this is a retry
+			if _, err = chunk.Seek(0, io.SeekStart); err != nil {
+				return false, err
+			}
+			err = o.fs.srv.UploadSessionAppendV2(&appendArg, chunk)
 			return shouldRetry(err)
 		})
 		if err != nil {
@@ -876,8 +883,13 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
 		Commit: commitInfo,
 	}
 	fmtChunk(currentChunk, true)
-	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
-		entry, err = o.fs.srv.UploadSessionFinish(args, in)
+	chunk = fs.NewRepeatableReader(in)
+	err = o.fs.pacer.Call(func() (bool, error) {
+		// seek to the start in case this is a retry
+		if _, err = chunk.Seek(0, io.SeekStart); err != nil {
+			return false, err
+		}
+		entry, err = o.fs.srv.UploadSessionFinish(args, chunk)
 		return shouldRetry(err)
 	})
 	if err != nil {
@@ -921,7 +933,7 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
 
 // Remove an object
 func (o *Object) Remove() (err error) {
-	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
+	err = o.fs.pacer.Call(func() (bool, error) {
 		_, err = o.fs.srv.DeleteV2(&files.DeleteArg{Path: o.remotePath()})
 		return shouldRetry(err)
 	})