From 4af4bbb539c3e6e5f2d241a900ffb46cc3a2e34e Mon Sep 17 00:00:00 2001 From: remusb Date: Thu, 30 Nov 2017 21:16:45 +0200 Subject: [PATCH] cache: add support for PutStream - fixes #1836 --- cache/cache.go | 45 ++++++++++++++++++++++----------------------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/cache/cache.go b/cache/cache.go index 4603fe7ed..89a046d41 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -376,6 +376,7 @@ func NewFs(name, rpath string) (fs.Fs, error) { DirChangeNotify: nil, DirCacheFlush: f.DirCacheFlush, PutUnchecked: f.PutUnchecked, + PutStream: f.PutStream, CleanUp: f.CleanUp, UnWrap: f.UnWrap, }).Fill(f).Mask(wrappedFs) @@ -689,18 +690,18 @@ func (f *Fs) cacheReader(u io.Reader, src fs.ObjectInfo, originalRead func(inn i } } -// Put in to the remote path with the modTime given of the given size -func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { - fs.Debugf(f, "put data at '%s'", src.Remote()) +type putFn func(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) +// put in to the remote path +func (f *Fs) put(in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn) (fs.Object, error) { var err error var obj fs.Object if f.cacheWrites { f.cacheReader(in, src, func(inn io.Reader) { - obj, err = f.Fs.Put(inn, src, options...) + obj, err = put(inn, src, options...) }) } else { - obj, err = f.Fs.Put(in, src, options...) + obj, err = put(in, src, options...) } if err != nil { @@ -714,6 +715,12 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs. return cachedObj, nil } +// Put in to the remote path with the modTime given of the given size +func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + fs.Debugf(f, "put data at '%s'", src.Remote()) + return f.put(in, src, options, f.Fs.Put) +} + // PutUnchecked uploads the object func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { do := f.Fs.Features().PutUnchecked @@ -721,26 +728,17 @@ func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOpt return nil, errors.New("can't PutUnchecked") } fs.Infof(f, "put data unchecked in '%s'", src.Remote()) + return f.put(in, src, options, do) +} - var err error - var obj fs.Object - if f.cacheWrites { - f.cacheReader(in, src, func(inn io.Reader) { - obj, err = f.Fs.Put(inn, src, options...) - }) - } else { - obj, err = f.Fs.Put(in, src, options...) +// PutStream uploads the object +func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + do := f.Fs.Features().PutStream + if do == nil { + return nil, errors.New("can't PutStream") } - - if err != nil { - fs.Errorf(src, "error saving in cache: %v", err) - return nil, err - } - cachedObj := ObjectFromOriginal(f, obj).persist() - - // clean cache - go f.CleanUpCache(false) - return cachedObj, nil + fs.Infof(f, "put data streaming in '%s'", src.Remote()) + return f.put(in, src, options, do) } // Copy src to this remote using server side copy operations. @@ -962,6 +960,7 @@ var ( _ fs.Mover = (*Fs)(nil) _ fs.DirMover = (*Fs)(nil) _ fs.PutUncheckeder = (*Fs)(nil) + _ fs.PutStreamer = (*Fs)(nil) _ fs.CleanUpper = (*Fs)(nil) _ fs.UnWrapper = (*Fs)(nil) _ fs.ListRer = (*Fs)(nil)