// Package combine implements a backend to combine multiple remotes in a directory tree
package combine

/*
   Have API to add/remove branches in the combine
*/

import (
	"context"
	"errors"
	"fmt"
	"io"
	"path"
	"strings"
	"sync"
	"time"

	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/cache"
	"github.com/rclone/rclone/fs/config/configmap"
	"github.com/rclone/rclone/fs/config/configstruct"
	"github.com/rclone/rclone/fs/hash"
	"github.com/rclone/rclone/fs/operations"
	"github.com/rclone/rclone/fs/walk"
	"golang.org/x/sync/errgroup"
)

// Register with Fs
func init() {
	fsi := &fs.RegInfo{
		Name:        "combine",
		Description: "Combine several remotes into one",
		NewFs:       NewFs,
		MetadataInfo: &fs.MetadataInfo{
			Help: `Any metadata supported by the underlying remote is read and written.`,
		},
		Options: []fs.Option{{
			Name: "upstreams",
			Help: `Upstreams for combining

These should be in the form

    dir=remote:path dir2=remote2:path

Where before the = is specified the root directory and after is the remote to
put there.

Embedded spaces can be added using quotes

    "dir=remote:path with space" "dir2=remote2:path with space"

`,
			Required: true,
			Default:  fs.SpaceSepList(nil),
		}},
	}
	fs.Register(fsi)
}

// Options defines the configuration for this backend
type Options struct {
	Upstreams fs.SpaceSepList `config:"upstreams"`
}

// Fs represents a combine of upstreams
type Fs struct {
	name      string               // name of this remote
	features  *fs.Features         // optional features
	opt       Options              // options for this Fs
	root      string               // the path we are working on
	hashSet   hash.Set             // common hashes
	when      time.Time            // directory times
	upstreams map[string]*upstream // map of upstreams
}

// adjustment stores the info to add a prefix to a path or chop characters off
type adjustment struct {
	root            string
	rootSlash       string
	mountpoint      string
	mountpointSlash string
}

// newAdjustment makes a new path adjustment adjusting between mountpoint and root
//
// mountpoint is the point the upstream is mounted and root is the combine root
func newAdjustment(root, mountpoint string) (a adjustment) {
	return adjustment{
		root:            root,
		rootSlash:       root + "/",
		mountpoint:      mountpoint,
		mountpointSlash: mountpoint + "/",
	}
}

var errNotUnderRoot = errors.New("file not under root")

// do makes the adjustment on s, mapping an upstream path into a combine path
func (a *adjustment) do(s string) (string, error) {
	absPath := join(a.mountpoint, s)
	if a.root == "" {
		return absPath, nil
	}
	if absPath == a.root {
		return "", nil
	}
	if !strings.HasPrefix(absPath, a.rootSlash) {
		return "", errNotUnderRoot
	}
	return absPath[len(a.rootSlash):], nil
}

// undo makes the adjustment on s, mapping a combine path into an upstream path
func (a *adjustment) undo(s string) (string, error) {
	absPath := join(a.root, s)
	if absPath == a.mountpoint {
		return "", nil
	}
	if !strings.HasPrefix(absPath, a.mountpointSlash) {
		return "", errNotUnderRoot
	}
	return absPath[len(a.mountpointSlash):], nil
}

// upstream represents an upstream Fs
type upstream struct {
	f              fs.Fs
	parent         *Fs
	dir            string     // directory the upstream is mounted
	pathAdjustment adjustment // how to fiddle with the path
}

// Create an upstream from the directory it is mounted on and the remote
func (f *Fs) newUpstream(ctx context.Context, dir, remote string) (*upstream, error) {
	uFs, err := cache.Get(ctx, remote)
	if err == fs.ErrorIsFile {
		return nil, fmt.Errorf("can't combine files yet, only directories %q: %w", remote, err)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to create upstream %q: %w", remote, err)
	}
	u := &upstream{
		f:              uFs,
		parent:         f,
		dir:            dir,
		pathAdjustment: newAdjustment(f.root, dir),
	}
	cache.PinUntilFinalized(u.f, u)
	return u, nil
}

// NewFs constructs an Fs from the path.
//
// The returned Fs is the actual Fs, referenced by remote in the config
func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (outFs fs.Fs, err error) {
	// defer log.Trace(nil, "name=%q, root=%q, m=%v", name, root, m)("f=%+v, err=%v", &outFs, &err)
	// Parse config into Options struct
	opt := new(Options)
	err = configstruct.Set(m, opt)
	if err != nil {
		return nil, err
	}
	// Backward compatible to old config
	if len(opt.Upstreams) == 0 {
		return nil, errors.New("combine can't point to an empty upstream - check the value of the upstreams setting")
	}
	for _, u := range opt.Upstreams {
		if strings.HasPrefix(u, name+":") {
			return nil, errors.New("can't point combine remote at itself - check the value of the upstreams setting")
		}
	}
	isDir := false
	for strings.HasSuffix(root, "/") {
		root = root[:len(root)-1]
		isDir = true
	}

	f := &Fs{
		name:      name,
		root:      root,
		opt:       *opt,
		upstreams: make(map[string]*upstream, len(opt.Upstreams)),
		when:      time.Now(),
	}

	g, gCtx := errgroup.WithContext(ctx)
	var mu sync.Mutex
	for _, upstream := range opt.Upstreams {
		upstream := upstream
		g.Go(func() (err error) {
			equal := strings.IndexRune(upstream, '=')
			if equal < 0 {
				return fmt.Errorf("no \"=\" in upstream definition %q", upstream)
			}
			dir, remote := upstream[:equal], upstream[equal+1:]
			if dir == "" {
				return fmt.Errorf("empty dir in upstream definition %q", upstream)
			}
			if remote == "" {
				return fmt.Errorf("empty remote in upstream definition %q", upstream)
			}
			if strings.ContainsRune(dir, '/') {
				return fmt.Errorf("dirs can't contain / (yet): %q", dir)
			}
			u, err := f.newUpstream(gCtx, dir, remote)
			if err != nil {
				return err
			}
			mu.Lock()
			if _, found := f.upstreams[dir]; found {
				err = fmt.Errorf("duplicate directory name %q", dir)
			} else {
				f.upstreams[dir] = u
			}
			mu.Unlock()
			return err
		})
	}
	err = g.Wait()
	if err != nil {
		return nil, err
	}
	// check features
	var features = (&fs.Features{
		CaseInsensitive:          true,
		DuplicateFiles:           false,
		ReadMimeType:             true,
		WriteMimeType:            true,
		CanHaveEmptyDirectories:  true,
		BucketBased:              true,
		SetTier:                  true,
		GetTier:                  true,
		ReadMetadata:             true,
		WriteMetadata:            true,
		UserMetadata:             true,
		ReadDirMetadata:          true,
		WriteDirMetadata:         true,
		WriteDirSetModTime:       true,
		UserDirMetadata:          true,
		DirModTimeUpdatesOnWrite: true,
		PartialUploads:           true,
	}).Fill(ctx, f)
	canMove := true
	for _, u := range f.upstreams {
		features = features.Mask(ctx, u.f) // Mask all upstream fs
		if !operations.CanServerSideMove(u.f) {
			canMove = false
		}
	}
	// We can move if all remotes support Move or Copy
	if canMove {
		features.Move = f.Move
	}

	// Enable ListR when upstreams either support ListR or is local
	// But not when all upstreams are local
	if features.ListR == nil {
		for _, u := range f.upstreams {
			if u.f.Features().ListR != nil {
				features.ListR = f.ListR
			} else if !u.f.Features().IsLocal {
				features.ListR = nil
				break
			}
		}
	}

	// Enable Purge when any upstreams support it
	if features.Purge == nil {
		for _, u := range f.upstreams {
			if u.f.Features().Purge != nil {
				features.Purge = f.Purge
				break
			}
		}
	}

	// Enable Shutdown when any upstreams support it
	if features.Shutdown == nil {
		for _, u := range f.upstreams {
			if u.f.Features().Shutdown != nil {
				features.Shutdown = f.Shutdown
				break
			}
		}
	}

	// Enable DirCacheFlush when any upstreams support it
	if features.DirCacheFlush == nil {
		for _, u := range f.upstreams {
			if u.f.Features().DirCacheFlush != nil {
				features.DirCacheFlush = f.DirCacheFlush
				break
			}
		}
	}

	// Enable CleanUp when any upstreams support it
	if features.CleanUp == nil {
		for _, u := range f.upstreams {
			if u.f.Features().CleanUp != nil {
				features.CleanUp = f.CleanUp
				break
			}
		}
	}

	// Enable ChangeNotify when any upstreams support it
	if features.ChangeNotify == nil {
		for _, u := range f.upstreams {
			if u.f.Features().ChangeNotify != nil {
				features.ChangeNotify = f.ChangeNotify
				break
			}
		}
	}

	// show that we wrap other backends
	features.Overlay = true

	f.features = features

	// Get common intersection of hashes
	var hashSet hash.Set
	var first = true
	for _, u := range f.upstreams {
		if first {
			hashSet = u.f.Hashes()
			first = false
		} else {
			hashSet = hashSet.Overlap(u.f.Hashes())
		}
	}
	f.hashSet = hashSet

	// Check to see if the root is actually a file
	if f.root != "" && !isDir {
		_, err := f.NewObject(ctx, "")
		if err != nil {
			if err == fs.ErrorObjectNotFound || err == fs.ErrorNotAFile || err == fs.ErrorIsDir {
				// File doesn't exist or is a directory so return old f
				return f, nil
			}
			return nil, err
		}

		// Check to see if the root path is actually an existing file
		f.root = path.Dir(f.root)
		if f.root == "." {
			f.root = ""
		}
		// Adjust path adjustment to remove leaf
		for _, u := range f.upstreams {
			u.pathAdjustment = newAdjustment(f.root, u.dir)
		}
		return f, fs.ErrorIsFile
	}
	return f, nil
}

// Run a function over all the upstreams in parallel
func (f *Fs) multithread(ctx context.Context, fn func(context.Context, *upstream) error) error {
	g, gCtx := errgroup.WithContext(ctx)
	for _, u := range f.upstreams {
		u := u
		g.Go(func() (err error) {
			return fn(gCtx, u)
		})
	}
	return g.Wait()
}

// join the elements together but unlike path.Join return empty string
func join(elem ...string) string {
	result := path.Join(elem...)
	if result == "." {
		return ""
	}
	if len(result) > 0 && result[0] == '/' {
		result = result[1:]
	}
	return result
}

// find the upstream for the remote passed in, returning the upstream and the adjusted path
func (f *Fs) findUpstream(remote string) (u *upstream, uRemote string, err error) {
	// defer log.Trace(remote, "")("f=%v, uRemote=%q, err=%v", &u, &uRemote, &err)
	for _, u := range f.upstreams {
		uRemote, err = u.pathAdjustment.undo(remote)
		if err == nil {
			return u, uRemote, nil
		}
	}
	return nil, "", fmt.Errorf("combine for remote %q: %w", remote, fs.ErrorDirNotFound)
}

// Name of the remote (as passed into NewFs)
func (f *Fs) Name() string {
	return f.name
}

// Root of the remote (as passed into NewFs)
func (f *Fs) Root() string {
	return f.root
}

// String converts this Fs to a string
func (f *Fs) String() string {
	return fmt.Sprintf("combine root '%s'", f.root)
}

// Features returns the optional features of this Fs
func (f *Fs) Features() *fs.Features {
	return f.features
}

// Rmdir removes the root directory of the Fs object
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
	// The root always exists
	if f.root == "" && dir == "" {
		return nil
	}
	u, uRemote, err := f.findUpstream(dir)
	if err != nil {
		return err
	}
	return u.f.Rmdir(ctx, uRemote)
}

// Hashes returns hash.HashNone to indicate remote hashing is unavailable
func (f *Fs) Hashes() hash.Set {
	return f.hashSet
}

// Mkdir makes the root directory of the Fs object
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
	// The root always exists
	if f.root == "" && dir == "" {
		return nil
	}
	u, uRemote, err := f.findUpstream(dir)
	if err != nil {
		return err
	}
	return u.f.Mkdir(ctx, uRemote)
}

// MkdirMetadata makes the root directory of the Fs object
func (f *Fs) MkdirMetadata(ctx context.Context, dir string, metadata fs.Metadata) (fs.Directory, error) {
	u, uRemote, err := f.findUpstream(dir)
	if err != nil {
		return nil, err
	}
	do := u.f.Features().MkdirMetadata
	if do == nil {
		return nil, fs.ErrorNotImplemented
	}
	newDir, err := do(ctx, uRemote, metadata)
	if err != nil {
		return nil, err
	}
	entries := fs.DirEntries{newDir}
	entries, err = u.wrapEntries(ctx, entries)
	if err != nil {
		return nil, err
	}
	newDir, ok := entries[0].(fs.Directory)
	if !ok {
		return nil, fmt.Errorf("internal error: expecting %T to be fs.Directory", entries[0])
	}
	return newDir, nil
}

// purge the upstream or fallback to a slow way
func (u *upstream) purge(ctx context.Context, dir string) (err error) {
	if do := u.f.Features().Purge; do != nil {
		err = do(ctx, dir)
	} else {
		err = operations.Purge(ctx, u.f, dir)
	}
	return err
}

// Purge all files in the directory
//
// Implement this if you have a way of deleting all the files
// quicker than just running Remove() on the result of List()
//
// Return an error if it doesn't exist
func (f *Fs) Purge(ctx context.Context, dir string) error {
	if f.root == "" && dir == "" {
		return f.multithread(ctx, func(ctx context.Context, u *upstream) error {
			return u.purge(ctx, "")
		})
	}
	u, uRemote, err := f.findUpstream(dir)
	if err != nil {
		return err
	}
	return u.purge(ctx, uRemote)
}

// Copy src to this remote using server-side copy operations.
//
// This is stored with the remote path given.
//
// It returns the destination Object and a possible error.
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantCopy
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
	srcObj, ok := src.(*Object)
	if !ok {
		fs.Debugf(src, "Can't copy - not same remote type")
		return nil, fs.ErrorCantCopy
	}

	dstU, dstRemote, err := f.findUpstream(remote)
	if err != nil {
		return nil, err
	}

	do := dstU.f.Features().Copy
	if do == nil {
		return nil, fs.ErrorCantCopy
	}

	o, err := do(ctx, srcObj.Object, dstRemote)
	if err != nil {
		return nil, err
	}

	return dstU.newObject(o), nil
}

// Move src to this remote using server-side move operations.
//
// This is stored with the remote path given.
//
// It returns the destination Object and a possible error.
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantMove
func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
	srcObj, ok := src.(*Object)
	if !ok {
		fs.Debugf(src, "Can't move - not same remote type")
		return nil, fs.ErrorCantMove
	}

	dstU, dstRemote, err := f.findUpstream(remote)
	if err != nil {
		return nil, err
	}

	do := dstU.f.Features().Move
	useCopy := false
	if do == nil {
		do = dstU.f.Features().Copy
		if do == nil {
			return nil, fs.ErrorCantMove
		}
		useCopy = true
	}

	o, err := do(ctx, srcObj.Object, dstRemote)
	if err != nil {
		return nil, err
	}

	// If did Copy then remove the source object
	if useCopy {
		err = srcObj.Remove(ctx)
		if err != nil {
			return nil, err
		}
	}

	return dstU.newObject(o), nil
}

// DirMove moves src, srcRemote to this remote at dstRemote
// using server-side move operations.
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantDirMove
//
// If destination exists then return fs.ErrorDirExists
func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) (err error) {
	// defer log.Trace(f, "src=%v, srcRemote=%q, dstRemote=%q", src, srcRemote, dstRemote)("err=%v", &err)
	srcFs, ok := src.(*Fs)
	if !ok {
		fs.Debugf(src, "Can't move directory - not same remote type")
		return fs.ErrorCantDirMove
	}

	dstU, dstURemote, err := f.findUpstream(dstRemote)
	if err != nil {
		return err
	}

	srcU, srcURemote, err := srcFs.findUpstream(srcRemote)
	if err != nil {
		return err
	}

	do := dstU.f.Features().DirMove
	if do == nil {
		return fs.ErrorCantDirMove
	}

	fs.Logf(dstU.f, "srcU.f=%v, srcURemote=%q, dstURemote=%q", srcU.f, srcURemote, dstURemote)
	return do(ctx, srcU.f, srcURemote, dstURemote)
}

// ChangeNotify calls the passed function with a path
// that has had changes. If the implementation
// uses polling, it should adhere to the given interval.
// At least one value will be written to the channel,
// specifying the initial value and updated values might
// follow. A 0 Duration should pause the polling.
// The ChangeNotify implementation must empty the channel
// regularly. When the channel gets closed, the implementation
// should stop polling and release resources.
func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), ch <-chan time.Duration) {
	var uChans []chan time.Duration

	for _, u := range f.upstreams {
		u := u
		if do := u.f.Features().ChangeNotify; do != nil {
			ch := make(chan time.Duration)
			uChans = append(uChans, ch)
			wrappedNotifyFunc := func(path string, entryType fs.EntryType) {
				newPath, err := u.pathAdjustment.do(path)
				if err != nil {
					fs.Logf(f, "ChangeNotify: unable to process %q: %s", path, err)
					return
				}
				fs.Debugf(f, "ChangeNotify: path %q entryType %d", newPath, entryType)
				notifyFunc(newPath, entryType)
			}
			do(ctx, wrappedNotifyFunc, ch)
		}
	}

	go func() {
		for i := range ch {
			for _, c := range uChans {
				c <- i
			}
		}
		for _, c := range uChans {
			close(c)
		}
	}()
}

// DirCacheFlush resets the directory cache - used in testing
// as an optional interface
func (f *Fs) DirCacheFlush() {
	ctx := context.Background()
	_ = f.multithread(ctx, func(ctx context.Context, u *upstream) error {
		if do := u.f.Features().DirCacheFlush; do != nil {
			do()
		}
		return nil
	})
}

func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bool, options ...fs.OpenOption) (fs.Object, error) {
	srcPath := src.Remote()
	u, uRemote, err := f.findUpstream(srcPath)
	if err != nil {
		return nil, err
	}
	uSrc := fs.NewOverrideRemote(src, uRemote)
	var o fs.Object
	if stream {
		o, err = u.f.Features().PutStream(ctx, in, uSrc, options...)
	} else {
		o, err = u.f.Put(ctx, in, uSrc, options...)
	}
	if err != nil {
		return nil, err
	}
	return u.newObject(o), nil
}

// Put in to the remote path with the modTime given of the given size
//
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	o, err := f.NewObject(ctx, src.Remote())
	switch err {
	case nil:
		return o, o.Update(ctx, in, src, options...)
	case fs.ErrorObjectNotFound:
		return f.put(ctx, in, src, false, options...)
	default:
		return nil, err
	}
}

// PutStream uploads to the remote path with the modTime given of indeterminate size
//
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	o, err := f.NewObject(ctx, src.Remote())
	switch err {
	case nil:
		return o, o.Update(ctx, in, src, options...)
	case fs.ErrorObjectNotFound:
		return f.put(ctx, in, src, true, options...)
	default:
		return nil, err
	}
}

// About gets quota information from the Fs
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
	usage := &fs.Usage{
		Total:   new(int64),
		Used:    new(int64),
		Trashed: new(int64),
		Other:   new(int64),
		Free:    new(int64),
		Objects: new(int64),
	}
	for _, u := range f.upstreams {
		doAbout := u.f.Features().About
		if doAbout == nil {
			continue
		}
		usg, err := doAbout(ctx)
		if errors.Is(err, fs.ErrorDirNotFound) {
			continue
		}
		if err != nil {
			return nil, err
		}
		if usg.Total != nil && usage.Total != nil {
			*usage.Total += *usg.Total
		} else {
			usage.Total = nil
		}
		if usg.Used != nil && usage.Used != nil {
			*usage.Used += *usg.Used
		} else {
			usage.Used = nil
		}
		if usg.Trashed != nil && usage.Trashed != nil {
			*usage.Trashed += *usg.Trashed
		} else {
			usage.Trashed = nil
		}
		if usg.Other != nil && usage.Other != nil {
			*usage.Other += *usg.Other
		} else {
			usage.Other = nil
		}
		if usg.Free != nil && usage.Free != nil {
			*usage.Free += *usg.Free
		} else {
			usage.Free = nil
		}
		if usg.Objects != nil && usage.Objects != nil {
			*usage.Objects += *usg.Objects
		} else {
			usage.Objects = nil
		}
	}
	return usage, nil
}

// Wraps entries for this upstream
func (u *upstream) wrapEntries(ctx context.Context, entries fs.DirEntries) (fs.DirEntries, error) {
	for i, entry := range entries {
		switch x := entry.(type) {
		case fs.Object:
			entries[i] = u.newObject(x)
		case fs.Directory:
			newPath, err := u.pathAdjustment.do(x.Remote())
			if err != nil {
				return nil, err
			}
			newDir := fs.NewDirWrapper(newPath, x)
			entries[i] = newDir
		default:
			return nil, fmt.Errorf("unknown entry type %T", entry)
		}
	}
	return entries, nil
}

// List the objects and directories in dir into entries.  The
// entries can be returned in any order but should be for a
// complete directory.
//
// dir should be "" to list the root, and should not have
// trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
	// defer log.Trace(f, "dir=%q", dir)("entries = %v, err=%v", &entries, &err)
	if f.root == "" && dir == "" {
		entries = make(fs.DirEntries, 0, len(f.upstreams))
		for combineDir := range f.upstreams {
			d := fs.NewLimitedDirWrapper(combineDir, fs.NewDir(combineDir, f.when))
			entries = append(entries, d)
		}
		return entries, nil
	}
	u, uRemote, err := f.findUpstream(dir)
	if err != nil {
		return nil, err
	}
	entries, err = u.f.List(ctx, uRemote)
	if err != nil {
		return nil, err
	}
	return u.wrapEntries(ctx, entries)
}

// ListR lists the objects and directories of the Fs starting
// from dir recursively into out.
//
// dir should be "" to start from the root, and should not
// have trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
//
// It should call callback for each tranche of entries read.
// These need not be returned in any particular order.  If
// callback returns an error then the listing will stop
// immediately.
//
// Don't implement this unless you have a more efficient way
// of listing recursively that doing a directory traversal.
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
	// defer log.Trace(f, "dir=%q, callback=%v", dir, callback)("err=%v", &err)
	if f.root == "" && dir == "" {
		rootEntries, err := f.List(ctx, "")
		if err != nil {
			return err
		}
		err = callback(rootEntries)
		if err != nil {
			return err
		}
		var mu sync.Mutex
		syncCallback := func(entries fs.DirEntries) error {
			mu.Lock()
			defer mu.Unlock()
			return callback(entries)
		}
		err = f.multithread(ctx, func(ctx context.Context, u *upstream) error {
			return f.ListR(ctx, u.dir, syncCallback)
		})
		if err != nil {
			return err
		}
		return nil
	}
	u, uRemote, err := f.findUpstream(dir)
	if err != nil {
		return err
	}
	wrapCallback := func(entries fs.DirEntries) error {
		entries, err := u.wrapEntries(ctx, entries)
		if err != nil {
			return err
		}
		return callback(entries)
	}
	if do := u.f.Features().ListR; do != nil {
		err = do(ctx, uRemote, wrapCallback)
	} else {
		err = walk.ListR(ctx, u.f, uRemote, true, -1, walk.ListAll, wrapCallback)
	}
	if err == fs.ErrorDirNotFound {
		err = nil
	}
	return err
}

// NewObject creates a new remote combine file object
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
	u, uRemote, err := f.findUpstream(remote)
	if err != nil {
		return nil, err
	}
	if uRemote == "" || strings.HasSuffix(uRemote, "/") {
		return nil, fs.ErrorIsDir
	}
	o, err := u.f.NewObject(ctx, uRemote)
	if err != nil {
		return nil, err
	}
	return u.newObject(o), nil
}

// Precision is the greatest Precision of all upstreams
func (f *Fs) Precision() time.Duration {
	var greatestPrecision time.Duration
	for _, u := range f.upstreams {
		uPrecision := u.f.Precision()
		if uPrecision > greatestPrecision {
			greatestPrecision = uPrecision
		}
	}
	return greatestPrecision
}

// Shutdown the backend, closing any background tasks and any
// cached connections.
func (f *Fs) Shutdown(ctx context.Context) error {
	return f.multithread(ctx, func(ctx context.Context, u *upstream) error {
		if do := u.f.Features().Shutdown; do != nil {
			return do(ctx)
		}
		return nil
	})
}

// PublicLink generates a public link to the remote path (usually readable by anyone)
func (f *Fs) PublicLink(ctx context.Context, remote string, expire fs.Duration, unlink bool) (string, error) {
	u, uRemote, err := f.findUpstream(remote)
	if err != nil {
		return "", err
	}
	do := u.f.Features().PublicLink
	if do == nil {
		return "", fs.ErrorNotImplemented
	}
	return do(ctx, uRemote, expire, unlink)
}

// PutUnchecked in to the remote path with the modTime given of the given size
//
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
//
// May create duplicates or return errors if src already
// exists.
func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	srcPath := src.Remote()
	u, uRemote, err := f.findUpstream(srcPath)
	if err != nil {
		return nil, err
	}
	do := u.f.Features().PutUnchecked
	if do == nil {
		return nil, fs.ErrorNotImplemented
	}
	uSrc := fs.NewOverrideRemote(src, uRemote)
	return do(ctx, in, uSrc, options...)
}

// MergeDirs merges the contents of all the directories passed
// in into the first one and rmdirs the other directories.
func (f *Fs) MergeDirs(ctx context.Context, dirs []fs.Directory) error {
	if len(dirs) == 0 {
		return nil
	}
	var (
		u     *upstream
		uDirs []fs.Directory
	)
	for _, dir := range dirs {
		uNew, uDir, err := f.findUpstream(dir.Remote())
		if err != nil {
			return err
		}
		if u == nil {
			u = uNew
		} else if u != uNew {
			return fmt.Errorf("can't merge directories from different upstreams")
		}
		uDirs = append(uDirs, fs.NewOverrideDirectory(dir, uDir))
	}
	do := u.f.Features().MergeDirs
	if do == nil {
		return fs.ErrorNotImplemented
	}
	return do(ctx, uDirs)
}

// DirSetModTime sets the directory modtime for dir
func (f *Fs) DirSetModTime(ctx context.Context, dir string, modTime time.Time) error {
	u, uDir, err := f.findUpstream(dir)
	if err != nil {
		return err
	}
	if uDir == "" {
		fs.Debugf(dir, "Can't set modtime on upstream root. skipping.")
		return nil
	}
	if do := u.f.Features().DirSetModTime; do != nil {
		return do(ctx, uDir, modTime)
	}
	return fs.ErrorNotImplemented
}

// CleanUp the trash in the Fs
//
// Implement this if you have a way of emptying the trash or
// otherwise cleaning up old versions of files.
func (f *Fs) CleanUp(ctx context.Context) error {
	return f.multithread(ctx, func(ctx context.Context, u *upstream) error {
		if do := u.f.Features().CleanUp; do != nil {
			return do(ctx)
		}
		return nil
	})
}

// OpenWriterAt opens with a handle for random access writes
//
// Pass in the remote desired and the size if known.
//
// It truncates any existing object
func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) {
	u, uRemote, err := f.findUpstream(remote)
	if err != nil {
		return nil, err
	}
	do := u.f.Features().OpenWriterAt
	if do == nil {
		return nil, fs.ErrorNotImplemented
	}
	return do(ctx, uRemote, size)
}

// Object describes a wrapped Object
//
// This is a wrapped Object which knows its path prefix
type Object struct {
	fs.Object
	u *upstream
}

func (u *upstream) newObject(o fs.Object) *Object {
	return &Object{
		Object: o,
		u:      u,
	}
}

// Fs returns read only access to the Fs that this object is part of
func (o *Object) Fs() fs.Info {
	return o.u.parent
}

// String returns the remote path
func (o *Object) String() string {
	return o.Remote()
}

// Remote returns the remote path
func (o *Object) Remote() string {
	newPath, err := o.u.pathAdjustment.do(o.Object.String())
	if err != nil {
		fs.Errorf(o.Object, "Bad object: %v", err)
		return err.Error()
	}
	return newPath
}

// MimeType returns the content type of the Object if known
func (o *Object) MimeType(ctx context.Context) (mimeType string) {
	if do, ok := o.Object.(fs.MimeTyper); ok {
		mimeType = do.MimeType(ctx)
	}
	return mimeType
}

// UnWrap returns the Object that this Object is wrapping or
// nil if it isn't wrapping anything
func (o *Object) UnWrap() fs.Object {
	return o.Object
}

// GetTier returns storage tier or class of the Object
func (o *Object) GetTier() string {
	do, ok := o.Object.(fs.GetTierer)
	if !ok {
		return ""
	}
	return do.GetTier()
}

// ID returns the ID of the Object if known, or "" if not
func (o *Object) ID() string {
	do, ok := o.Object.(fs.IDer)
	if !ok {
		return ""
	}
	return do.ID()
}

// Metadata returns metadata for an object
//
// It should return nil if there is no Metadata
func (o *Object) Metadata(ctx context.Context) (fs.Metadata, error) {
	do, ok := o.Object.(fs.Metadataer)
	if !ok {
		return nil, nil
	}
	return do.Metadata(ctx)
}

// SetMetadata sets metadata for an Object
//
// It should return fs.ErrorNotImplemented if it can't set metadata
func (o *Object) SetMetadata(ctx context.Context, metadata fs.Metadata) error {
	do, ok := o.Object.(fs.SetMetadataer)
	if !ok {
		return fs.ErrorNotImplemented
	}
	return do.SetMetadata(ctx, metadata)
}

// SetTier performs changing storage tier of the Object if
// multiple storage classes supported
func (o *Object) SetTier(tier string) error {
	do, ok := o.Object.(fs.SetTierer)
	if !ok {
		return errors.New("underlying remote does not support SetTier")
	}
	return do.SetTier(tier)
}

// Check the interfaces are satisfied
var (
	_ fs.Fs              = (*Fs)(nil)
	_ fs.Purger          = (*Fs)(nil)
	_ fs.PutStreamer     = (*Fs)(nil)
	_ fs.Copier          = (*Fs)(nil)
	_ fs.Mover           = (*Fs)(nil)
	_ fs.DirMover        = (*Fs)(nil)
	_ fs.DirCacheFlusher = (*Fs)(nil)
	_ fs.ChangeNotifier  = (*Fs)(nil)
	_ fs.Abouter         = (*Fs)(nil)
	_ fs.ListRer         = (*Fs)(nil)
	_ fs.Shutdowner      = (*Fs)(nil)
	_ fs.PublicLinker    = (*Fs)(nil)
	_ fs.PutUncheckeder  = (*Fs)(nil)
	_ fs.MergeDirser     = (*Fs)(nil)
	_ fs.DirSetModTimer  = (*Fs)(nil)
	_ fs.MkdirMetadataer = (*Fs)(nil)
	_ fs.CleanUpper      = (*Fs)(nil)
	_ fs.OpenWriterAter  = (*Fs)(nil)
	_ fs.FullObject      = (*Object)(nil)
)