1
0
mirror of https://github.com/rclone/rclone.git synced 2025-01-24 12:56:36 +02:00
Nick Craig-Wood 30cccc7101 cache: fix backends shutting down when in use when used via the rc
Before this fix, if a long running task (eg a copy) was started by the
rc then the backend could expire before the copy had finished.

The typical symptom was with the dropbox backend giving "batcher is
shutting down" errors.

This patch fixes the problem by pinning the backend until the job has
finished.

See: https://forum.rclone.org/t/uploads-start-repeatedly-failing-after-a-while-using-rc-sync-copy-vs-rclone-copy-for-dropbox/38873/
2023-06-13 15:48:20 +01:00

198 lines
5.5 KiB
Go

// Package cache implements the Fs cache
package cache
import (
"context"
"runtime"
"sync"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/filter"
"github.com/rclone/rclone/lib/cache"
)
// Package level state shared by every function in this file.
var (
	// once makes sure the cache is only ever constructed a single time
	once sync.Once
	// c is the cache of backends, built lazily by createOnFirstUse
	c *cache.Cache
	// mu guards access to remap
	mu sync.Mutex
	// remap translates user supplied names into canonical names
	remap = make(map[string]string)
)
// createOnFirstUse builds the package level cache the first time it
// is called and does nothing on every later call.
//
// The cache is set up with the expiry duration and interval from the
// global config and with a finalizer which shuts down any value
// implementing fs.Shutdowner.
func createOnFirstUse() {
	once.Do(func() {
		// shutdown is run by the cache on values it finalizes
		shutdown := func(value interface{}) {
			s, ok := value.(fs.Shutdowner)
			if !ok {
				return
			}
			// The error is counted by fs.CountError, the returned
			// error itself is deliberately dropped.
			_ = fs.CountError(s.Shutdown(context.Background()))
		}

		ci := fs.GetConfig(context.Background())
		c = cache.New()
		c.SetExpireDuration(ci.FsCacheExpireDuration)
		c.SetExpireInterval(ci.FsCacheExpireInterval)
		c.SetFinalizer(shutdown)
	})
}
// Canonicalize returns the canonical name recorded for the user
// supplied fsString.
//
// If no mapping has been recorded for fsString then it is returned
// unchanged.
func Canonicalize(fsString string) string {
	createOnFirstUse()

	// Only hold the lock for the map lookup, not for the logging
	mu.Lock()
	canonicalName, found := remap[fsString]
	mu.Unlock()

	if found {
		fs.Debugf(nil, "fs cache: switching user supplied name %q for canonical name %q", fsString, canonicalName)
		return canonicalName
	}
	return fsString
}
// addMapping records that fsString should be looked up as
// canonicalName.
//
// Identical names need no mapping so nothing is stored for them.
func addMapping(fsString, canonicalName string) {
	if fsString != canonicalName {
		mu.Lock()
		defer mu.Unlock()
		remap[fsString] = canonicalName
	}
}
// GetFn returns the fs.Fs named fsString, taking it from the cache if
// it is there or making it with the create function if not.
//
// The cache is searched under the canonical form of fsString, but
// create is always called with the fsString the caller passed in.
//
// fs.ErrorIsFile is not treated as a failure: the Fs is cached and
// returned along with that error. Any other error gives a nil Fs.
func GetFn(ctx context.Context, fsString string, create func(ctx context.Context, fsString string) (fs.Fs, error)) (f fs.Fs, err error) {
	createOnFirstUse()
	lookupName := Canonicalize(fsString)

	// freshlyMade is set by the callback when it made a cacheable Fs
	freshlyMade := false
	builder := func(string) (interface{}, bool, error) {
		// always create the backend with the original non-canonicalised string
		newFs, createErr := create(ctx, fsString)
		keep := createErr == nil || createErr == fs.ErrorIsFile
		freshlyMade = keep
		return newFs, keep, createErr
	}

	cached, err := c.Get(lookupName, builder)
	if err != nil && err != fs.ErrorIsFile {
		return nil, err
	}
	f = cached.(fs.Fs)

	// An Fs which was already in the cache needs no further work
	if !freshlyMade {
		return f, err
	}

	// Check we stored the Fs at the canonical name
	realName := fs.ConfigString(f)
	if realName == lookupName {
		return f, err
	}

	if err != nil {
		// err is fs.ErrorIsFile here. We can't rename the remote
		// as it will have the wrong error status, so add a new
		// entry instead.
		fs.Debugf(nil, "fs cache: adding new entry for parent of %q, %q", lookupName, realName)
		Put(realName, f)
		return f, err
	}

	fs.Debugf(nil, "fs cache: renaming cache item %q to be canonical %q", lookupName, realName)
	// Use the value handed back by Rename when it reports one
	if renamed, found := c.Rename(lookupName, realName); found {
		f = renamed.(fs.Fs)
	}
	addMapping(lookupName, realName)
	return f, err
}
// Pin pins f in the cache under its config string.
//
// It stays pinned until Unpin is called.
func Pin(f fs.Fs) {
	createOnFirstUse()
	key := fs.ConfigString(f)
	c.Pin(key)
}
// PinUntilFinalized pins f into the cache and unpins it again when x
// is garbage collected.
//
// This is done with runtime.SetFinalizer on x, so x shouldn't have a
// finalizer already.
func PinUntilFinalized(f fs.Fs, x interface{}) {
	// release ignores the finalized object, only f matters
	release := func(_ interface{}) {
		Unpin(f)
	}
	Pin(f)
	runtime.SetFinalizer(x, release)
}
// Unpin releases a pin on f in the cache, looked up by its config
// string.
func Unpin(f fs.Fs) {
	createOnFirstUse()
	key := fs.ConfigString(f)
	c.Unpin(key)
}
// The two hooks below are filled in by fs/rc/jobs/job.go. They live
// here as variables to avoid circular dependencies.

// JobGetJobID for internal use only
//
// Get calls it with the request context and expects back the job ID
// and whether one was found.
var JobGetJobID func(context.Context) (int64, bool)

// JobOnFinish for internal use only
//
// Get calls it with a job ID and a function it wants run for that
// job.
var JobOnFinish func(int64, func()) (func(), error)
// Get gets an fs.Fs named fsString either from the cache or creates it afresh
//
// The backend is created with a fresh background context carrying
// only the fs and filter config copied from ctx, so it is not tied to
// any cancellation of the caller's context.
//
// If ctx belongs to an rc job (as reported by JobGetJobID) the backend
// is pinned in the cache until that job finishes.
//
// fs.ErrorIsFile is returned together with a usable Fs. Any other
// error is returned as is.
func Get(ctx context.Context, fsString string) (f fs.Fs, err error) {
	// If we are making a long lived backend which lives longer
	// than this request, we want to disconnect it from the
	// current context and in particular any WithCancel contexts,
	// but we want to preserve the config embedded in the context.
	newCtx := context.Background()
	newCtx = fs.CopyConfig(newCtx, ctx)
	newCtx = filter.CopyConfig(newCtx, ctx)
	f, err = GetFn(newCtx, fsString, fs.NewFs)
	if f == nil || (err != nil && err != fs.ErrorIsFile) {
		return f, err
	}
	// If this is part of an rc job then pin the backend until it finishes
	if JobOnFinish != nil && JobGetJobID != nil {
		if jobID, ok := JobGetJobID(ctx); ok {
			Pin(f)
			_, hookErr := JobOnFinish(jobID, func() {
				Unpin(f)
			})
			if hookErr != nil {
				// The finish hook could not be registered, so
				// presumably the callback above will never run.
				// Release the pin now, otherwise the backend would
				// stay pinned and never expire from the cache.
				fs.Debugf(f, "fs cache: not pinning for job %d: %v", jobID, hookErr)
				Unpin(f)
			}
		}
	}
	return f, err
}
// GetArr gets one fs.Fs for each entry of fsStrings, in order, either
// from the cache or by creating it afresh.
//
// It stops at the first error from GetFn and returns that error along
// with the backends collected so far.
func GetArr(ctx context.Context, fsStrings []string) (f []fs.Fs, err error) {
	var backends []fs.Fs
	for i := range fsStrings {
		backend, getErr := GetFn(ctx, fsStrings[i], fs.NewFs)
		if getErr != nil {
			return backends, getErr
		}
		backends = append(backends, backend)
	}
	return backends, nil
}
// Put stores f in the cache.
//
// The entry is keyed by the config string of f. If fsString differs
// from that key, a mapping from fsString to it is recorded as well.
func Put(fsString string, f fs.Fs) {
	createOnFirstUse()
	key := fs.ConfigString(f)
	c.Put(key, f)
	addMapping(fsString, key)
}
// ClearConfig deletes all entries which were based on the config name
// passed in, that is those whose key starts with name followed by ":".
//
// Returns number of entries deleted
func ClearConfig(name string) (deleted int) {
	createOnFirstUse()
	prefix := name + ":"
	deleted = c.DeletePrefix(prefix)
	return deleted
}
// Clear removes everything from the cache
//
// The mappings from user supplied names to canonical names are
// dropped too. They only describe entries which were in the cache, so
// keeping them would leak memory and make later lookups go through
// names which are no longer cached.
func Clear() {
	createOnFirstUse()
	c.Clear()
	mu.Lock()
	remap = map[string]string{}
	mu.Unlock()
}
// Entries reports how many entries the cache currently holds
func Entries() int {
	createOnFirstUse()
	count := c.Entries()
	return count
}