diff --git a/README.md b/README.md index 1e8dc380b..ae2978b85 100644 --- a/README.md +++ b/README.md @@ -254,9 +254,6 @@ COPYING file included in this package). Bugs ---- - * Drive: Sometimes get: Failed to copy: Upload failed: googleapi: Error 403: Rate Limit Exceeded - * quota is 100.0 requests/second/user - * just retry the command if this happens * Empty directories left behind with Local and Drive * eg purging a local directory with subdirectories doesn't work diff --git a/docs/content/docs.md b/docs/content/docs.md index 16bf39a0a..c948676e8 100644 --- a/docs/content/docs.md +++ b/docs/content/docs.md @@ -132,8 +132,6 @@ COPYING file included in this package). Bugs ---- - * Drive: Sometimes get: Failed to copy: Upload failed: googleapi: Error 403: Rate Limit Exceeded - * quota is 100.0 requests/second/user * Empty directories left behind with Local and Drive * eg purging a local directory with subdirectories doesn't work diff --git a/drive/drive.go b/drive/drive.go index 0aca61645..b3da4d31f 100644 --- a/drive/drive.go +++ b/drive/drive.go @@ -1,14 +1,6 @@ // Drive interface package drive -// Gets this quite often -// Failed to set mtime: googleapi: Error 403: Rate Limit Exceeded - -// FIXME list containers equivalent should list directories? - -// FIXME list directory should list to channel for concurrency not -// append to array - // FIXME need to deal with some corner cases // * multiple files with the same name // * files can be in multiple directories @@ -27,6 +19,7 @@ import ( "time" "google.golang.org/api/drive/v2" + "google.golang.org/api/googleapi" "github.com/ncw/rclone/fs" "github.com/ncw/rclone/googleauth" @@ -40,6 +33,9 @@ const ( driveFolderType = "application/vnd.google-apps.folder" timeFormatIn = time.RFC3339 timeFormatOut = "2006-01-02T15:04:05.000000000Z07:00" + minSleep = 10 * time.Millisecond + maxSleep = 2 * time.Second + decayConstant = 2 // bigger for slower decay, exponential ) // Globals @@ -83,6 +79,8 @@ type FsDrive struct { findRootLock sync.Mutex // Protect findRoot from concurrent use dirCache dirCache // Map of directory path to directory id findDirLock sync.Mutex // Protect findDir from concurrent use + pacer chan struct{} // To pace the operations + sleepTime time.Duration // Time to sleep for each transaction } // FsObjectDrive describes a drive object @@ -149,6 +147,74 @@ func (f *FsDrive) String() string { return fmt.Sprintf("Google drive root '%s'", f.root) } +// Wait for the pace +func (f *FsDrive) paceWait() { + // pacer starts with a token in and whenever we take one out + // XXX ms later we put another in. We could do this with a + // Ticker more accurately, but then we'd have to work out how + // not to run it when it wasn't needed + <-f.pacer + + // Restart the timer + go func(t time.Duration) { + // fs.Debug(f, "New sleep for %v at %v", t, time.Now()) + time.Sleep(t) + f.pacer <- struct{}{} + }(f.sleepTime) +} + +// Refresh the pace given an error that was returned. It returns a +// boolean as to whether the operation should be retried. +// +// See https://developers.google.com/drive/web/handle-errors +// http://stackoverflow.com/questions/18529524/403-rate-limit-after-only-1-insert-per-second +func (f *FsDrive) paceRefresh(err error) bool { + again := false + oldSleepTime := f.sleepTime + if err == nil { + f.sleepTime = (f.sleepTime<> decayConstant + if f.sleepTime < minSleep { + f.sleepTime = minSleep + } + if f.sleepTime != oldSleepTime { + fs.Debug(f, "Reducing sleep to %v", f.sleepTime) + } + } else { + fs.Debug(f, "Error recived: %v", err) + if gerr, ok := err.(*googleapi.Error); ok { + if len(gerr.Errors) > 0 { + reason := gerr.Errors[0].Reason + if reason == "rateLimitExceeded" || reason == "userRateLimitExceeded" { + f.sleepTime *= 2 + if f.sleepTime > maxSleep { + f.sleepTime = maxSleep + } + if f.sleepTime != oldSleepTime { + fs.Debug(f, "Rate limited, increasing sleep to %v", f.sleepTime) + } + again = true + } + } + } + } + + return again +} + +// Pace the remote operations to not exceed Google's limits and retry +// on 403 rate limit exceeded +// +// This calls fn, expecting it to place its error in perr +func (f *FsDrive) pace(perr *error, fn func()) { + for { + f.paceWait() + fn() + if !f.paceRefresh(*perr) { + break + } + } +} + // parseParse parses a drive 'url' func parseDrivePath(path string) (root string, err error) { root = strings.Trim(path, "/") @@ -186,7 +252,10 @@ func (f *FsDrive) listAll(dirId string, title string, directoriesOnly bool, file list := f.svc.Files.List().Q(query).MaxResults(1000) OUTER: for { - files, err := list.Do() + var files *drive.FileList + f.pace(&err, func() { + files, err = list.Do() + }) if err != nil { return false, fmt.Errorf("Couldn't list directory: %s", err) } @@ -217,10 +286,15 @@ func NewFs(name, path string) (fs.Fs, error) { } f := &FsDrive{ - root: root, - dirCache: newDirCache(), + root: root, + dirCache: newDirCache(), + pacer: make(chan struct{}, 1), + sleepTime: minSleep, } + // Put the first pacing token in + f.pacer <- struct{}{} + // Create a new authorized Drive client. f.client = t.Client() f.svc, err = drive.New(f.client) @@ -229,7 +303,9 @@ func NewFs(name, path string) (fs.Fs, error) { } // Read About so we know the root path - f.about, err = f.svc.About.Get().Do() + f.pace(&err, func() { + f.about, err = f.svc.About.Get().Do() + }) if err != nil { return nil, fmt.Errorf("Couldn't read info about Drive: %s", err) } @@ -489,13 +565,16 @@ func (f *FsDrive) _findDir(path string, create bool) (pathId string, err error) if create { // fmt.Println("Making", path) // Define the metadata for the directory we are going to create. - info := &drive.File{ + createInfo := &drive.File{ Title: leaf, Description: leaf, MimeType: driveFolderType, Parents: []*drive.ParentReference{{Id: pathId}}, } - info, err := f.svc.Files.Insert(info).Do() + var info *drive.File + f.pace(&err, func() { + info, err = f.svc.Files.Insert(createInfo).Do() + }) if err != nil { return pathId, fmt.Errorf("Failed to make directory: %v", err) } @@ -629,7 +708,7 @@ func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64 modifiedDate := modTime.Format(timeFormatOut) // Define the metadata for the file we are going to create. - info := &drive.File{ + createInfo := &drive.File{ Title: leaf, Description: leaf, Parents: []*drive.ParentReference{{Id: directoryId}}, @@ -639,7 +718,13 @@ func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64 // Make the API request to upload metadata and file data. in = &fs.SeekWrapper{In: in, Size: size} - info, err = f.svc.Files.Insert(info).Media(in).Do() + var info *drive.File + // Don't retry, return a retry error instead + f.paceWait() + info, err = f.svc.Files.Insert(createInfo).Media(in).Do() + if f.paceRefresh(err) { + return o, fs.RetryErrorf("Upload failed - retry: %s", err) + } if err != nil { return o, fmt.Errorf("Upload failed: %s", err) } @@ -660,7 +745,10 @@ func (f *FsDrive) Rmdir() error { if err != nil { return err } - children, err := f.svc.Children.List(f.rootId).MaxResults(10).Do() + var children *drive.ChildList + f.pace(&err, func() { + children, err = f.svc.Children.List(f.rootId).MaxResults(10).Do() + }) if err != nil { return err } @@ -669,7 +757,9 @@ func (f *FsDrive) Rmdir() error { } // Delete the directory if it isn't the root if f.root != "" { - err = f.svc.Files.Delete(f.rootId).Do() + f.pace(&err, func() { + err = f.svc.Files.Delete(f.rootId).Do() + }) if err != nil { return err } @@ -696,7 +786,9 @@ func (f *FsDrive) Purge() error { if err != nil { return err } - err = f.svc.Files.Delete(f.rootId).Do() + f.pace(&err, func() { + err = f.svc.Files.Delete(f.rootId).Do() + }) f.resetRoot() if err != nil { return err @@ -801,11 +893,14 @@ func (o *FsObjectDrive) SetModTime(modTime time.Time) { return } // New metadata - info := &drive.File{ + updateInfo := &drive.File{ ModifiedDate: modTime.Format(timeFormatOut), } // Set modified date - info, err = o.drive.svc.Files.Update(o.id, info).SetModifiedDate(true).Do() + var info *drive.File + o.drive.pace(&err, func() { + info, err = o.drive.svc.Files.Update(o.id, updateInfo).SetModifiedDate(true).Do() + }) if err != nil { fs.Stats.Error() fs.Log(o, "Failed to update remote mtime: %s", err) @@ -826,7 +921,10 @@ func (o *FsObjectDrive) Open() (in io.ReadCloser, err error) { return nil, err } req.Header.Set("User-Agent", fs.UserAgent) - res, err := o.drive.client.Do(req) + var res *http.Response + o.drive.pace(&err, func() { + res, err = o.drive.client.Do(req) + }) if err != nil { return nil, err } @@ -843,14 +941,21 @@ func (o *FsObjectDrive) Open() (in io.ReadCloser, err error) { // // The new object may have been created if an error is returned func (o *FsObjectDrive) Update(in io.Reader, modTime time.Time, size int64) error { - info := &drive.File{ + updateInfo := &drive.File{ Id: o.id, ModifiedDate: modTime.Format(timeFormatOut), } // Make the API request to upload metadata and file data. in = &fs.SeekWrapper{In: in, Size: size} - info, err := o.drive.svc.Files.Update(info.Id, info).SetModifiedDate(true).Media(in).Do() + var err error + var info *drive.File + // Don't retry, return a retry error instead + o.drive.paceWait() + info, err = o.drive.svc.Files.Update(updateInfo.Id, updateInfo).SetModifiedDate(true).Media(in).Do() + if o.drive.paceRefresh(err) { + return fs.RetryErrorf("Update failed - retry: %s", err) + } if err != nil { return fmt.Errorf("Update failed: %s", err) } @@ -860,7 +965,11 @@ func (o *FsObjectDrive) Update(in io.Reader, modTime time.Time, size int64) erro // Remove an object func (o *FsObjectDrive) Remove() error { - return o.drive.svc.Files.Delete(o.id).Do() + var err error + o.drive.pace(&err, func() { + err = o.drive.svc.Files.Delete(o.id).Do() + }) + return err } // Check the interfaces are satisfied diff --git a/fs/fs.go b/fs/fs.go index 30295db75..e107d4b5a 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -142,6 +142,35 @@ type Purger interface { Purge() error } +// An optional interface for error as to whether the operation should be retried +// +// This should be returned from Update or Put methods as required +type Retry interface { + error + Retry() bool +} + +// A type of error +type retryError string + +// Error interface +func (r retryError) Error() string { + return string(r) +} + +// Retry interface +func (r retryError) Retry() bool { + return true +} + +// Check interface +var _ Retry = retryError("") + +// RetryErrorf makes an error which indicates it would like to be retried +func RetryErrorf(format string, a ...interface{}) error { + return retryError(fmt.Sprintf(format, a...)) +} + // A channel of Objects type ObjectsChan chan Object diff --git a/fs/operations.go b/fs/operations.go index d4e7612c4..ec34a996b 100644 --- a/fs/operations.go +++ b/fs/operations.go @@ -103,8 +103,7 @@ func removeFailedCopy(dst Object) { Debug(dst, "Removing failed copy") removeErr := dst.Remove() if removeErr != nil { - Stats.Error() - Log(dst, "Failed to remove failed copy: %s", removeErr) + Debug(dst, "Failed to remove failed copy: %s", removeErr) } } } @@ -115,6 +114,10 @@ func removeFailedCopy(dst Object) { // call Copy() with dst nil on a pre-existing file then some filing // systems (eg Drive) may duplicate the file. func Copy(f Fs, dst, src Object) { + const maxTries = 10 + tries := 0 + doUpdate := dst != nil +tryAgain: in0, err := src.Open() if err != nil { Stats.Error() @@ -124,7 +127,7 @@ func Copy(f Fs, dst, src Object) { in := NewAccount(in0) // account the transfer var actionTaken string - if dst != nil { + if doUpdate { actionTaken = "Copied (updated existing)" err = dst.Update(in, src.ModTime(), src.Size()) } else { @@ -132,6 +135,13 @@ func Copy(f Fs, dst, src Object) { dst, err = f.Put(in, src.Remote(), src.ModTime(), src.Size()) } inErr := in.Close() + // Retry if err returned a retry error + if r, ok := err.(Retry); ok && r.Retry() && tries < maxTries { + tries++ + Log(src, "Received error: %v - retrying %d/%d", err, tries, maxTries) + removeFailedCopy(dst) + goto tryAgain + } if err == nil { err = inErr }