package downloaders import ( "context" "sync" "time" "github.com/pkg/errors" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/accounting" "github.com/rclone/rclone/fs/asyncreader" "github.com/rclone/rclone/fs/chunkedreader" "github.com/rclone/rclone/lib/ranges" "github.com/rclone/rclone/vfs/vfscommon" ) // FIXME implement max downloaders const ( // max time a downloader can be idle before closing itself maxDownloaderIdleTime = 5 * time.Second // max number of bytes a reader should skip over before closing it maxSkipBytes = 1024 * 1024 // time between background kicks of waiters to pick up errors backgroundKickerInterval = 5 * time.Second // maximum number of errors before declaring dead maxErrorCount = 10 ) // Item is the interface that an item to download must obey type Item interface { // FindMissing adjusts r returning a new ranges.Range which only // contains the range which needs to be downloaded. This could be // empty - check with IsEmpty. It also adjust this to make sure it is // not larger than the file. FindMissing(r ranges.Range) (outr ranges.Range) // HasRange returns true if the current ranges entirely include range HasRange(r ranges.Range) bool // WriteAtNoOverwrite writes b to the file, but will not overwrite // already present ranges. // // This is used by the downloader to write bytes to the file // // It returns n the total bytes processed and skipped the number of // bytes which were processed but not actually written to the file. WriteAtNoOverwrite(b []byte, off int64) (n int, skipped int, err error) } // Downloaders is a number of downloader~s and a queue of waiters // waiting for segments to be downloaded to a file. type Downloaders struct { // Write once - no locking required ctx context.Context cancel context.CancelFunc item Item opt *vfscommon.Options src fs.Object // source object remote string wg sync.WaitGroup // Read write mu sync.Mutex dls []*downloader waiters []waiter errorCount int // number of consecutive errors lastErr error // last error received } // waiter is a range we are waiting for and a channel to signal when // the range is found type waiter struct { r ranges.Range errChan chan<- error } // downloader represents a running download for part of a file. type downloader struct { // Write once dls *Downloaders // parent structure quit chan struct{} // close to quit the downloader wg sync.WaitGroup // to keep track of downloader goroutine kick chan struct{} // kick the downloader when needed // Read write mu sync.Mutex start int64 // start offset offset int64 // current offset maxOffset int64 // maximum offset we are reading to tr *accounting.Transfer in *accounting.Account // input we are reading from skipped int64 // number of bytes we have skipped sequentially _closed bool // set to true if downloader is closed stop bool // set to true if we have called _stop() } // New makes a downloader for item func New(item Item, opt *vfscommon.Options, remote string, src fs.Object) (dls *Downloaders) { if src == nil { panic("internal error: newDownloaders called with nil src object") } ctx, cancel := context.WithCancel(context.Background()) dls = &Downloaders{ ctx: ctx, cancel: cancel, item: item, opt: opt, src: src, remote: remote, } dls.wg.Add(1) go func() { defer dls.wg.Done() ticker := time.NewTicker(backgroundKickerInterval) select { case <-ticker.C: err := dls.kickWaiters() if err != nil { fs.Errorf(dls.src, "vfs cache: failed to kick waiters: %v", err) } case <-ctx.Done(): break } ticker.Stop() }() return dls } // Accumulate errors for this downloader // // It should be called with // // n bytes downloaded // err is error from download // // call with lock held func (dls *Downloaders) _countErrors(n int64, err error) { if err == nil && n != 0 { if dls.errorCount != 0 { fs.Infof(dls.src, "vfs cache: downloader: resetting error count to 0") dls.errorCount = 0 dls.lastErr = nil } return } if err != nil { dls.errorCount++ dls.lastErr = err fs.Infof(dls.src, "vfs cache: downloader: error count now %d: %v", dls.errorCount, err) } } func (dls *Downloaders) countErrors(n int64, err error) { dls.mu.Lock() dls._countErrors(n, err) dls.mu.Unlock() } // Make a new downloader, starting it to download r // // call with lock held func (dls *Downloaders) _newDownloader(r ranges.Range) (dl *downloader, err error) { // defer log.Trace(dls.src, "r=%v", r)("err=%v", &err) dl = &downloader{ kick: make(chan struct{}, 1), quit: make(chan struct{}), dls: dls, start: r.Pos, offset: r.Pos, maxOffset: r.End(), } err = dl.open(dl.offset) if err != nil { _ = dl.close(err) return nil, errors.Wrap(err, "failed to open downloader") } dls.dls = append(dls.dls, dl) dl.wg.Add(1) go func() { defer dl.wg.Done() n, err := dl.download() _ = dl.close(err) dl.dls.countErrors(n, err) if err != nil { fs.Errorf(dl.dls.src, "vfs cache: failed to download: %v", err) } err = dl.dls.kickWaiters() if err != nil { fs.Errorf(dl.dls.src, "vfs cache: failed to kick waiters: %v", err) } }() return dl, nil } // _removeClosed() removes any downloaders which are closed. // // Call with the mutex held func (dls *Downloaders) _removeClosed() { newDownloaders := dls.dls[:0] for _, dl := range dls.dls { if !dl.closed() { newDownloaders = append(newDownloaders, dl) } } dls.dls = newDownloaders } // Close all running downloaders and return any unfulfilled waiters // with inErr func (dls *Downloaders) Close(inErr error) (err error) { dls.mu.Lock() defer dls.mu.Unlock() dls._removeClosed() for _, dl := range dls.dls { dls.mu.Unlock() closeErr := dl.stopAndClose(inErr) dls.mu.Lock() if closeErr != nil && err != nil { err = closeErr } } dls.cancel() dls.wg.Wait() dls.dls = nil dls._dispatchWaiters() dls._closeWaiters(inErr) return err } // Download the range passed in returning when it has been downloaded // with an error from the downloading go routine. func (dls *Downloaders) Download(r ranges.Range) (err error) { // defer log.Trace(dls.src, "r=%+v", r)("err=%v", &err) dls.mu.Lock() errChan := make(chan error) waiter := waiter{ r: r, errChan: errChan, } err = dls._ensureDownloader(r) if err != nil { dls.mu.Unlock() return err } dls.waiters = append(dls.waiters, waiter) dls.mu.Unlock() return <-errChan } // close any waiters with the error passed in // // call with lock held func (dls *Downloaders) _closeWaiters(err error) { for _, waiter := range dls.waiters { waiter.errChan <- err } dls.waiters = nil } // ensure a downloader is running for the range if required. If one isn't found // then it starts it. // // call with lock held func (dls *Downloaders) _ensureDownloader(r ranges.Range) (err error) { // The window includes potentially unread data in the buffer window := int64(fs.Config.BufferSize) // Increase the read range by the read ahead if set if dls.opt.ReadAhead > 0 { r.Size += int64(dls.opt.ReadAhead) } // We may be reopening a downloader after a failure here or // doing a tentative prefetch so check to see that we haven't // read some stuff already. // // Clip r to stuff which needs downloading r = dls.item.FindMissing(r) // If the range is entirely present then we only need to start a // dowloader if the window isn't full. if r.IsEmpty() { // Make a new range which includes the window rWindow := r if rWindow.Size < window { rWindow.Size = window } // Clip rWindow to stuff which needs downloading rWindow = dls.item.FindMissing(rWindow) // If rWindow is empty then just return without starting a // downloader as there is no data within the window which needs // downloading. if rWindow.IsEmpty() { return nil } // Start downloading at the start of the unread window r.Pos = rWindow.Pos // But don't write anything for the moment r.Size = 0 } var dl *downloader // Look through downloaders to find one in range // If there isn't one then start a new one dls._removeClosed() for _, dl = range dls.dls { start, offset := dl.getRange() // The downloader's offset to offset+window is the gap // in which we would like to re-use this // downloader. The downloader will never reach before // start and offset+windows is too far away - we'd // rather start another downloader. // fs.Debugf(nil, "r=%v start=%d, offset=%d, found=%v", r, start, offset, r.Pos >= start && r.Pos < offset+window) if r.Pos >= start && r.Pos < offset+window { // Found downloader which will soon have our data dl.setRange(r) return nil } } // Downloader not found so start a new one dl, err = dls._newDownloader(r) if err != nil { dls._countErrors(0, err) return errors.Wrap(err, "failed to start downloader") } return err } // EnsureDownloader makes sure a downloader is running for the range // passed in. If one isn't found then it starts it. // // It does not wait for the range to be downloaded func (dls *Downloaders) EnsureDownloader(r ranges.Range) (err error) { dls.mu.Lock() defer dls.mu.Unlock() return dls._ensureDownloader(r) } // _dispatchWaiters() sends any waiters which have completed back to // their callers. // // Call with the mutex held func (dls *Downloaders) _dispatchWaiters() { if len(dls.waiters) == 0 { return } newWaiters := dls.waiters[:0] for _, waiter := range dls.waiters { if dls.item.HasRange(waiter.r) { waiter.errChan <- nil } else { newWaiters = append(newWaiters, waiter) } } dls.waiters = newWaiters } // Send any waiters which have completed back to their callers and make sure // there is a downloader appropriate for each waiter func (dls *Downloaders) kickWaiters() (err error) { dls.mu.Lock() defer dls.mu.Unlock() dls._dispatchWaiters() if len(dls.waiters) == 0 { return nil } // Make sure each waiter has a downloader // This is an O(waiters*Downloaders) algorithm // However the number of waiters and the number of downloaders // are both expected to be small. for _, waiter := range dls.waiters { err = dls._ensureDownloader(waiter.r) if err != nil { // Failures here will be retried by background kicker fs.Errorf(dls.src, "vfs cache: restart download failed: %v", err) } } if dls.errorCount > maxErrorCount { fs.Errorf(dls.src, "vfs cache: too many errors %d/%d: last error: %v", dls.errorCount, maxErrorCount, dls.lastErr) dls._closeWaiters(dls.lastErr) return dls.lastErr } return nil } // Write writes len(p) bytes from p to the underlying data stream. It // returns the number of bytes written from p (0 <= n <= len(p)) and // any error encountered that caused the write to stop early. Write // must return a non-nil error if it returns n < len(p). Write must // not modify the slice data, even temporarily. // // Implementations must not retain p. func (dl *downloader) Write(p []byte) (n int, err error) { // defer log.Trace(dl.dls.src, "p_len=%d", len(p))("n=%d, err=%v", &n, &err) // Kick the waiters on exit if some characters received defer func() { if n <= 0 { return } if waitErr := dl.dls.kickWaiters(); waitErr != nil { fs.Errorf(dl.dls.src, "vfs cache: download write: failed to kick waiters: %v", waitErr) if err == nil { err = waitErr } } }() dl.mu.Lock() defer dl.mu.Unlock() // Wait here if we have reached maxOffset until // - we are quitting // - we get kicked // - timeout happens if dl.offset >= dl.maxOffset { var timeout = time.NewTimer(maxDownloaderIdleTime) dl.mu.Unlock() select { case <-dl.quit: dl.mu.Lock() timeout.Stop() case <-dl.kick: dl.mu.Lock() timeout.Stop() case <-timeout.C: // stop any future reading dl.mu.Lock() if !dl.stop { fs.Debugf(dl.dls.src, "vfs cache: stopping download thread as it timed out") dl._stop() } } } n, skipped, err := dl.dls.item.WriteAtNoOverwrite(p, dl.offset) if skipped == n { dl.skipped += int64(skipped) } else { dl.skipped = 0 } dl.offset += int64(n) // Kill this downloader if skipped too many bytes if !dl.stop && dl.skipped > maxSkipBytes { fs.Debugf(dl.dls.src, "vfs cache: stopping download thread as it has skipped %d bytes", dl.skipped) dl._stop() } // If running without a async buffer then stop now as // StopBuffering has no effect if the Account wasn't buffered // so we need to stop manually now rather than wait for the // AsyncReader to stop. if dl.stop && !dl.in.HasBuffer() { err = asyncreader.ErrorStreamAbandoned } return n, err } // open the file from offset // // should be called on a fresh downloader func (dl *downloader) open(offset int64) (err error) { // defer log.Trace(dl.dls.src, "offset=%d", offset)("err=%v", &err) dl.tr = accounting.Stats(dl.dls.ctx).NewTransfer(dl.dls.src) size := dl.dls.src.Size() if size < 0 { // FIXME should just completely download these return errors.New("can't open unknown sized file") } // FIXME hashType needs to ignore when --no-checksum is set too? Which is a VFS flag. // var rangeOption *fs.RangeOption // if offset > 0 { // rangeOption = &fs.RangeOption{Start: offset, End: size - 1} // } // in0, err := operations.NewReOpen(dl.dls.ctx, dl.dls.src, fs.Config.LowLevelRetries, dl.dls.item.c.hashOption, rangeOption) in0 := chunkedreader.New(context.TODO(), dl.dls.src, int64(dl.dls.opt.ChunkSize), int64(dl.dls.opt.ChunkSizeLimit)) _, err = in0.Seek(offset, 0) if err != nil { return errors.Wrap(err, "vfs reader: failed to open source file") } dl.in = dl.tr.Account(dl.dls.ctx, in0).WithBuffer() // account and buffer the transfer dl.offset = offset // FIXME set mod time // FIXME check checksums return nil } // close the downloader func (dl *downloader) close(inErr error) (err error) { // defer log.Trace(dl.dls.src, "inErr=%v", err)("err=%v", &err) checkErr := func(e error) { if e == nil || errors.Cause(err) == asyncreader.ErrorStreamAbandoned { return } err = e } dl.mu.Lock() if dl.in != nil { checkErr(dl.in.Close()) dl.in = nil } if dl.tr != nil { dl.tr.Done(inErr) dl.tr = nil } dl._closed = true dl.mu.Unlock() return nil } // closed returns true if the downloader has been closed alread func (dl *downloader) closed() bool { dl.mu.Lock() defer dl.mu.Unlock() return dl._closed } // stop the downloader if running // // Call with the mutex held func (dl *downloader) _stop() { // defer log.Trace(dl.dls.src, "")("") // exit if have already called _stop if dl.stop { return } dl.stop = true // Signal quit now to unblock the downloader close(dl.quit) // stop the downloader by stopping the async reader buffering // any more input. This causes all the stuff in the async // buffer (which can be many MB) to be written to the disk // before exiting. if dl.in != nil { dl.in.StopBuffering() } } // stop the downloader if running then close it with the error passed in func (dl *downloader) stopAndClose(inErr error) (err error) { // Stop the downloader by closing its input dl.mu.Lock() dl._stop() dl.mu.Unlock() // wait for downloader to finish... // do this without mutex as asyncreader // calls back into Write() which needs the lock dl.wg.Wait() return dl.close(inErr) } // Start downloading to the local file starting at offset until maxOffset. func (dl *downloader) download() (n int64, err error) { // defer log.Trace(dl.dls.src, "")("err=%v", &err) n, err = dl.in.WriteTo(dl) if err != nil && errors.Cause(err) != asyncreader.ErrorStreamAbandoned { return n, errors.Wrap(err, "vfs reader: failed to write to cache file") } return n, nil } // setRange makes sure the downloader is downloading the range passed in func (dl *downloader) setRange(r ranges.Range) { dl.mu.Lock() maxOffset := r.End() if maxOffset > dl.maxOffset { dl.maxOffset = maxOffset // fs.Debugf(dl.dls.src, "kicking downloader with maxOffset %d", maxOffset) select { case dl.kick <- struct{}{}: default: } } dl.mu.Unlock() } // get the current range this downloader is working on func (dl *downloader) getRange() (start, offset int64) { dl.mu.Lock() defer dl.mu.Unlock() return dl.start, dl.offset }