diff --git a/vfs/dir.go b/vfs/dir.go index 70181fd79..f8d86d489 100644 --- a/vfs/dir.go +++ b/vfs/dir.go @@ -191,6 +191,26 @@ func (d *Dir) walk(fun func(*Dir)) { fun(d) } +// countActiveWriters returns the number of writers active in this +// directory and any subdirectories. +func (d *Dir) countActiveWriters() (writers int) { + d.walk(func(d *Dir) { + // NB d.mu is held by walk() here + fs.Debugf(d.path, "Looking for writers") + for leaf, item := range d.items { + fs.Debugf(leaf, "reading active writers") + if file, ok := item.(*File); ok { + n := file.activeWriters() + if n != 0 { + fs.Debugf(file, "active writers %d", n) + } + writers += n + } + } + }) + return writers +} + // age returns the duration since the last time the directory contents // was read and the content is cosidered stale. age will be 0 and // stale true if the last read time is empty. @@ -698,9 +718,16 @@ func (d *Dir) Sync() error { // VFS returns the instance of the VFS func (d *Dir) VFS() *VFS { + // No locking required return d.vfs } +// Fs returns the Fs that the Dir is on +func (d *Dir) Fs() fs.Fs { + // No locking required + return d.f +} + // Truncate changes the size of the named file. func (d *Dir) Truncate(size int64) error { return ENOSYS diff --git a/vfs/file.go b/vfs/file.go index c330f33db..f40d299db 100644 --- a/vfs/file.go +++ b/vfs/file.go @@ -18,13 +18,19 @@ import ( // both have locks there is plenty of potential for deadlocks. In // order to mitigate this, we use the following conventions // -// File may read directly, without locking, from the read-only section -// of the Dir object, eg vfs, and f members. File may **not** read any -// other members directly. +// File may **only** call these methods from Dir with the File lock +// held. // -// File may **not** call Dir methods with the File lock held. This -// preserves total lock ordering and makes File subordinate to Dir as -// far as locking is concerned, preventing deadlocks. +// Dir.Fs +// Dir.VFS +// +// (As these are read only and do not need to take the Dir mutex.) +// +// File may **not** call any other Dir methods with the File lock +// held. This preserves total lock ordering and makes File subordinate +// to Dir as far as locking is concerned, preventing deadlocks. +// +// File may **not** read any members of Dir directly. // File represents a file type File struct { @@ -162,8 +168,8 @@ func (f *File) rename(ctx context.Context, destDir *Dir, newName string) error { oldPendingRenameFun := f.pendingRenameFun f.mu.RUnlock() - if features := d.f.Features(); features.Move == nil && features.Copy == nil { - err := errors.Errorf("Fs %q can't rename files (no server side Move or Copy)", d.f) + if features := d.Fs().Features(); features.Move == nil && features.Copy == nil { + err := errors.Errorf("Fs %q can't rename files (no server side Move or Copy)", d.Fs()) fs.Errorf(f.Path(), "Dir.Rename error: %v", err) return err } @@ -191,8 +197,8 @@ func (f *File) rename(ctx context.Context, destDir *Dir, newName string) error { } // do the move of the remote object - dstOverwritten, _ := d.f.NewObject(ctx, newPath) - newObject, err := operations.Move(ctx, d.f, dstOverwritten, newPath, o) + dstOverwritten, _ := d.Fs().NewObject(ctx, newPath) + newObject, err := operations.Move(ctx, d.Fs(), dstOverwritten, newPath, o) if err != nil { fs.Errorf(f.Path(), "File.Rename error: %v", err) return err @@ -611,7 +617,7 @@ func (f *File) VFS() *VFS { func (f *File) Fs() fs.Fs { f.mu.RLock() defer f.mu.RUnlock() - return f.d.f + return f.d.Fs() } // Open a file according to the flags provided diff --git a/vfs/read_write.go b/vfs/read_write.go index 2344f1223..a19fa94ca 100644 --- a/vfs/read_write.go +++ b/vfs/read_write.go @@ -55,12 +55,12 @@ func newRWFileHandle(d *Dir, f *File, flags int) (fh *RWFileHandle, err error) { } // mark the file as open in the cache - must be done before the mkdir - fh.d.vfs.cache.open(fh.file.Path()) + fh.d.VFS().cache.open(fh.file.Path()) // Make a place for the file - _, err = d.vfs.cache.mkdir(fh.file.Path()) + _, err = d.VFS().cache.mkdir(fh.file.Path()) if err != nil { - fh.d.vfs.cache.close(fh.file.Path()) + fh.d.VFS().cache.close(fh.file.Path()) return nil, errors.Wrap(err, "open RW handle failed to make cache directory") } @@ -110,9 +110,9 @@ func (fh *RWFileHandle) openPending(truncate bool) (err error) { // If the remote object exists AND its cached file exists locally AND there are no // other RW handles with it open, then attempt to update it. if o != nil && fh.file.rwOpens() == 0 { - cacheObj, err := fh.d.vfs.cache.f.NewObject(context.TODO(), fh.file.Path()) + cacheObj, err := fh.d.VFS().cache.f.NewObject(context.TODO(), fh.file.Path()) if err == nil && cacheObj != nil { - _, err = copyObj(fh.d.vfs.cache.f, cacheObj, fh.file.Path(), o) + _, err = copyObj(fh.d.VFS().cache.f, cacheObj, fh.file.Path(), o) if err != nil { return errors.Wrap(err, "open RW handle failed to update cached file") } @@ -125,7 +125,7 @@ func (fh *RWFileHandle) openPending(truncate bool) (err error) { // cache file does not exist, so need to fetch it if we have an object to fetch // it from if o != nil { - _, err = copyObj(fh.d.vfs.cache.f, nil, fh.file.Path(), o) + _, err = copyObj(fh.d.VFS().cache.f, nil, fh.file.Path(), o) if err != nil { cause := errors.Cause(err) if cause != fs.ErrorObjectNotFound && cause != fs.ErrorDirNotFound { @@ -277,7 +277,7 @@ func (fh *RWFileHandle) flushWrites(closeFile bool) error { if isCopied { // Transfer the temp file to the remote - cacheObj, err := fh.d.vfs.cache.f.NewObject(context.TODO(), fh.file.Path()) + cacheObj, err := fh.d.VFS().cache.f.NewObject(context.TODO(), fh.file.Path()) if err != nil { err = errors.Wrap(err, "failed to find cache file") fs.Errorf(fh.logPrefix(), "%v", err) @@ -289,7 +289,7 @@ func (fh *RWFileHandle) flushWrites(closeFile bool) error { if objOld != nil { objPath = objOld.Remote() // use the path of the actual object if available } - o, err := copyObj(fh.d.vfs.f, objOld, objPath, cacheObj) + o, err := copyObj(fh.d.VFS().f, objOld, objPath, cacheObj) if err != nil { err = errors.Wrap(err, "failed to transfer file from cache to remote") fs.Errorf(fh.logPrefix(), "%v", err) @@ -322,7 +322,7 @@ func (fh *RWFileHandle) close() (err error) { if fh.opened { fh.file.delRWOpen() } - fh.d.vfs.cache.close(fh.file.Path()) + fh.d.VFS().cache.close(fh.file.Path()) }() return fh.flushWrites(true) diff --git a/vfs/vfs.go b/vfs/vfs.go index 2f35a0a8f..2fa71db69 100644 --- a/vfs/vfs.go +++ b/vfs/vfs.go @@ -305,21 +305,7 @@ func (vfs *VFS) WaitForWriters(timeout time.Duration) { defer tick.Stop() tick.Stop() for { - writers := 0 - vfs.root.walk(func(d *Dir) { - fs.Debugf(d.path, "Looking for writers") - // NB d.mu is held by walk() here - for leaf, item := range d.items { - fs.Debugf(leaf, "reading active writers") - if file, ok := item.(*File); ok { - n := file.activeWriters() - if n != 0 { - fs.Debugf(file, "active writers %d", n) - } - writers += n - } - } - }) + writers := vfs.root.countActiveWriters() if writers == 0 { return }