diff --git a/vfs/vfscache/cache.go b/vfs/vfscache/cache.go index 0c2f213fc..a705a182d 100644 --- a/vfs/vfscache/cache.go +++ b/vfs/vfscache/cache.go @@ -20,6 +20,7 @@ import ( "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/operations" "github.com/rclone/rclone/lib/file" + "github.com/rclone/rclone/vfs/vfscache/writeback" "github.com/rclone/rclone/vfs/vfscommon" ) @@ -35,15 +36,15 @@ import ( // Cache opened files type Cache struct { // read only - no locking needed to read these - fremote fs.Fs // fs for the remote we are caching - fcache fs.Fs // fs for the cache directory - fcacheMeta fs.Fs // fs for the cache metadata directory - opt *vfscommon.Options // vfs Options - root string // root of the cache directory - metaRoot string // root of the cache metadata directory - hashType hash.Type // hash to use locally and remotely - hashOption *fs.HashesOption // corresponding OpenOption - writeback *writeBack // holds Items for writeback + fremote fs.Fs // fs for the remote we are caching + fcache fs.Fs // fs for the cache directory + fcacheMeta fs.Fs // fs for the cache metadata directory + opt *vfscommon.Options // vfs Options + root string // root of the cache directory + metaRoot string // root of the cache metadata directory + hashType hash.Type // hash to use locally and remotely + hashOption *fs.HashesOption // corresponding OpenOption + writeback *writeback.WriteBack // holds Items for writeback mu sync.Mutex // protects the following variables item map[string]*Item // files/directories in the cache @@ -88,7 +89,7 @@ func New(ctx context.Context, fremote fs.Fs, opt *vfscommon.Options) (*Cache, er item: make(map[string]*Item), hashType: hashType, hashOption: hashOption, - writeback: newWriteBack(ctx, opt), + writeback: writeback.New(ctx, opt), } // Make sure cache directories exist @@ -510,7 +511,7 @@ func (c *Cache) clean() { } } c.mu.Unlock() - uploadsInProgress, uploadsQueued := c.writeback.getStats() + uploadsInProgress, uploadsQueued := c.writeback.Stats() fs.Infof(nil, "Cleaned the cache: objects %d (was %d) in use %d, to upload %d, uploading %d, total size %v (was %v)", newItems, oldItems, totalInUse, uploadsQueued, uploadsInProgress, newUsed, oldUsed) } diff --git a/vfs/vfscache/downloader.go b/vfs/vfscache/downloaders/downloaders.go similarity index 84% rename from vfs/vfscache/downloader.go rename to vfs/vfscache/downloaders/downloaders.go index 6294e1465..8347b0cd1 100644 --- a/vfs/vfscache/downloader.go +++ b/vfs/vfscache/downloaders/downloaders.go @@ -1,4 +1,4 @@ -package vfscache +package downloaders import ( "context" @@ -12,6 +12,7 @@ import ( "github.com/rclone/rclone/fs/chunkedreader" "github.com/rclone/rclone/fs/log" "github.com/rclone/rclone/lib/ranges" + "github.com/rclone/rclone/vfs/vfscommon" ) // FIXME implement max downloaders @@ -27,16 +28,37 @@ const ( maxErrorCount = 10 ) -// downloaders is a number of downloader~s and a queue of waiters -// waiting for segments to be downloaded. -type downloaders struct { +// Item is the interface that an item to download must obey +type Item interface { + // FindMissing adjusts r returning a new ranges.Range which only + // contains the range which needs to be downloaded. This could be + // empty - check with IsEmpty. It also adjust this to make sure it is + // not larger than the file. + FindMissing(r ranges.Range) (outr ranges.Range) + + // HasRange returns true if the current ranges entirely include range + HasRange(r ranges.Range) bool + + // WriteAtNoOverwrite writes b to the file, but will not overwrite + // already present ranges. + // + // This is used by the downloader to write bytes to the file + // + // It returns n the total bytes processed and skipped the number of + // bytes which were processed but not actually written to the file. + WriteAtNoOverwrite(b []byte, off int64) (n int, skipped int, err error) +} + +// Downloaders is a number of downloader~s and a queue of waiters +// waiting for segments to be downloaded to a file. +type Downloaders struct { // Write once - no locking required ctx context.Context cancel context.CancelFunc - item *Item + item Item + opt *vfscommon.Options src fs.Object // source object remote string - fcache fs.Fs // destination Fs wg sync.WaitGroup // Read write @@ -57,7 +79,7 @@ type waiter struct { // downloader represents a running download for part of a file. type downloader struct { // Write once - dls *downloaders // parent structure + dls *Downloaders // parent structure quit chan struct{} // close to quit the downloader wg sync.WaitGroup // to keep track of downloader goroutine kick chan struct{} // kick the downloader when needed @@ -74,18 +96,19 @@ type downloader struct { stop bool // set to true if we have called _stop() } -func newDownloaders(item *Item, fcache fs.Fs, remote string, src fs.Object) (dls *downloaders) { +// New makes a downloader for item +func New(item Item, opt *vfscommon.Options, remote string, src fs.Object) (dls *Downloaders) { if src == nil { panic("internal error: newDownloaders called with nil src object") } ctx, cancel := context.WithCancel(context.Background()) - dls = &downloaders{ + dls = &Downloaders{ ctx: ctx, cancel: cancel, item: item, + opt: opt, src: src, remote: remote, - fcache: fcache, } dls.wg.Add(1) go func() { @@ -114,7 +137,7 @@ func newDownloaders(item *Item, fcache fs.Fs, remote string, src fs.Object) (dls // err is error from download // // call with lock held -func (dls *downloaders) _countErrors(n int64, err error) { +func (dls *Downloaders) _countErrors(n int64, err error) { if err == nil && n != 0 { if dls.errorCount != 0 { fs.Infof(dls.src, "Resetting error count to 0") @@ -130,7 +153,7 @@ func (dls *downloaders) _countErrors(n int64, err error) { } } -func (dls *downloaders) countErrors(n int64, err error) { +func (dls *Downloaders) countErrors(n int64, err error) { dls.mu.Lock() dls._countErrors(n, err) dls.mu.Unlock() @@ -139,7 +162,7 @@ func (dls *downloaders) countErrors(n int64, err error) { // Make a new downloader, starting it to download r // // call with lock held -func (dls *downloaders) _newDownloader(r ranges.Range) (dl *downloader, err error) { +func (dls *Downloaders) _newDownloader(r ranges.Range) (dl *downloader, err error) { defer log.Trace(dls.src, "r=%v", r)("err=%v", &err) dl = &downloader{ @@ -180,7 +203,7 @@ func (dls *downloaders) _newDownloader(r ranges.Range) (dl *downloader, err erro // _removeClosed() removes any downloaders which are closed. // // Call with the mutex held -func (dls *downloaders) _removeClosed() { +func (dls *Downloaders) _removeClosed() { newDownloaders := dls.dls[:0] for _, dl := range dls.dls { if !dl.closed() { @@ -192,7 +215,7 @@ func (dls *downloaders) _removeClosed() { // Close all running downloaders and return any unfulfilled waiters // with inErr -func (dls *downloaders) close(inErr error) (err error) { +func (dls *Downloaders) Close(inErr error) (err error) { dls.mu.Lock() defer dls.mu.Unlock() dls._removeClosed() @@ -212,8 +235,9 @@ func (dls *downloaders) close(inErr error) (err error) { return err } -// Ensure a downloader is running to download r -func (dls *downloaders) ensure(r ranges.Range) (err error) { +// Download the range passed in returning when it has been downloaded +// with an error from the downloading go routine. +func (dls *Downloaders) Download(r ranges.Range) (err error) { defer log.Trace(dls.src, "r=%+v", r)("err=%v", &err) dls.mu.Lock() @@ -238,7 +262,7 @@ func (dls *downloaders) ensure(r ranges.Range) (err error) { // close any waiters with the error passed in // // call with lock held -func (dls *downloaders) _closeWaiters(err error) { +func (dls *Downloaders) _closeWaiters(err error) { for _, waiter := range dls.waiters { waiter.errChan <- err } @@ -249,7 +273,7 @@ func (dls *downloaders) _closeWaiters(err error) { // then it starts it. // // call with lock held -func (dls *downloaders) _ensureDownloader(r ranges.Range) (err error) { +func (dls *Downloaders) _ensureDownloader(r ranges.Range) (err error) { // FIXME this window could be a different config var? window := int64(fs.Config.BufferSize) @@ -258,7 +282,7 @@ func (dls *downloaders) _ensureDownloader(r ranges.Range) (err error) { // read some stuff already. // // Clip r to stuff which needs downloading - r = dls.item.findMissing(r) + r = dls.item.FindMissing(r) // If the range is entirely present then we only need to start a // dowloader if the window isn't full. @@ -269,7 +293,7 @@ func (dls *downloaders) _ensureDownloader(r ranges.Range) (err error) { rWindow.Size = window } // Clip rWindow to stuff which needs downloading - rWindow = dls.item.findMissing(rWindow) + rWindow = dls.item.FindMissing(rWindow) // If rWindow is empty then just return without starting a // downloader as there is no data within the window which needs // downloading. @@ -310,9 +334,11 @@ func (dls *downloaders) _ensureDownloader(r ranges.Range) (err error) { return err } -// ensure a downloader is running for offset if required. If one -// isn't found then it starts it -func (dls *downloaders) ensureDownloader(r ranges.Range) (err error) { +// EnsureDownloader makes sure a downloader is running for the range +// passed in. If one isn't found then it starts it. +// +// It does not wait for the range to be downloaded +func (dls *Downloaders) EnsureDownloader(r ranges.Range) (err error) { dls.mu.Lock() defer dls.mu.Unlock() return dls._ensureDownloader(r) @@ -322,14 +348,14 @@ func (dls *downloaders) ensureDownloader(r ranges.Range) (err error) { // their callers. // // Call with the mutex held -func (dls *downloaders) _dispatchWaiters() { +func (dls *Downloaders) _dispatchWaiters() { if len(dls.waiters) == 0 { return } newWaiters := dls.waiters[:0] for _, waiter := range dls.waiters { - if dls.item.hasRange(waiter.r) { + if dls.item.HasRange(waiter.r) { waiter.errChan <- nil } else { newWaiters = append(newWaiters, waiter) @@ -340,7 +366,7 @@ func (dls *downloaders) _dispatchWaiters() { // Send any waiters which have completed back to their callers and make sure // there is a downloader appropriate for each waiter -func (dls *downloaders) kickWaiters() (err error) { +func (dls *Downloaders) kickWaiters() (err error) { dls.mu.Lock() defer dls.mu.Unlock() @@ -351,7 +377,7 @@ func (dls *downloaders) kickWaiters() (err error) { } // Make sure each waiter has a downloader - // This is an O(waiters*downloaders) algorithm + // This is an O(waiters*Downloaders) algorithm // However the number of waiters and the number of downloaders // are both expected to be small. for _, waiter := range dls.waiters { @@ -421,7 +447,7 @@ func (dl *downloader) Write(p []byte) (n int, err error) { } } - n, skipped, err := dl.dls.item.writeAtNoOverwrite(p, dl.offset) + n, skipped, err := dl.dls.item.WriteAtNoOverwrite(p, dl.offset) if skipped == n { dl.skipped += int64(skipped) } else { @@ -457,7 +483,7 @@ func (dl *downloader) open(offset int64) (err error) { // } // in0, err := operations.NewReOpen(dl.dls.ctx, dl.dls.src, fs.Config.LowLevelRetries, dl.dls.item.c.hashOption, rangeOption) - in0 := chunkedreader.New(context.TODO(), dl.dls.src, int64(dl.dls.item.c.opt.ChunkSize), int64(dl.dls.item.c.opt.ChunkSizeLimit)) + in0 := chunkedreader.New(context.TODO(), dl.dls.src, int64(dl.dls.opt.ChunkSize), int64(dl.dls.opt.ChunkSizeLimit)) _, err = in0.Seek(offset, 0) if err != nil { return errors.Wrap(err, "vfs reader: failed to open source file") diff --git a/vfs/vfscache/item.go b/vfs/vfscache/item.go index 559e11623..9d27d57c8 100644 --- a/vfs/vfscache/item.go +++ b/vfs/vfscache/item.go @@ -15,6 +15,8 @@ import ( "github.com/rclone/rclone/fs/operations" "github.com/rclone/rclone/lib/file" "github.com/rclone/rclone/lib/ranges" + "github.com/rclone/rclone/vfs/vfscache/downloaders" + "github.com/rclone/rclone/vfs/vfscache/writeback" ) // NB as Cache and Item are tightly linked it is necessary to have a @@ -47,15 +49,16 @@ type Item struct { // read only c *Cache // cache this is part of - mu sync.Mutex // protect the variables - name string // name in the VFS - opens int // number of times file is open - downloaders *downloaders // a record of the downloaders in action - may be nil - o fs.Object // object we are caching - may be nil - fd *os.File // handle we are using to read and write to the file - metaDirty bool // set if the info needs writeback - modified bool // set if the file has been modified since the last Open - info Info // info about the file to persist to backing store + mu sync.Mutex // protect the variables + name string // name in the VFS + opens int // number of times file is open + downloaders *downloaders.Downloaders // a record of the downloaders in action - may be nil + o fs.Object // object we are caching - may be nil + fd *os.File // handle we are using to read and write to the file + metaDirty bool // set if the info needs writeback + modified bool // set if the file has been modified since the last Open + info Info // info about the file to persist to backing store + writeBackID writeback.Handle // id of any writebacks in progress } @@ -370,7 +373,7 @@ func (item *Item) _dirty() { if !item.modified { item.modified = true item.mu.Unlock() - item.c.writeback.remove(item) + item.c.writeback.Remove(item.writeBackID) item.mu.Lock() } if !item.info.Dirty { @@ -464,7 +467,7 @@ func (item *Item) Open(o fs.Object) (err error) { // Create the downloaders if item.o != nil { - item.downloaders = newDownloaders(item, item.c.fremote, item.name, item.o) + item.downloaders = downloaders.New(item, item.c.opt, item.name, item.o) } return err @@ -523,7 +526,7 @@ func (item *Item) store(ctx context.Context, storeFn StoreFn) (err error) { func (item *Item) Close(storeFn StoreFn) (err error) { defer log.Trace(item.o, "Item.Close")("err=%v", &err) var ( - downloaders *downloaders + downloaders *downloaders.Downloaders syncWriteBack = item.c.opt.WriteBack <= 0 ) item.mu.Lock() @@ -575,7 +578,7 @@ func (item *Item) Close(storeFn StoreFn) (err error) { // downloader.Write calls ensure which needs the lock // close downloader with mutex unlocked item.mu.Unlock() - checkErr(downloaders.close(nil)) + checkErr(downloaders.Close(nil)) item.mu.Lock() } @@ -609,8 +612,10 @@ func (item *Item) Close(storeFn StoreFn) (err error) { checkErr(item._store(context.Background(), storeFn)) } else { // asynchronous writeback + item.c.writeback.SetID(&item.writeBackID) + id := item.writeBackID item.mu.Unlock() - item.c.writeback.add(item, item.name, item.modified, func(ctx context.Context) error { + item.c.writeback.Add(id, item.name, item.modified, func(ctx context.Context) error { return item.store(ctx, storeFn) }) item.mu.Lock() @@ -733,7 +738,7 @@ func (item *Item) _removeMeta(reason string) { func (item *Item) _remove(reason string) (wasWriting bool) { // Cancel writeback, if any item.mu.Unlock() - wasWriting = item.c.writeback.remove(item) + wasWriting = item.c.writeback.Remove(item.writeBackID) item.mu.Lock() item.info.clean() item.metaDirty = false @@ -766,18 +771,18 @@ func (item *Item) present() bool { return item._present() } -// hasRange returns true if the current ranges entirely include range -func (item *Item) hasRange(r ranges.Range) bool { +// HasRange returns true if the current ranges entirely include range +func (item *Item) HasRange(r ranges.Range) bool { item.mu.Lock() defer item.mu.Unlock() return item.info.Rs.Present(r) } -// findMissing adjusts r returning a new ranges.Range which only +// FindMissing adjusts r returning a new ranges.Range which only // contains the range which needs to be downloaded. This could be // empty - check with IsEmpty. It also adjust this to make sure it is // not larger than the file. -func (item *Item) findMissing(r ranges.Range) (outr ranges.Range) { +func (item *Item) FindMissing(r ranges.Range) (outr ranges.Range) { item.mu.Lock() defer item.mu.Unlock() outr = item.info.Rs.FindMissing(r) @@ -805,12 +810,12 @@ func (item *Item) _ensure(offset, size int64) (err error) { return nil } // Otherwise start the downloader for the future if required - return item.downloaders.ensureDownloader(r) + return item.downloaders.EnsureDownloader(r) } if item.downloaders == nil { return errors.New("internal error: downloaders is nil") } - return item.downloaders.ensure(r) + return item.downloaders.Download(r) } // _written marks the (offset, size) as present in the backing file @@ -925,14 +930,14 @@ func (item *Item) WriteAt(b []byte, off int64) (n int, err error) { return n, err } -// writeAtNoOverwrite writes b to the file, but will not overwrite +// WriteAtNoOverwrite writes b to the file, but will not overwrite // already present ranges. // // This is used by the downloader to write bytes to the file // // It returns n the total bytes processed and skipped the number of // bytes which were processed but not actually written to the file. -func (item *Item) writeAtNoOverwrite(b []byte, off int64) (n int, skipped int, err error) { +func (item *Item) WriteAtNoOverwrite(b []byte, off int64) (n int, skipped int, err error) { item.mu.Lock() var ( @@ -1002,11 +1007,11 @@ func (item *Item) Sync() (err error) { // rename the item func (item *Item) rename(name string, newName string, newObj fs.Object) (err error) { - var downloaders *downloaders + var downloaders *downloaders.Downloaders // close downloader with mutex unlocked defer func() { if downloaders != nil { - _ = downloaders.close(nil) + _ = downloaders.Close(nil) } }() diff --git a/vfs/vfscache/writeback.go b/vfs/vfscache/writeback/writeback.go similarity index 70% rename from vfs/vfscache/writeback.go rename to vfs/vfscache/writeback/writeback.go index c82123535..dd5a1fd06 100644 --- a/vfs/vfscache/writeback.go +++ b/vfs/vfscache/writeback/writeback.go @@ -1,11 +1,12 @@ -// This keeps track of the files which need to be written back - -package vfscache +// Package writeback keeps track of the files which need to be written +// back to storage +package writeback import ( "container/heap" "context" "sync" + "sync/atomic" "time" "github.com/rclone/rclone/fs" @@ -17,30 +18,35 @@ const ( maxUploadDelay = 5 * time.Minute // max delay betwen upload attempts ) -// putFn is the interface that item provides to store the data -type putFn func(context.Context) error +// PutFn is the interface that item provides to store the data +type PutFn func(context.Context) error -// writeBack keeps track of the items which need to be written back to the disk at some point -type writeBack struct { +// Handle is returned for callers to keep track of writeback items +type Handle uint64 + +// WriteBack keeps track of the items which need to be written back to the disk at some point +type WriteBack struct { ctx context.Context mu sync.Mutex - items writeBackItems // priority queue of *writeBackItem - writeBackItems are in here while awaiting transfer only - lookup map[*Item]*writeBackItem // for getting a *writeBackItem from a *Item - writeBackItems are in here until cancelled - opt *vfscommon.Options // VFS options - timer *time.Timer // next scheduled time for the uploader - expiry time.Time // time the next item exires or IsZero - uploads int // number of uploads in progress - id uint64 // id of the last writeBackItem created + items writeBackItems // priority queue of *writeBackItem - writeBackItems are in here while awaiting transfer only + lookup map[Handle]*writeBackItem // for getting a *writeBackItem from a Handle - writeBackItems are in here until cancelled + opt *vfscommon.Options // VFS options + timer *time.Timer // next scheduled time for the uploader + expiry time.Time // time the next item exires or IsZero + uploads int // number of uploads in progress + + // read and written with atomic + id Handle // id of the last writeBackItem created } -// make a new writeBack +// New make a new WriteBack // -// cancel the context to stop the background goroutine -func newWriteBack(ctx context.Context, opt *vfscommon.Options) *writeBack { - wb := &writeBack{ +// cancel the context to stop the background processing +func New(ctx context.Context, opt *vfscommon.Options) *WriteBack { + wb := &WriteBack{ ctx: ctx, items: writeBackItems{}, - lookup: make(map[*Item]*writeBackItem), + lookup: make(map[Handle]*writeBackItem), opt: opt, } heap.Init(&wb.items) @@ -56,15 +62,14 @@ func newWriteBack(ctx context.Context, opt *vfscommon.Options) *writeBack { // writeBack.mu must be held to manipulate this type writeBackItem struct { name string // name of the item so we don't have to read it from item - id uint64 // id of the item + id Handle // id of the item index int // index into the priority queue for update - item *Item // Item that needs writeback expiry time.Time // When this expires we will write it back uploading bool // True if item is being processed by upload() method onHeap bool // true if this item is on the items heap cancel context.CancelFunc // To cancel the upload with done chan struct{} // closed when the cancellation completes - putFn putFn // To write the object data + putFn PutFn // To write the object data tries int // number of times we have tried to upload delay time.Duration // delay between upload attempts } @@ -118,7 +123,7 @@ func (ws *writeBackItems) _update(item *writeBackItem, expiry time.Time) { // return a new expiry time based from now until the WriteBack timeout // // call with lock held -func (wb *writeBack) _newExpiry() time.Time { +func (wb *WriteBack) _newExpiry() time.Time { expiry := time.Now() if wb.opt.WriteBack > 0 { expiry = expiry.Add(wb.opt.WriteBack) @@ -130,14 +135,13 @@ func (wb *writeBack) _newExpiry() time.Time { // make a new writeBackItem // // call with the lock held -func (wb *writeBack) _newItem(item *Item, name string) *writeBackItem { - wb.id++ +func (wb *WriteBack) _newItem(id Handle, name string) *writeBackItem { + wb.SetID(&id) wbItem := &writeBackItem{ name: name, - item: item, expiry: wb._newExpiry(), delay: wb.opt.WriteBack, - id: wb.id, + id: id, } wb._addItem(wbItem) wb._pushItem(wbItem) @@ -147,21 +151,21 @@ func (wb *writeBack) _newItem(item *Item, name string) *writeBackItem { // add a writeBackItem to the lookup map // // call with the lock held -func (wb *writeBack) _addItem(wbItem *writeBackItem) { - wb.lookup[wbItem.item] = wbItem +func (wb *WriteBack) _addItem(wbItem *writeBackItem) { + wb.lookup[wbItem.id] = wbItem } // delete a writeBackItem from the lookup map // // call with the lock held -func (wb *writeBack) _delItem(wbItem *writeBackItem) { - delete(wb.lookup, wbItem.item) +func (wb *WriteBack) _delItem(wbItem *writeBackItem) { + delete(wb.lookup, wbItem.id) } // pop a writeBackItem from the items heap // // call with the lock held -func (wb *writeBack) _popItem() (wbItem *writeBackItem) { +func (wb *WriteBack) _popItem() (wbItem *writeBackItem) { wbItem = heap.Pop(&wb.items).(*writeBackItem) wbItem.onHeap = false return wbItem @@ -170,7 +174,7 @@ func (wb *writeBack) _popItem() (wbItem *writeBackItem) { // push a writeBackItem onto the items heap // // call with the lock held -func (wb *writeBack) _pushItem(wbItem *writeBackItem) { +func (wb *WriteBack) _pushItem(wbItem *writeBackItem) { if !wbItem.onHeap { heap.Push(&wb.items, wbItem) wbItem.onHeap = true @@ -180,7 +184,7 @@ func (wb *writeBack) _pushItem(wbItem *writeBackItem) { // remove a writeBackItem from the items heap // // call with the lock held -func (wb *writeBack) _removeItem(wbItem *writeBackItem) { +func (wb *WriteBack) _removeItem(wbItem *writeBackItem) { if wbItem.onHeap { heap.Remove(&wb.items, wbItem.index) wbItem.onHeap = false @@ -190,7 +194,7 @@ func (wb *writeBack) _removeItem(wbItem *writeBackItem) { // peek the oldest writeBackItem - may be nil // // call with the lock held -func (wb *writeBack) _peekItem() (wbItem *writeBackItem) { +func (wb *WriteBack) _peekItem() (wbItem *writeBackItem) { if len(wb.items) == 0 { return nil } @@ -198,7 +202,7 @@ func (wb *writeBack) _peekItem() (wbItem *writeBackItem) { } // stop the timer which runs the expiries -func (wb *writeBack) _stopTimer() { +func (wb *WriteBack) _stopTimer() { if wb.expiry.IsZero() { return } @@ -211,7 +215,7 @@ func (wb *writeBack) _stopTimer() { } // reset the timer which runs the expiries -func (wb *writeBack) _resetTimer() { +func (wb *WriteBack) _resetTimer() { wbItem := wb._peekItem() if wbItem == nil { wb._stopTimer() @@ -234,17 +238,31 @@ func (wb *writeBack) _resetTimer() { } } -// add adds an item to the writeback queue or resets its timer if it +// SetID sets the Handle pointed to if it is non zero to the next +// handle. +func (wb *WriteBack) SetID(pid *Handle) { + if *pid == 0 { + *pid = Handle(atomic.AddUint64((*uint64)(&wb.id), 1)) + } +} + +// Add adds an item to the writeback queue or resets its timer if it // is already there. // -// if modified is false then it it doesn't a pending upload -func (wb *writeBack) add(item *Item, name string, modified bool, putFn putFn) *writeBackItem { +// If id is 0 then a new item will always be created and the new +// Handle will be returned. +// +// Use SetID to create Handles in advance of calling Add +// +// If modified is false then it it doesn't cancel a pending upload if +// there is one as there is no need. +func (wb *WriteBack) Add(id Handle, name string, modified bool, putFn PutFn) Handle { wb.mu.Lock() defer wb.mu.Unlock() - wbItem, ok := wb.lookup[item] + wbItem, ok := wb.lookup[id] if !ok { - wbItem = wb._newItem(item, name) + wbItem = wb._newItem(id, name) } else { if wbItem.uploading && modified { // We are uploading already so cancel the upload @@ -255,18 +273,19 @@ func (wb *writeBack) add(item *Item, name string, modified bool, putFn putFn) *w } wbItem.putFn = putFn wb._resetTimer() - return wbItem + return wbItem.id } -// Call when a file is removed. This cancels a writeback if there is -// one and doesn't return the item to the queue. -func (wb *writeBack) remove(item *Item) (found bool) { +// Remove should be called when a file should be removed from the +// writeback queue. This cancels a writeback if there is one and +// doesn't return the item to the queue. +func (wb *WriteBack) Remove(id Handle) (found bool) { wb.mu.Lock() defer wb.mu.Unlock() - wbItem, found := wb.lookup[item] + wbItem, found := wb.lookup[id] if found { - fs.Debugf(wbItem.name, "vfs cache: cancelling writeback (uploading %v) %p item %p", wbItem.uploading, wbItem, wbItem.item) + fs.Debugf(wbItem.name, "vfs cache: cancelling writeback (uploading %v) %p item %d", wbItem.uploading, wbItem, wbItem.id) if wbItem.uploading { // We are uploading already so cancel the upload wb._cancelUpload(wbItem) @@ -283,7 +302,7 @@ func (wb *writeBack) remove(item *Item) (found bool) { // upload the item - called as a goroutine // // uploading will have been incremented here already -func (wb *writeBack) upload(ctx context.Context, wbItem *writeBackItem) { +func (wb *WriteBack) upload(ctx context.Context, wbItem *writeBackItem) { wb.mu.Lock() defer wb.mu.Unlock() putFn := wbItem.putFn @@ -327,7 +346,7 @@ func (wb *writeBack) upload(ctx context.Context, wbItem *writeBackItem) { // cancel the upload - the item should be on the heap after this returns // // call with lock held -func (wb *writeBack) _cancelUpload(wbItem *writeBackItem) { +func (wb *WriteBack) _cancelUpload(wbItem *writeBackItem) { if !wbItem.uploading { return } @@ -351,10 +370,10 @@ func (wb *writeBack) _cancelUpload(wbItem *writeBackItem) { // cancelUpload cancels the upload of the item if there is one in progress // // it returns true if there was an upload in progress -func (wb *writeBack) cancelUpload(item *Item) bool { +func (wb *WriteBack) cancelUpload(id Handle) bool { wb.mu.Lock() defer wb.mu.Unlock() - wbItem, ok := wb.lookup[item] + wbItem, ok := wb.lookup[id] if !ok || !wbItem.uploading { return false } @@ -363,7 +382,7 @@ func (wb *writeBack) cancelUpload(item *Item) bool { } // this uploads as many items as possible -func (wb *writeBack) processItems(ctx context.Context) { +func (wb *WriteBack) processItems(ctx context.Context) { wb.mu.Lock() defer wb.mu.Unlock() @@ -397,8 +416,8 @@ func (wb *writeBack) processItems(ctx context.Context) { } } -// return the number of uploads in progress -func (wb *writeBack) getStats() (uploadsInProgress, uploadsQueued int) { +// Stats return the number of uploads in progress and queued +func (wb *WriteBack) Stats() (uploadsInProgress, uploadsQueued int) { wb.mu.Lock() defer wb.mu.Unlock() return wb.uploads, len(wb.items) diff --git a/vfs/vfscache/writeback_test.go b/vfs/vfscache/writeback/writeback_test.go similarity index 84% rename from vfs/vfscache/writeback_test.go rename to vfs/vfscache/writeback/writeback_test.go index 2e22d7d61..135dd796c 100644 --- a/vfs/vfscache/writeback_test.go +++ b/vfs/vfscache/writeback/writeback_test.go @@ -1,4 +1,4 @@ -package vfscache +package writeback import ( "container/heap" @@ -15,16 +15,16 @@ import ( "github.com/stretchr/testify/assert" ) -func newTestWriteBack(t *testing.T) (wb *writeBack, cancel func()) { +func newTestWriteBack(t *testing.T) (wb *WriteBack, cancel func()) { ctx, cancel := context.WithCancel(context.Background()) opt := vfscommon.DefaultOpt opt.WriteBack = 100 * time.Millisecond - wb = newWriteBack(ctx, &opt) + wb = New(ctx, &opt) return wb, cancel } // string for debugging - make a copy and pop the items out in order -func (wb *writeBack) string(t *testing.T) string { +func (wb *WriteBack) string(t *testing.T) string { wb.mu.Lock() defer wb.mu.Unlock() ws := wb.items @@ -54,7 +54,7 @@ func TestWriteBackItems(t *testing.T) { wbItem2 := writeBackItem{name: "two", expiry: now.Add(2 * time.Second)} wbItem3 := writeBackItem{name: "three", expiry: now.Add(4 * time.Second)} - wb := &writeBack{ + wb := &WriteBack{ items: writeBackItems{}, } @@ -80,7 +80,7 @@ func TestWriteBackItems(t *testing.T) { assert.Equal(t, "one,two,three", wb.string(t)) } -func checkOnHeap(t *testing.T, wb *writeBack, wbItem *writeBackItem) { +func checkOnHeap(t *testing.T, wb *WriteBack, wbItem *writeBackItem) { wb.mu.Lock() defer wb.mu.Unlock() assert.True(t, wbItem.onHeap) @@ -92,7 +92,7 @@ func checkOnHeap(t *testing.T, wb *writeBack, wbItem *writeBackItem) { assert.Failf(t, "expecting %q on heap", wbItem.name) } -func checkNotOnHeap(t *testing.T, wb *writeBack, wbItem *writeBackItem) { +func checkNotOnHeap(t *testing.T, wb *WriteBack, wbItem *writeBackItem) { wb.mu.Lock() defer wb.mu.Unlock() assert.False(t, wbItem.onHeap) @@ -103,35 +103,34 @@ func checkNotOnHeap(t *testing.T, wb *writeBack, wbItem *writeBackItem) { } } -func checkInLookup(t *testing.T, wb *writeBack, wbItem *writeBackItem) { +func checkInLookup(t *testing.T, wb *WriteBack, wbItem *writeBackItem) { wb.mu.Lock() defer wb.mu.Unlock() - assert.Equal(t, wbItem, wb.lookup[wbItem.item]) + assert.Equal(t, wbItem, wb.lookup[wbItem.id]) } -func checkNotInLookup(t *testing.T, wb *writeBack, wbItem *writeBackItem) { +func checkNotInLookup(t *testing.T, wb *WriteBack, wbItem *writeBackItem) { wb.mu.Lock() defer wb.mu.Unlock() - assert.Nil(t, wb.lookup[wbItem.item]) + assert.Nil(t, wb.lookup[wbItem.id]) } func TestWriteBackItemCRUD(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() - item1, item2, item3 := &Item{}, &Item{}, &Item{} // _peekItem empty assert.Nil(t, wb._peekItem()) - wbItem1 := wb._newItem(item1, "one") + wbItem1 := wb._newItem(0, "one") checkOnHeap(t, wb, wbItem1) checkInLookup(t, wb, wbItem1) - wbItem2 := wb._newItem(item2, "two") + wbItem2 := wb._newItem(0, "two") checkOnHeap(t, wb, wbItem2) checkInLookup(t, wb, wbItem2) - wbItem3 := wb._newItem(item3, "three") + wbItem3 := wb._newItem(0, "three") checkOnHeap(t, wb, wbItem3) checkInLookup(t, wb, wbItem3) @@ -180,7 +179,7 @@ func TestWriteBackItemCRUD(t *testing.T) { assert.Equal(t, "one,three", wb.string(t)) } -func assertTimerRunning(t *testing.T, wb *writeBack, running bool) { +func assertTimerRunning(t *testing.T, wb *WriteBack, running bool) { wb.mu.Lock() if running { assert.NotEqual(t, time.Time{}, wb.expiry) @@ -202,7 +201,7 @@ func TestWriteBackResetTimer(t *testing.T) { // Check timer is stopped assertTimerRunning(t, wb, false) - _ = wb._newItem(&Item{}, "three") + _ = wb._newItem(0, "three") // Reset the timer on an queue with stuff wb._resetTimer() @@ -230,7 +229,7 @@ func newPutItem(t *testing.T) *putItem { } } -// put the object as per putFn interface +// put the object as per PutFn interface func (pi *putItem) put(ctx context.Context) (err error) { pi.wg.Add(1) defer pi.wg.Done() @@ -274,7 +273,7 @@ func (pi *putItem) finish(err error) { pi.wg.Wait() } -func waitUntilNoTransfers(t *testing.T, wb *writeBack) { +func waitUntilNoTransfers(t *testing.T, wb *WriteBack) { for i := 0; i < 100; i++ { wb.mu.Lock() uploads := wb.uploads @@ -292,10 +291,15 @@ func TestWriteBackAddOK(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() - item := &Item{} pi := newPutItem(t) - wbItem := wb.add(item, "one", true, pi.put) + var inID Handle + wb.SetID(&inID) + assert.Equal(t, Handle(1), inID) + + id := wb.Add(inID, "one", true, pi.put) + assert.Equal(t, inID, id) + wbItem := wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) assert.Equal(t, "one", wb.string(t)) @@ -315,10 +319,10 @@ func TestWriteBackAddFailRetry(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() - item := &Item{} pi := newPutItem(t) - wbItem := wb.add(item, "one", true, pi.put) + id := wb.Add(0, "one", true, pi.put) + wbItem := wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) assert.Equal(t, "one", wb.string(t)) @@ -348,10 +352,10 @@ func TestWriteBackAddUpdate(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() - item := &Item{} pi := newPutItem(t) - wbItem := wb.add(item, "one", true, pi.put) + id := wb.Add(0, "one", true, pi.put) + wbItem := wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) assert.Equal(t, "one", wb.string(t)) @@ -363,8 +367,8 @@ func TestWriteBackAddUpdate(t *testing.T) { // Now the upload has started add another one pi2 := newPutItem(t) - wbItem2 := wb.add(item, "one", true, pi2.put) - assert.Equal(t, wbItem, wbItem2) + id2 := wb.Add(id, "one", true, pi2.put) + assert.Equal(t, id, id2) checkOnHeap(t, wb, wbItem) // object awaiting writeback time checkInLookup(t, wb, wbItem) @@ -387,10 +391,10 @@ func TestWriteBackAddUpdateNotModified(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() - item := &Item{} pi := newPutItem(t) - wbItem := wb.add(item, "one", false, pi.put) + id := wb.Add(0, "one", false, pi.put) + wbItem := wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) assert.Equal(t, "one", wb.string(t)) @@ -402,8 +406,8 @@ func TestWriteBackAddUpdateNotModified(t *testing.T) { // Now the upload has started add another one pi2 := newPutItem(t) - wbItem2 := wb.add(item, "one", false, pi2.put) - assert.Equal(t, wbItem, wbItem2) + id2 := wb.Add(id, "one", false, pi2.put) + assert.Equal(t, id, id2) checkNotOnHeap(t, wb, wbItem) // object still being transfered checkInLookup(t, wb, wbItem) @@ -426,10 +430,10 @@ func TestWriteBackAddUpdateNotStarted(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() - item := &Item{} pi := newPutItem(t) - wbItem := wb.add(item, "one", true, pi.put) + id := wb.Add(0, "one", true, pi.put) + wbItem := wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) assert.Equal(t, "one", wb.string(t)) @@ -437,8 +441,8 @@ func TestWriteBackAddUpdateNotStarted(t *testing.T) { // Immediately add another upload before the first has started pi2 := newPutItem(t) - wbItem2 := wb.add(item, "one", true, pi2.put) - assert.Equal(t, wbItem, wbItem2) + id2 := wb.Add(id, "one", true, pi2.put) + assert.Equal(t, id, id2) checkOnHeap(t, wb, wbItem) // object still awaiting transfer checkInLookup(t, wb, wbItem) @@ -464,25 +468,24 @@ func TestWriteBackGetStats(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() - item := &Item{} pi := newPutItem(t) - wb.add(item, "one", true, pi.put) + wb.Add(0, "one", true, pi.put) - inProgress, queued := wb.getStats() + inProgress, queued := wb.Stats() assert.Equal(t, queued, 1) assert.Equal(t, inProgress, 0) <-pi.started - inProgress, queued = wb.getStats() + inProgress, queued = wb.Stats() assert.Equal(t, queued, 0) assert.Equal(t, inProgress, 1) pi.finish(nil) // transfer successful waitUntilNoTransfers(t, wb) - inProgress, queued = wb.getStats() + inProgress, queued = wb.Stats() assert.Equal(t, queued, 0) assert.Equal(t, inProgress, 0) @@ -501,10 +504,10 @@ func TestWriteBackMaxQueue(t *testing.T) { for i := 0; i < toTransfer; i++ { pi := newPutItem(t) pis = append(pis, pi) - wb.add(&Item{}, fmt.Sprintf("number%d", 1), true, pi.put) + wb.Add(0, fmt.Sprintf("number%d", 1), true, pi.put) } - inProgress, queued := wb.getStats() + inProgress, queued := wb.Stats() assert.Equal(t, toTransfer, queued) assert.Equal(t, 0, inProgress) @@ -516,7 +519,7 @@ func TestWriteBackMaxQueue(t *testing.T) { // timer should be stopped now assertTimerRunning(t, wb, false) - inProgress, queued = wb.getStats() + inProgress, queued = wb.Stats() assert.Equal(t, toTransfer-maxTransfers, queued) assert.Equal(t, maxTransfers, inProgress) @@ -532,7 +535,7 @@ func TestWriteBackMaxQueue(t *testing.T) { } waitUntilNoTransfers(t, wb) - inProgress, queued = wb.getStats() + inProgress, queued = wb.Stats() assert.Equal(t, queued, 0) assert.Equal(t, inProgress, 0) } @@ -541,26 +544,26 @@ func TestWriteBackRemove(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() - item := &Item{} - // cancel when not in writeback - assert.False(t, wb.remove(item)) + assert.False(t, wb.Remove(1)) // add item pi1 := newPutItem(t) - wbItem := wb.add(item, "one", true, pi1.put) + id := wb.Add(0, "one", true, pi1.put) + wbItem := wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) // cancel when not uploading - assert.True(t, wb.remove(item)) + assert.True(t, wb.Remove(id)) checkNotOnHeap(t, wb, wbItem) checkNotInLookup(t, wb, wbItem) assert.False(t, pi1.cancelled) // add item pi2 := newPutItem(t) - wbItem = wb.add(item, "one", true, pi2.put) + id = wb.Add(id, "one", true, pi2.put) + wbItem = wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) @@ -568,7 +571,7 @@ func TestWriteBackRemove(t *testing.T) { <-pi2.started // cancel when uploading - assert.True(t, wb.remove(item)) + assert.True(t, wb.Remove(id)) checkNotOnHeap(t, wb, wbItem) checkNotInLookup(t, wb, wbItem) assert.True(t, pi2.cancelled) @@ -578,19 +581,18 @@ func TestWriteBackCancelUpload(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() - item := &Item{} - // cancel when not in writeback - assert.False(t, wb.cancelUpload(item)) + assert.False(t, wb.cancelUpload(1)) // add item pi := newPutItem(t) - wbItem := wb.add(item, "one", true, pi.put) + id := wb.Add(0, "one", true, pi.put) + wbItem := wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) // cancel when not uploading - assert.False(t, wb.cancelUpload(item)) + assert.False(t, wb.cancelUpload(id)) checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) @@ -600,7 +602,7 @@ func TestWriteBackCancelUpload(t *testing.T) { checkInLookup(t, wb, wbItem) // cancel when uploading - assert.True(t, wb.cancelUpload(item)) + assert.True(t, wb.cancelUpload(id)) checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) assert.True(t, pi.cancelled)