mirror of
https://github.com/rclone/rclone.git
synced 2025-01-13 20:38:12 +02:00
union: Implement multiple writable remotes
This commit is contained in:
parent
93f5125f51
commit
78a9e7440a
167
backend/union/entry.go
Normal file
167
backend/union/entry.go
Normal file
@ -0,0 +1,167 @@
|
||||
package union
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/rclone/rclone/backend/union/upstream"
|
||||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
// Object describes a union Object
|
||||
//
|
||||
// This is a wrapped object which returns the Union Fs as its parent
|
||||
type Object struct {
|
||||
*upstream.Object
|
||||
fs *Fs // what this object is part of
|
||||
co []upstream.Entry
|
||||
}
|
||||
|
||||
// Directory describes a union Directory
|
||||
//
|
||||
// This is a wrapped object contains all candidates
|
||||
type Directory struct {
|
||||
*upstream.Directory
|
||||
cd []upstream.Entry
|
||||
}
|
||||
|
||||
type entry interface {
|
||||
upstream.Entry
|
||||
candidates() []upstream.Entry
|
||||
}
|
||||
|
||||
// UnWrap returns the Object that this Object is wrapping or
|
||||
// nil if it isn't wrapping anything
|
||||
func (o *Object) UnWrap() *upstream.Object {
|
||||
return o.Object
|
||||
}
|
||||
|
||||
// Fs returns the union Fs as the parent
|
||||
func (o *Object) Fs() fs.Info {
|
||||
return o.fs
|
||||
}
|
||||
|
||||
func (o *Object) candidates() []upstream.Entry {
|
||||
return o.co
|
||||
}
|
||||
|
||||
func (d *Directory) candidates() []upstream.Entry {
|
||||
return d.cd
|
||||
}
|
||||
|
||||
// Update in to the object with the modTime given of the given size
|
||||
//
|
||||
// When called from outside a Fs by rclone, src.Size() will always be >= 0.
|
||||
// But for unknown-sized objects (indicated by src.Size() == -1), Upload should either
|
||||
// return an error or update the object properly (rather than e.g. calling panic).
|
||||
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||
entries, err := o.fs.actionEntries(o.candidates()...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(entries) == 1 {
|
||||
obj := entries[0].(*upstream.Object)
|
||||
return obj.Update(ctx, in, src, options...)
|
||||
}
|
||||
// Get multiple reader
|
||||
readers := make([]io.Reader, len(entries))
|
||||
writers := make([]io.Writer, len(entries))
|
||||
errs := Errors(make([]error, len(entries)+1))
|
||||
for i := range entries {
|
||||
r, w := io.Pipe()
|
||||
bw := bufio.NewWriter(w)
|
||||
readers[i], writers[i] = r, bw
|
||||
defer func() {
|
||||
err := w.Close()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
go func() {
|
||||
mw := io.MultiWriter(writers...)
|
||||
es := make([]error, len(writers)+1)
|
||||
_, es[len(es)-1] = io.Copy(mw, in)
|
||||
for i, bw := range writers {
|
||||
es[i] = bw.(*bufio.Writer).Flush()
|
||||
}
|
||||
errs[len(entries)] = Errors(es).Err()
|
||||
}()
|
||||
// Multi-threading
|
||||
multithread(len(entries), func(i int) {
|
||||
if o, ok := entries[i].(*upstream.Object); ok {
|
||||
err := o.Update(ctx, readers[i], src, options...)
|
||||
errs[i] = errors.Wrap(err, o.UpstreamFs().Name())
|
||||
} else {
|
||||
errs[i] = fs.ErrorNotAFile
|
||||
}
|
||||
})
|
||||
return errs.Err()
|
||||
}
|
||||
|
||||
// Remove candidate objects selected by ACTION policy
|
||||
func (o *Object) Remove(ctx context.Context) error {
|
||||
entries, err := o.fs.actionEntries(o.candidates()...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
errs := Errors(make([]error, len(entries)))
|
||||
multithread(len(entries), func(i int) {
|
||||
if o, ok := entries[i].(*upstream.Object); ok {
|
||||
err := o.Remove(ctx)
|
||||
errs[i] = errors.Wrap(err, o.UpstreamFs().Name())
|
||||
} else {
|
||||
errs[i] = fs.ErrorNotAFile
|
||||
}
|
||||
})
|
||||
return errs.Err()
|
||||
}
|
||||
|
||||
// SetModTime sets the metadata on the object to set the modification date
|
||||
func (o *Object) SetModTime(ctx context.Context, t time.Time) error {
|
||||
entries, err := o.fs.actionEntries(o.candidates()...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
errs := Errors(make([]error, len(entries)))
|
||||
multithread(len(entries), func(i int) {
|
||||
if o, ok := entries[i].(*upstream.Object); ok {
|
||||
err := o.SetModTime(ctx, t)
|
||||
errs[i] = errors.Wrap(err, o.UpstreamFs().Name())
|
||||
} else {
|
||||
errs[i] = fs.ErrorNotAFile
|
||||
}
|
||||
})
|
||||
wg.Wait()
|
||||
return errs.Err()
|
||||
}
|
||||
|
||||
// ModTime returns the modification date of the directory
|
||||
// It returns the latest ModTime of all candidates
|
||||
func (d *Directory) ModTime(ctx context.Context) (t time.Time) {
|
||||
entries := d.candidates()
|
||||
times := make([]time.Time, len(entries))
|
||||
multithread(len(entries), func(i int) {
|
||||
times[i] = entries[i].ModTime(ctx)
|
||||
})
|
||||
for _, ti := range times {
|
||||
if t.Before(ti) {
|
||||
t = ti
|
||||
}
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
||||
// Size returns the size of the directory
|
||||
// It returns the sum of all candidates
|
||||
func (d *Directory) Size() (s int64) {
|
||||
for _, e := range d.candidates() {
|
||||
s += e.Size()
|
||||
}
|
||||
return s
|
||||
}
|
68
backend/union/errors.go
Normal file
68
backend/union/errors.go
Normal file
@ -0,0 +1,68 @@
|
||||
package union
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// The Errors type wraps a slice of errors
|
||||
type Errors []error
|
||||
|
||||
// Map returns a copy of the error slice with all its errors modified
|
||||
// according to the mapping function. If mapping returns nil,
|
||||
// the error is dropped from the error slice with no replacement.
|
||||
func (e Errors) Map(mapping func(error) error) Errors {
|
||||
s := make([]error, len(e))
|
||||
i := 0
|
||||
for _, err := range e {
|
||||
nerr := mapping(err)
|
||||
if nerr == nil {
|
||||
continue
|
||||
}
|
||||
s[i] = nerr
|
||||
i++
|
||||
}
|
||||
return Errors(s[:i])
|
||||
}
|
||||
|
||||
// FilterNil returns the Errors without nil
|
||||
func (e Errors) FilterNil() Errors {
|
||||
ne := e.Map(func(err error) error {
|
||||
return err
|
||||
})
|
||||
return ne
|
||||
}
|
||||
|
||||
// Err returns a error interface that filtered nil,
|
||||
// or nil if no non-nil Error is presented.
|
||||
func (e Errors) Err() error {
|
||||
ne := e.FilterNil()
|
||||
if len(ne) == 0 {
|
||||
return nil
|
||||
}
|
||||
return ne
|
||||
}
|
||||
|
||||
// Error returns a concatenated string of the contained errors
|
||||
func (e Errors) Error() string {
|
||||
var buf bytes.Buffer
|
||||
|
||||
if len(e) == 0 {
|
||||
buf.WriteString("no error")
|
||||
}
|
||||
if len(e) == 1 {
|
||||
buf.WriteString("1 error: ")
|
||||
} else {
|
||||
fmt.Fprintf(&buf, "%d errors: ", len(e))
|
||||
}
|
||||
|
||||
for i, err := range e {
|
||||
if i != 0 {
|
||||
buf.WriteString("; ")
|
||||
}
|
||||
|
||||
buf.WriteString(err.Error())
|
||||
}
|
||||
|
||||
return buf.String()
|
||||
}
|
44
backend/union/policy/all.go
Normal file
44
backend/union/policy/all.go
Normal file
@ -0,0 +1,44 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/rclone/rclone/backend/union/upstream"
|
||||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerPolicy("all", &All{})
|
||||
}
|
||||
|
||||
// All policy behaves the same as EpAll except for the CREATE category
|
||||
// Action category: same as epall.
|
||||
// Create category: apply to all branches.
|
||||
// Search category: same as epall.
|
||||
type All struct {
|
||||
EpAll
|
||||
}
|
||||
|
||||
// Create category policy, governing the creation of files and directories
|
||||
func (p *All) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
upstreams = filterNC(upstreams)
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
return upstreams, nil
|
||||
}
|
||||
|
||||
// CreateEntries is CREATE category policy but receving a set of candidate entries
|
||||
func (p *All) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
entries = filterNCEntries(entries)
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
return entries, nil
|
||||
}
|
99
backend/union/policy/epall.go
Normal file
99
backend/union/policy/epall.go
Normal file
@ -0,0 +1,99 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path"
|
||||
"sync"
|
||||
|
||||
"github.com/rclone/rclone/backend/union/upstream"
|
||||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerPolicy("epall", &EpAll{})
|
||||
}
|
||||
|
||||
// EpAll stands for existing path, all
|
||||
// Action category: apply to all found.
|
||||
// Create category: apply to all found.
|
||||
// Search category: same as epff.
|
||||
type EpAll struct {
|
||||
EpFF
|
||||
}
|
||||
|
||||
func (p *EpAll) epall(ctx context.Context, upstreams []*upstream.Fs, filePath string) ([]*upstream.Fs, error) {
|
||||
var wg sync.WaitGroup
|
||||
ufs := make([]*upstream.Fs, len(upstreams))
|
||||
for i, u := range upstreams {
|
||||
wg.Add(1)
|
||||
i, u := i, u // Closure
|
||||
go func() {
|
||||
rfs := u.RootFs
|
||||
remote := path.Join(u.RootPath, filePath)
|
||||
if findEntry(ctx, rfs, remote) != nil {
|
||||
ufs[i] = u
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
var results []*upstream.Fs
|
||||
for _, f := range ufs {
|
||||
if f != nil {
|
||||
results = append(results, f)
|
||||
}
|
||||
}
|
||||
if len(results) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// Action category policy, governing the modification of files and directories
|
||||
func (p *EpAll) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
upstreams = filterRO(upstreams)
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
return p.epall(ctx, upstreams, path)
|
||||
}
|
||||
|
||||
// ActionEntries is ACTION category policy but receving a set of candidate entries
|
||||
func (p *EpAll) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
entries = filterROEntries(entries)
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
// Create category policy, governing the creation of files and directories
|
||||
func (p *EpAll) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
upstreams = filterNC(upstreams)
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
upstreams, err := p.epall(ctx, upstreams, path+"/..")
|
||||
return upstreams, err
|
||||
}
|
||||
|
||||
// CreateEntries is CREATE category policy but receving a set of candidate entries
|
||||
func (p *EpAll) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
entries = filterNCEntries(entries)
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
return entries, nil
|
||||
}
|
115
backend/union/policy/epff.go
Normal file
115
backend/union/policy/epff.go
Normal file
@ -0,0 +1,115 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path"
|
||||
|
||||
"github.com/rclone/rclone/backend/union/upstream"
|
||||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerPolicy("epff", &EpFF{})
|
||||
}
|
||||
|
||||
// EpFF stands for existing path, first found
|
||||
// Given the order of the candidates, act on the first one found where the relative path exists.
|
||||
type EpFF struct{}
|
||||
|
||||
func (p *EpFF) epff(ctx context.Context, upstreams []*upstream.Fs, filePath string) (*upstream.Fs, error) {
|
||||
ch := make(chan *upstream.Fs)
|
||||
for _, u := range upstreams {
|
||||
u := u // Closure
|
||||
go func() {
|
||||
rfs := u.RootFs
|
||||
remote := path.Join(u.RootPath, filePath)
|
||||
if findEntry(ctx, rfs, remote) == nil {
|
||||
u = nil
|
||||
}
|
||||
ch <- u
|
||||
}()
|
||||
}
|
||||
var u *upstream.Fs
|
||||
for i := 0; i < len(upstreams); i++ {
|
||||
u = <-ch
|
||||
if u != nil {
|
||||
// close remaining goroutines
|
||||
go func(num int) {
|
||||
defer close(ch)
|
||||
for i := 0; i < num; i++ {
|
||||
<-ch
|
||||
}
|
||||
}(len(upstreams) - 1 - i)
|
||||
}
|
||||
}
|
||||
if u == nil {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return u, nil
|
||||
}
|
||||
|
||||
// Action category policy, governing the modification of files and directories
|
||||
func (p *EpFF) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
upstreams = filterRO(upstreams)
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
u, err := p.epff(ctx, upstreams, path)
|
||||
return []*upstream.Fs{u}, err
|
||||
}
|
||||
|
||||
// ActionEntries is ACTION category policy but receving a set of candidate entries
|
||||
func (p *EpFF) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
entries = filterROEntries(entries)
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
return entries[:1], nil
|
||||
}
|
||||
|
||||
// Create category policy, governing the creation of files and directories
|
||||
func (p *EpFF) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
upstreams = filterNC(upstreams)
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
u, err := p.epff(ctx, upstreams, path+"/..")
|
||||
return []*upstream.Fs{u}, err
|
||||
}
|
||||
|
||||
// CreateEntries is CREATE category policy but receving a set of candidate entries
|
||||
func (p *EpFF) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
entries = filterNCEntries(entries)
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
return entries[:1], nil
|
||||
}
|
||||
|
||||
// Search category policy, governing the access to files and directories
|
||||
func (p *EpFF) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return p.epff(ctx, upstreams, path)
|
||||
}
|
||||
|
||||
// SearchEntries is SEARCH category policy but receving a set of candidate entries
|
||||
func (p *EpFF) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) {
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return entries[0], nil
|
||||
}
|
116
backend/union/policy/eplfs.go
Normal file
116
backend/union/policy/eplfs.go
Normal file
@ -0,0 +1,116 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
|
||||
"github.com/rclone/rclone/backend/union/upstream"
|
||||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerPolicy("eplfs", &EpLfs{})
|
||||
}
|
||||
|
||||
// EpLfs stands for existing path, least free space
|
||||
// Of all the candidates on which the path exists choose the one with the least free space.
|
||||
type EpLfs struct {
|
||||
EpAll
|
||||
}
|
||||
|
||||
func (p *EpLfs) lfs(upstreams []*upstream.Fs) (*upstream.Fs, error) {
|
||||
var minFreeSpace int64 = math.MaxInt64
|
||||
var lfsupstream *upstream.Fs
|
||||
for _, u := range upstreams {
|
||||
space, err := u.GetFreeSpace()
|
||||
if err != nil {
|
||||
fs.LogPrintf(fs.LogLevelNotice, nil,
|
||||
"Free Space is not supported for upstream %s, treating as infinite", u.Name())
|
||||
}
|
||||
if space < minFreeSpace {
|
||||
minFreeSpace = space
|
||||
lfsupstream = u
|
||||
}
|
||||
}
|
||||
if lfsupstream == nil {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return lfsupstream, nil
|
||||
}
|
||||
|
||||
func (p *EpLfs) lfsEntries(entries []upstream.Entry) (upstream.Entry, error) {
|
||||
var minFreeSpace int64
|
||||
var lfsEntry upstream.Entry
|
||||
for _, e := range entries {
|
||||
space, err := e.UpstreamFs().GetFreeSpace()
|
||||
if err != nil {
|
||||
fs.LogPrintf(fs.LogLevelNotice, nil,
|
||||
"Free Space is not supported for upstream %s, treating as infinite", e.UpstreamFs().Name())
|
||||
}
|
||||
if space < minFreeSpace {
|
||||
minFreeSpace = space
|
||||
lfsEntry = e
|
||||
}
|
||||
}
|
||||
return lfsEntry, nil
|
||||
}
|
||||
|
||||
// Action category policy, governing the modification of files and directories
|
||||
func (p *EpLfs) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
upstreams, err := p.EpAll.Action(ctx, upstreams, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
u, err := p.lfs(upstreams)
|
||||
return []*upstream.Fs{u}, err
|
||||
}
|
||||
|
||||
// ActionEntries is ACTION category policy but receving a set of candidate entries
|
||||
func (p *EpLfs) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
entries, err := p.EpAll.ActionEntries(entries...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e, err := p.lfsEntries(entries)
|
||||
return []upstream.Entry{e}, err
|
||||
}
|
||||
|
||||
// Create category policy, governing the creation of files and directories
|
||||
func (p *EpLfs) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
upstreams, err := p.EpAll.Create(ctx, upstreams, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
u, err := p.lfs(upstreams)
|
||||
return []*upstream.Fs{u}, err
|
||||
}
|
||||
|
||||
// CreateEntries is CREATE category policy but receving a set of candidate entries
|
||||
func (p *EpLfs) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
entries, err := p.EpAll.CreateEntries(entries...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e, err := p.lfsEntries(entries)
|
||||
return []upstream.Entry{e}, err
|
||||
}
|
||||
|
||||
// Search category policy, governing the access to files and directories
|
||||
func (p *EpLfs) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
upstreams, err := p.epall(ctx, upstreams, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p.lfs(upstreams)
|
||||
}
|
||||
|
||||
// SearchEntries is SEARCH category policy but receving a set of candidate entries
|
||||
func (p *EpLfs) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) {
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return p.lfsEntries(entries)
|
||||
}
|
116
backend/union/policy/eplno.go
Normal file
116
backend/union/policy/eplno.go
Normal file
@ -0,0 +1,116 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
|
||||
"github.com/rclone/rclone/backend/union/upstream"
|
||||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerPolicy("eplno", &EpLno{})
|
||||
}
|
||||
|
||||
// EpLno stands for existing path, least number of objects
|
||||
// Of all the candidates on which the path exists choose the one with the least number of objects
|
||||
type EpLno struct {
|
||||
EpAll
|
||||
}
|
||||
|
||||
func (p *EpLno) lno(upstreams []*upstream.Fs) (*upstream.Fs, error) {
|
||||
var minNumObj int64 = math.MaxInt64
|
||||
var lnoUpstream *upstream.Fs
|
||||
for _, u := range upstreams {
|
||||
numObj, err := u.GetNumObjects()
|
||||
if err != nil {
|
||||
fs.LogPrintf(fs.LogLevelNotice, nil,
|
||||
"Number of Objects is not supported for upstream %s, treating as 0", u.Name())
|
||||
}
|
||||
if minNumObj > numObj {
|
||||
minNumObj = numObj
|
||||
lnoUpstream = u
|
||||
}
|
||||
}
|
||||
if lnoUpstream == nil {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return lnoUpstream, nil
|
||||
}
|
||||
|
||||
func (p *EpLno) lnoEntries(entries []upstream.Entry) (upstream.Entry, error) {
|
||||
var minNumObj int64 = math.MaxInt64
|
||||
var lnoEntry upstream.Entry
|
||||
for _, e := range entries {
|
||||
numObj, err := e.UpstreamFs().GetNumObjects()
|
||||
if err != nil {
|
||||
fs.LogPrintf(fs.LogLevelNotice, nil,
|
||||
"Number of Objects is not supported for upstream %s, treating as 0", e.UpstreamFs().Name())
|
||||
}
|
||||
if minNumObj > numObj {
|
||||
minNumObj = numObj
|
||||
lnoEntry = e
|
||||
}
|
||||
}
|
||||
return lnoEntry, nil
|
||||
}
|
||||
|
||||
// Action category policy, governing the modification of files and directories
|
||||
func (p *EpLno) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
upstreams, err := p.EpAll.Action(ctx, upstreams, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
u, err := p.lno(upstreams)
|
||||
return []*upstream.Fs{u}, err
|
||||
}
|
||||
|
||||
// ActionEntries is ACTION category policy but receving a set of candidate entries
|
||||
func (p *EpLno) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
entries, err := p.EpAll.ActionEntries(entries...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e, err := p.lnoEntries(entries)
|
||||
return []upstream.Entry{e}, err
|
||||
}
|
||||
|
||||
// Create category policy, governing the creation of files and directories
|
||||
func (p *EpLno) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
upstreams, err := p.EpAll.Create(ctx, upstreams, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
u, err := p.lno(upstreams)
|
||||
return []*upstream.Fs{u}, err
|
||||
}
|
||||
|
||||
// CreateEntries is CREATE category policy but receving a set of candidate entries
|
||||
func (p *EpLno) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
entries, err := p.EpAll.CreateEntries(entries...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e, err := p.lnoEntries(entries)
|
||||
return []upstream.Entry{e}, err
|
||||
}
|
||||
|
||||
// Search category policy, governing the access to files and directories
|
||||
func (p *EpLno) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
upstreams, err := p.epall(ctx, upstreams, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p.lno(upstreams)
|
||||
}
|
||||
|
||||
// SearchEntries is SEARCH category policy but receving a set of candidate entries
|
||||
func (p *EpLno) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) {
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return p.lnoEntries(entries)
|
||||
}
|
116
backend/union/policy/eplus.go
Normal file
116
backend/union/policy/eplus.go
Normal file
@ -0,0 +1,116 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
|
||||
"github.com/rclone/rclone/backend/union/upstream"
|
||||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerPolicy("eplus", &EpLus{})
|
||||
}
|
||||
|
||||
// EpLus stands for existing path, least used space
|
||||
// Of all the candidates on which the path exists choose the one with the least used space.
|
||||
type EpLus struct {
|
||||
EpAll
|
||||
}
|
||||
|
||||
func (p *EpLus) lus(upstreams []*upstream.Fs) (*upstream.Fs, error) {
|
||||
var minUsedSpace int64 = math.MaxInt64
|
||||
var lusupstream *upstream.Fs
|
||||
for _, u := range upstreams {
|
||||
space, err := u.GetUsedSpace()
|
||||
if err != nil {
|
||||
fs.LogPrintf(fs.LogLevelNotice, nil,
|
||||
"Used Space is not supported for upstream %s, treating as 0", u.Name())
|
||||
}
|
||||
if space < minUsedSpace {
|
||||
minUsedSpace = space
|
||||
lusupstream = u
|
||||
}
|
||||
}
|
||||
if lusupstream == nil {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return lusupstream, nil
|
||||
}
|
||||
|
||||
func (p *EpLus) lusEntries(entries []upstream.Entry) (upstream.Entry, error) {
|
||||
var minUsedSpace int64
|
||||
var lusEntry upstream.Entry
|
||||
for _, e := range entries {
|
||||
space, err := e.UpstreamFs().GetFreeSpace()
|
||||
if err != nil {
|
||||
fs.LogPrintf(fs.LogLevelNotice, nil,
|
||||
"Used Space is not supported for upstream %s, treating as 0", e.UpstreamFs().Name())
|
||||
}
|
||||
if space < minUsedSpace {
|
||||
minUsedSpace = space
|
||||
lusEntry = e
|
||||
}
|
||||
}
|
||||
return lusEntry, nil
|
||||
}
|
||||
|
||||
// Action category policy, governing the modification of files and directories
|
||||
func (p *EpLus) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
upstreams, err := p.EpAll.Action(ctx, upstreams, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
u, err := p.lus(upstreams)
|
||||
return []*upstream.Fs{u}, err
|
||||
}
|
||||
|
||||
// ActionEntries is ACTION category policy but receving a set of candidate entries
|
||||
func (p *EpLus) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
entries, err := p.EpAll.ActionEntries(entries...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e, err := p.lusEntries(entries)
|
||||
return []upstream.Entry{e}, err
|
||||
}
|
||||
|
||||
// Create category policy, governing the creation of files and directories
|
||||
func (p *EpLus) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
upstreams, err := p.EpAll.Create(ctx, upstreams, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
u, err := p.lus(upstreams)
|
||||
return []*upstream.Fs{u}, err
|
||||
}
|
||||
|
||||
// CreateEntries is CREATE category policy but receving a set of candidate entries
|
||||
func (p *EpLus) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
entries, err := p.EpAll.CreateEntries(entries...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e, err := p.lusEntries(entries)
|
||||
return []upstream.Entry{e}, err
|
||||
}
|
||||
|
||||
// Search category policy, governing the access to files and directories
|
||||
func (p *EpLus) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
upstreams, err := p.epall(ctx, upstreams, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p.lus(upstreams)
|
||||
}
|
||||
|
||||
// SearchEntries is SEARCH category policy but receving a set of candidate entries
|
||||
func (p *EpLus) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) {
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return p.lusEntries(entries)
|
||||
}
|
115
backend/union/policy/epmfs.go
Normal file
115
backend/union/policy/epmfs.go
Normal file
@ -0,0 +1,115 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/rclone/rclone/backend/union/upstream"
|
||||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerPolicy("epmfs", &EpMfs{})
|
||||
}
|
||||
|
||||
// EpMfs stands for existing path, most free space
|
||||
// Of all the candidates on which the path exists choose the one with the most free space.
|
||||
type EpMfs struct {
|
||||
EpAll
|
||||
}
|
||||
|
||||
func (p *EpMfs) mfs(upstreams []*upstream.Fs) (*upstream.Fs, error) {
|
||||
var maxFreeSpace int64
|
||||
var mfsupstream *upstream.Fs
|
||||
for _, u := range upstreams {
|
||||
space, err := u.GetFreeSpace()
|
||||
if err != nil {
|
||||
fs.LogPrintf(fs.LogLevelNotice, nil,
|
||||
"Free Space is not supported for upstream %s, treating as infinite", u.Name())
|
||||
}
|
||||
if maxFreeSpace < space {
|
||||
maxFreeSpace = space
|
||||
mfsupstream = u
|
||||
}
|
||||
}
|
||||
if mfsupstream == nil {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return mfsupstream, nil
|
||||
}
|
||||
|
||||
func (p *EpMfs) mfsEntries(entries []upstream.Entry) (upstream.Entry, error) {
|
||||
var maxFreeSpace int64
|
||||
var mfsEntry upstream.Entry
|
||||
for _, e := range entries {
|
||||
space, err := e.UpstreamFs().GetFreeSpace()
|
||||
if err != nil {
|
||||
fs.LogPrintf(fs.LogLevelNotice, nil,
|
||||
"Free Space is not supported for upstream %s, treating as infinite", e.UpstreamFs().Name())
|
||||
}
|
||||
if maxFreeSpace < space {
|
||||
maxFreeSpace = space
|
||||
mfsEntry = e
|
||||
}
|
||||
}
|
||||
return mfsEntry, nil
|
||||
}
|
||||
|
||||
// Action category policy, governing the modification of files and directories
|
||||
func (p *EpMfs) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
upstreams, err := p.EpAll.Action(ctx, upstreams, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
u, err := p.mfs(upstreams)
|
||||
return []*upstream.Fs{u}, err
|
||||
}
|
||||
|
||||
// ActionEntries is ACTION category policy but receving a set of candidate entries
|
||||
func (p *EpMfs) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
entries, err := p.EpAll.ActionEntries(entries...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e, err := p.mfsEntries(entries)
|
||||
return []upstream.Entry{e}, err
|
||||
}
|
||||
|
||||
// Create category policy, governing the creation of files and directories
|
||||
func (p *EpMfs) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
upstreams, err := p.EpAll.Create(ctx, upstreams, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
u, err := p.mfs(upstreams)
|
||||
return []*upstream.Fs{u}, err
|
||||
}
|
||||
|
||||
// CreateEntries is CREATE category policy but receving a set of candidate entries
|
||||
func (p *EpMfs) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
entries, err := p.EpAll.CreateEntries(entries...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e, err := p.mfsEntries(entries)
|
||||
return []upstream.Entry{e}, err
|
||||
}
|
||||
|
||||
// Search category policy, governing the access to files and directories
|
||||
func (p *EpMfs) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
upstreams, err := p.epall(ctx, upstreams, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p.mfs(upstreams)
|
||||
}
|
||||
|
||||
// SearchEntries is SEARCH category policy but receving a set of candidate entries
|
||||
func (p *EpMfs) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) {
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return p.mfsEntries(entries)
|
||||
}
|
86
backend/union/policy/eprand.go
Normal file
86
backend/union/policy/eprand.go
Normal file
@ -0,0 +1,86 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/rclone/rclone/backend/union/upstream"
|
||||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerPolicy("eprand", &EpRand{})
|
||||
}
|
||||
|
||||
// EpRand stands for existing path, random
|
||||
// Calls epall and then randomizes. Returns one candidate.
|
||||
type EpRand struct {
|
||||
EpAll
|
||||
}
|
||||
|
||||
func (p *EpRand) rand(upstreams []*upstream.Fs) *upstream.Fs {
|
||||
rand.Seed(time.Now().Unix())
|
||||
return upstreams[rand.Intn(len(upstreams))]
|
||||
}
|
||||
|
||||
func (p *EpRand) randEntries(entries []upstream.Entry) upstream.Entry {
|
||||
rand.Seed(time.Now().Unix())
|
||||
return entries[rand.Intn(len(entries))]
|
||||
}
|
||||
|
||||
// Action category policy, governing the modification of files and directories
|
||||
func (p *EpRand) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
upstreams, err := p.EpAll.Action(ctx, upstreams, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []*upstream.Fs{p.rand(upstreams)}, nil
|
||||
}
|
||||
|
||||
// ActionEntries is ACTION category policy but receving a set of candidate entries
|
||||
func (p *EpRand) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
entries, err := p.EpAll.ActionEntries(entries...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []upstream.Entry{p.randEntries(entries)}, nil
|
||||
}
|
||||
|
||||
// Create category policy, governing the creation of files and directories
|
||||
func (p *EpRand) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
upstreams, err := p.EpAll.Create(ctx, upstreams, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []*upstream.Fs{p.rand(upstreams)}, nil
|
||||
}
|
||||
|
||||
// CreateEntries is CREATE category policy but receving a set of candidate entries
|
||||
func (p *EpRand) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
entries, err := p.EpAll.CreateEntries(entries...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []upstream.Entry{p.randEntries(entries)}, nil
|
||||
}
|
||||
|
||||
// Search category policy, governing the access to files and directories
|
||||
func (p *EpRand) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
upstreams, err := p.epall(ctx, upstreams, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p.rand(upstreams), nil
|
||||
}
|
||||
|
||||
// SearchEntries is SEARCH category policy but receving a set of candidate entries
|
||||
func (p *EpRand) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) {
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return p.randEntries(entries), nil
|
||||
}
|
32
backend/union/policy/ff.go
Normal file
32
backend/union/policy/ff.go
Normal file
@ -0,0 +1,32 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/rclone/rclone/backend/union/upstream"
|
||||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerPolicy("ff", &FF{})
|
||||
}
|
||||
|
||||
// FF stands for first found
|
||||
// Search category: same as epff.
|
||||
// Action category: same as epff.
|
||||
// Create category: Given the order of the candiates, act on the first one found.
|
||||
type FF struct {
|
||||
EpFF
|
||||
}
|
||||
|
||||
// Create category policy, governing the creation of files and directories
|
||||
func (p *FF) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
upstreams = filterNC(upstreams)
|
||||
if len(upstreams) == 0 {
|
||||
return upstreams, fs.ErrorPermissionDenied
|
||||
}
|
||||
return upstreams[:1], nil
|
||||
}
|
33
backend/union/policy/lfs.go
Normal file
33
backend/union/policy/lfs.go
Normal file
@ -0,0 +1,33 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/rclone/rclone/backend/union/upstream"
|
||||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerPolicy("lfs", &Lfs{})
|
||||
}
|
||||
|
||||
// Lfs stands for least free space
|
||||
// Search category: same as eplfs.
|
||||
// Action category: same as eplfs.
|
||||
// Create category: Pick the drive with the least free space.
|
||||
type Lfs struct {
|
||||
EpLfs
|
||||
}
|
||||
|
||||
// Create category policy, governing the creation of files and directories
|
||||
func (p *Lfs) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
upstreams = filterNC(upstreams)
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
u, err := p.lfs(upstreams)
|
||||
return []*upstream.Fs{u}, err
|
||||
}
|
33
backend/union/policy/lno.go
Normal file
33
backend/union/policy/lno.go
Normal file
@ -0,0 +1,33 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/rclone/rclone/backend/union/upstream"
|
||||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerPolicy("lno", &Lno{})
|
||||
}
|
||||
|
||||
// Lno stands for least number of objects
|
||||
// Search category: same as eplno.
|
||||
// Action category: same as eplno.
|
||||
// Create category: Pick the drive with the least number of objects.
|
||||
type Lno struct {
|
||||
EpLno
|
||||
}
|
||||
|
||||
// Create category policy, governing the creation of files and directories
|
||||
func (p *Lno) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
upstreams = filterNC(upstreams)
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
u, err := p.lno(upstreams)
|
||||
return []*upstream.Fs{u}, err
|
||||
}
|
33
backend/union/policy/lus.go
Normal file
33
backend/union/policy/lus.go
Normal file
@ -0,0 +1,33 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/rclone/rclone/backend/union/upstream"
|
||||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerPolicy("lus", &Lus{})
|
||||
}
|
||||
|
||||
// Lus stands for least used space
|
||||
// Search category: same as eplus.
|
||||
// Action category: same as eplus.
|
||||
// Create category: Pick the drive with the least used space.
|
||||
type Lus struct {
|
||||
EpLus
|
||||
}
|
||||
|
||||
// Create category policy, governing the creation of files and directories
|
||||
func (p *Lus) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
upstreams = filterNC(upstreams)
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
u, err := p.lus(upstreams)
|
||||
return []*upstream.Fs{u}, err
|
||||
}
|
33
backend/union/policy/mfs.go
Normal file
33
backend/union/policy/mfs.go
Normal file
@ -0,0 +1,33 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/rclone/rclone/backend/union/upstream"
|
||||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerPolicy("mfs", &Mfs{})
|
||||
}
|
||||
|
||||
// Mfs stands for most free space
|
||||
// Search category: same as epmfs.
|
||||
// Action category: same as epmfs.
|
||||
// Create category: Pick the drive with the most free space.
|
||||
type Mfs struct {
|
||||
EpMfs
|
||||
}
|
||||
|
||||
// Create category policy, governing the creation of files and directories
|
||||
func (p *Mfs) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
upstreams = filterNC(upstreams)
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
u, err := p.mfs(upstreams)
|
||||
return []*upstream.Fs{u}, err
|
||||
}
|
149
backend/union/policy/newest.go
Normal file
149
backend/union/policy/newest.go
Normal file
@ -0,0 +1,149 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rclone/rclone/backend/union/upstream"
|
||||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerPolicy("newest", &Newest{})
|
||||
}
|
||||
|
||||
// Newest policy picks the file / directory with the largest mtime
|
||||
// It implies the existance of a path
|
||||
type Newest struct {
|
||||
EpAll
|
||||
}
|
||||
|
||||
func (p *Newest) newest(ctx context.Context, upstreams []*upstream.Fs, filePath string) (*upstream.Fs, error) {
|
||||
var wg sync.WaitGroup
|
||||
ufs := make([]*upstream.Fs, len(upstreams))
|
||||
mtimes := make([]time.Time, len(upstreams))
|
||||
for i, u := range upstreams {
|
||||
wg.Add(1)
|
||||
i, u := i, u // Closure
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
rfs := u.RootFs
|
||||
remote := path.Join(u.RootPath, filePath)
|
||||
if e := findEntry(ctx, rfs, remote); e != nil {
|
||||
ufs[i] = u
|
||||
mtimes[i] = e.ModTime(ctx)
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
maxMtime := time.Time{}
|
||||
var newestFs *upstream.Fs
|
||||
for i, u := range ufs {
|
||||
if u != nil && mtimes[i].After(maxMtime) {
|
||||
maxMtime = mtimes[i]
|
||||
newestFs = u
|
||||
}
|
||||
}
|
||||
if newestFs == nil {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return newestFs, nil
|
||||
}
|
||||
|
||||
func (p *Newest) newestEntries(entries []upstream.Entry) (upstream.Entry, error) {
|
||||
var wg sync.WaitGroup
|
||||
mtimes := make([]time.Time, len(entries))
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
for i, e := range entries {
|
||||
wg.Add(1)
|
||||
i, e := i, e // Closure
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
mtimes[i] = e.ModTime(ctx)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
maxMtime := time.Time{}
|
||||
var newestEntry upstream.Entry
|
||||
for i, t := range mtimes {
|
||||
if t.After(maxMtime) {
|
||||
maxMtime = t
|
||||
newestEntry = entries[i]
|
||||
}
|
||||
}
|
||||
if newestEntry == nil {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return newestEntry, nil
|
||||
}
|
||||
|
||||
// Action category policy, governing the modification of files and directories
|
||||
func (p *Newest) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
upstreams = filterRO(upstreams)
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
u, err := p.newest(ctx, upstreams, path)
|
||||
return []*upstream.Fs{u}, err
|
||||
}
|
||||
|
||||
// ActionEntries is ACTION category policy but receving a set of candidate entries
|
||||
func (p *Newest) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
entries = filterROEntries(entries)
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
e, err := p.newestEntries(entries)
|
||||
return []upstream.Entry{e}, err
|
||||
}
|
||||
|
||||
// Create category policy, governing the creation of files and directories
|
||||
func (p *Newest) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
upstreams = filterNC(upstreams)
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
u, err := p.newest(ctx, upstreams, path+"/..")
|
||||
return []*upstream.Fs{u}, err
|
||||
}
|
||||
|
||||
// CreateEntries is CREATE category policy but receving a set of candidate entries
|
||||
func (p *Newest) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
entries = filterNCEntries(entries)
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
e, err := p.newestEntries(entries)
|
||||
return []upstream.Entry{e}, err
|
||||
}
|
||||
|
||||
// Search category policy, governing the access to files and directories
|
||||
func (p *Newest) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return p.newest(ctx, upstreams, path)
|
||||
}
|
||||
|
||||
// SearchEntries is SEARCH category policy but receving a set of candidate entries
|
||||
func (p *Newest) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) {
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return p.newestEntries(entries)
|
||||
}
|
129
backend/union/policy/policy.go
Normal file
129
backend/union/policy/policy.go
Normal file
@ -0,0 +1,129 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/rclone/rclone/backend/union/upstream"
|
||||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
var policies = make(map[string]Policy)
|
||||
|
||||
// Policy is the interface of a set of defined behavior choosing
|
||||
// the upstream Fs to operate on
|
||||
type Policy interface {
|
||||
// Action category policy, governing the modification of files and directories
|
||||
Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error)
|
||||
|
||||
// Create category policy, governing the creation of files and directories
|
||||
Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error)
|
||||
|
||||
// Search category policy, governing the access to files and directories
|
||||
Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error)
|
||||
|
||||
// ActionEntries is ACTION category policy but receving a set of candidate entries
|
||||
ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error)
|
||||
|
||||
// CreateEntries is CREATE category policy but receving a set of candidate entries
|
||||
CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error)
|
||||
|
||||
// SearchEntries is SEARCH category policy but receving a set of candidate entries
|
||||
SearchEntries(entries ...upstream.Entry) (upstream.Entry, error)
|
||||
}
|
||||
|
||||
func registerPolicy(name string, p Policy) {
|
||||
policies[strings.ToLower(name)] = p
|
||||
}
|
||||
|
||||
// Get a Policy from the list
|
||||
func Get(name string) (Policy, error) {
|
||||
p, ok := policies[strings.ToLower(name)]
|
||||
if !ok {
|
||||
return nil, errors.Errorf("didn't find policy called %q", name)
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func filterRO(ufs []*upstream.Fs) (wufs []*upstream.Fs) {
|
||||
for _, u := range ufs {
|
||||
if u.IsWritable() {
|
||||
wufs = append(wufs, u)
|
||||
}
|
||||
}
|
||||
return wufs
|
||||
}
|
||||
|
||||
func filterROEntries(ue []upstream.Entry) (wue []upstream.Entry) {
|
||||
for _, e := range ue {
|
||||
if e.UpstreamFs().IsWritable() {
|
||||
wue = append(wue, e)
|
||||
}
|
||||
}
|
||||
return wue
|
||||
}
|
||||
|
||||
func filterNC(ufs []*upstream.Fs) (wufs []*upstream.Fs) {
|
||||
for _, u := range ufs {
|
||||
if u.IsCreatable() {
|
||||
wufs = append(wufs, u)
|
||||
}
|
||||
}
|
||||
return wufs
|
||||
}
|
||||
|
||||
func filterNCEntries(ue []upstream.Entry) (wue []upstream.Entry) {
|
||||
for _, e := range ue {
|
||||
if e.UpstreamFs().IsCreatable() {
|
||||
wue = append(wue, e)
|
||||
}
|
||||
}
|
||||
return wue
|
||||
}
|
||||
|
||||
func parentDir(absPath string) string {
|
||||
parent := path.Dir(strings.TrimRight(absPath, "/"))
|
||||
if parent == "." {
|
||||
parent = ""
|
||||
}
|
||||
return parent
|
||||
}
|
||||
|
||||
func clean(absPath string) string {
|
||||
cleanPath := path.Clean(absPath)
|
||||
if cleanPath == "." {
|
||||
cleanPath = ""
|
||||
}
|
||||
return cleanPath
|
||||
}
|
||||
|
||||
func findEntry(ctx context.Context, f fs.Fs, remote string) fs.DirEntry {
|
||||
remote = clean(remote)
|
||||
dir := parentDir(remote)
|
||||
entries, err := f.List(ctx, dir)
|
||||
if remote == dir {
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
// random modtime for root
|
||||
randomNow := time.Unix(time.Now().Unix()-rand.Int63n(10000), 0)
|
||||
return fs.NewDir("", randomNow)
|
||||
}
|
||||
found := false
|
||||
for _, e := range entries {
|
||||
eRemote := e.Remote()
|
||||
if f.Features().CaseInsensitive {
|
||||
found = strings.EqualFold(remote, eRemote)
|
||||
} else {
|
||||
found = (remote == eRemote)
|
||||
}
|
||||
if found {
|
||||
return e
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
83
backend/union/policy/rand.go
Normal file
83
backend/union/policy/rand.go
Normal file
@ -0,0 +1,83 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
|
||||
"github.com/rclone/rclone/backend/union/upstream"
|
||||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerPolicy("rand", &Rand{})
|
||||
}
|
||||
|
||||
// Rand stands for random
|
||||
// Calls all and then randomizes. Returns one candidate.
|
||||
type Rand struct {
|
||||
All
|
||||
}
|
||||
|
||||
func (p *Rand) rand(upstreams []*upstream.Fs) *upstream.Fs {
|
||||
return upstreams[rand.Intn(len(upstreams))]
|
||||
}
|
||||
|
||||
func (p *Rand) randEntries(entries []upstream.Entry) upstream.Entry {
|
||||
return entries[rand.Intn(len(entries))]
|
||||
}
|
||||
|
||||
// Action category policy, governing the modification of files and directories
|
||||
func (p *Rand) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
upstreams, err := p.All.Action(ctx, upstreams, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []*upstream.Fs{p.rand(upstreams)}, nil
|
||||
}
|
||||
|
||||
// ActionEntries is ACTION category policy but receving a set of candidate entries
|
||||
func (p *Rand) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
entries, err := p.All.ActionEntries(entries...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []upstream.Entry{p.randEntries(entries)}, nil
|
||||
}
|
||||
|
||||
// Create category policy, governing the creation of files and directories
|
||||
func (p *Rand) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
|
||||
upstreams, err := p.All.Create(ctx, upstreams, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []*upstream.Fs{p.rand(upstreams)}, nil
|
||||
}
|
||||
|
||||
// CreateEntries is CREATE category policy but receving a set of candidate entries
|
||||
func (p *Rand) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
entries, err := p.All.CreateEntries(entries...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []upstream.Entry{p.randEntries(entries)}, nil
|
||||
}
|
||||
|
||||
// Search category policy, governing the access to files and directories
|
||||
func (p *Rand) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) {
|
||||
if len(upstreams) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
upstreams, err := p.epall(ctx, upstreams, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p.rand(upstreams), nil
|
||||
}
|
||||
|
||||
// SearchEntries is SEARCH category policy but receving a set of candidate entries
|
||||
func (p *Rand) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) {
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return p.randEntries(entries), nil
|
||||
}
|
@ -1,17 +1,20 @@
|
||||
package union
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/rclone/rclone/backend/union/policy"
|
||||
"github.com/rclone/rclone/backend/union/upstream"
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/cache"
|
||||
"github.com/rclone/rclone/fs/config/configmap"
|
||||
"github.com/rclone/rclone/fs/config/configstruct"
|
||||
"github.com/rclone/rclone/fs/hash"
|
||||
@ -21,12 +24,32 @@ import (
|
||||
func init() {
|
||||
fsi := &fs.RegInfo{
|
||||
Name: "union",
|
||||
Description: "Union merges the contents of several remotes",
|
||||
Description: "Union merges the contents of several upstream fs",
|
||||
NewFs: NewFs,
|
||||
Options: []fs.Option{{
|
||||
Name: "remotes",
|
||||
Help: "List of space separated remotes.\nCan be 'remotea:test/dir remoteb:', '\"remotea:test/space dir\" remoteb:', etc.\nThe last remote is used to write to.",
|
||||
Name: "upstreams",
|
||||
Help: "List of space separated upstreams.\nCan be 'upstreama:test/dir upstreamb:', '\"upstreama:test/space:ro dir\" upstreamb:', etc.\n",
|
||||
Required: true,
|
||||
}, {
|
||||
Name: "action_policy",
|
||||
Help: "Policy to choose upstream on ACTION category.",
|
||||
Required: true,
|
||||
Default: "epall",
|
||||
}, {
|
||||
Name: "create_policy",
|
||||
Help: "Policy to choose upstream on CREATE category.",
|
||||
Required: true,
|
||||
Default: "epmfs",
|
||||
}, {
|
||||
Name: "search_policy",
|
||||
Help: "Policy to choose upstream on SEARCH category.",
|
||||
Required: true,
|
||||
Default: "ff",
|
||||
}, {
|
||||
Name: "cache_time",
|
||||
Help: "Cache time of usage and free space (in seconds). This option is only useful when a path preserving policy is used.",
|
||||
Required: true,
|
||||
Default: 120,
|
||||
}},
|
||||
}
|
||||
fs.Register(fsi)
|
||||
@ -34,39 +57,48 @@ func init() {
|
||||
|
||||
// Options defines the configuration for this backend
|
||||
type Options struct {
|
||||
Remotes fs.SpaceSepList `config:"remotes"`
|
||||
Upstreams fs.SpaceSepList `config:"upstreams"`
|
||||
Remotes fs.SpaceSepList `config:"remotes"` // Depreated
|
||||
ActionPolicy string `config:"action_policy"`
|
||||
CreatePolicy string `config:"create_policy"`
|
||||
SearchPolicy string `config:"search_policy"`
|
||||
CacheTime int `config:"cache_time"`
|
||||
}
|
||||
|
||||
// Fs represents a union of remotes
|
||||
// Fs represents a union of upstreams
|
||||
type Fs struct {
|
||||
name string // name of this remote
|
||||
features *fs.Features // optional features
|
||||
opt Options // options for this Fs
|
||||
root string // the path we are working on
|
||||
remotes []fs.Fs // slice of remotes
|
||||
wr fs.Fs // writable remote
|
||||
hashSet hash.Set // intersection of hash types
|
||||
name string // name of this remote
|
||||
features *fs.Features // optional features
|
||||
opt Options // options for this Fs
|
||||
root string // the path we are working on
|
||||
upstreams []*upstream.Fs // slice of upstreams
|
||||
hashSet hash.Set // intersection of hash types
|
||||
actionPolicy policy.Policy // policy for ACTION
|
||||
createPolicy policy.Policy // policy for CREATE
|
||||
searchPolicy policy.Policy // policy for SEARCH
|
||||
}
|
||||
|
||||
// Object describes a union Object
|
||||
//
|
||||
// This is a wrapped object which returns the Union Fs as its parent
|
||||
type Object struct {
|
||||
fs.Object
|
||||
fs *Fs // what this object is part of
|
||||
}
|
||||
|
||||
// Wrap an existing object in the union Object
|
||||
func (f *Fs) wrapObject(o fs.Object) *Object {
|
||||
return &Object{
|
||||
Object: o,
|
||||
fs: f,
|
||||
// Wrap candidate objects in to an union Object
|
||||
func (f *Fs) wrapEntries(entries ...upstream.Entry) (entry, error) {
|
||||
e, err := f.searchEntries(entries...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch e.(type) {
|
||||
case *upstream.Object:
|
||||
return &Object{
|
||||
Object: e.(*upstream.Object),
|
||||
fs: f,
|
||||
co: entries,
|
||||
}, nil
|
||||
case *upstream.Directory:
|
||||
return &Directory{
|
||||
Directory: e.(*upstream.Directory),
|
||||
cd: entries,
|
||||
}, nil
|
||||
default:
|
||||
return nil, errors.Errorf("unknown object type %T", e)
|
||||
}
|
||||
}
|
||||
|
||||
// Fs returns the union Fs as the parent
|
||||
func (o *Object) Fs() fs.Info {
|
||||
return o.fs
|
||||
}
|
||||
|
||||
// Name of the remote (as passed into NewFs)
|
||||
@ -91,7 +123,16 @@ func (f *Fs) Features() *fs.Features {
|
||||
|
||||
// Rmdir removes the root directory of the Fs object
|
||||
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
|
||||
return f.wr.Rmdir(ctx, dir)
|
||||
upstreams, err := f.action(ctx, dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
errs := Errors(make([]error, len(upstreams)))
|
||||
multithread(len(upstreams), func(i int) {
|
||||
err := upstreams[i].Rmdir(ctx, dir)
|
||||
errs[i] = errors.Wrap(err, upstreams[i].Name())
|
||||
})
|
||||
return errs.Err()
|
||||
}
|
||||
|
||||
// Hashes returns hash.HashNone to indicate remote hashing is unavailable
|
||||
@ -101,7 +142,22 @@ func (f *Fs) Hashes() hash.Set {
|
||||
|
||||
// Mkdir makes the root directory of the Fs object
|
||||
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
||||
return f.wr.Mkdir(ctx, dir)
|
||||
upstreams, err := f.create(ctx, dir)
|
||||
if err == fs.ErrorObjectNotFound && dir != parentDir(dir) {
|
||||
if err := f.Mkdir(ctx, parentDir(dir)); err != nil {
|
||||
return err
|
||||
}
|
||||
upstreams, err = f.create(ctx, dir)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
errs := Errors(make([]error, len(upstreams)))
|
||||
multithread(len(upstreams), func(i int) {
|
||||
err := upstreams[i].Mkdir(ctx, dir)
|
||||
errs[i] = errors.Wrap(err, upstreams[i].Name())
|
||||
})
|
||||
return errs.Err()
|
||||
}
|
||||
|
||||
// Purge all files in the root and the root directory
|
||||
@ -111,7 +167,21 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
||||
//
|
||||
// Return an error if it doesn't exist
|
||||
func (f *Fs) Purge(ctx context.Context) error {
|
||||
return f.wr.Features().Purge(ctx)
|
||||
for _, r := range f.upstreams {
|
||||
if r.Features().Purge == nil {
|
||||
return fs.ErrorCantPurge
|
||||
}
|
||||
}
|
||||
upstreams, err := f.action(ctx, "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
errs := Errors(make([]error, len(upstreams)))
|
||||
multithread(len(upstreams), func(i int) {
|
||||
err := upstreams[i].Features().Purge(ctx)
|
||||
errs[i] = errors.Wrap(err, upstreams[i].Name())
|
||||
})
|
||||
return errs.Err()
|
||||
}
|
||||
|
||||
// Copy src to this remote using server side copy operations.
|
||||
@ -124,15 +194,26 @@ func (f *Fs) Purge(ctx context.Context) error {
|
||||
//
|
||||
// If it isn't possible then return fs.ErrorCantCopy
|
||||
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
||||
if src.Fs() != f.wr {
|
||||
srcObj, ok := src.(*Object)
|
||||
if !ok {
|
||||
fs.Debugf(src, "Can't copy - not same remote type")
|
||||
return nil, fs.ErrorCantCopy
|
||||
}
|
||||
o, err := f.wr.Features().Copy(ctx, src, remote)
|
||||
if err != nil {
|
||||
o := srcObj.UnWrap()
|
||||
u := o.UpstreamFs()
|
||||
do := u.Features().Copy
|
||||
if do == nil {
|
||||
return nil, fs.ErrorCantCopy
|
||||
}
|
||||
if !u.IsCreatable() {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
co, err := do(ctx, o, remote)
|
||||
if err != nil || co == nil {
|
||||
return nil, err
|
||||
}
|
||||
return f.wrapObject(o), nil
|
||||
wo, err := f.wrapEntries(u.WrapObject(co))
|
||||
return wo.(*Object), err
|
||||
}
|
||||
|
||||
// Move src to this remote using server side move operations.
|
||||
@ -145,15 +226,47 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
||||
//
|
||||
// If it isn't possible then return fs.ErrorCantMove
|
||||
func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
||||
if src.Fs() != f.wr {
|
||||
o, ok := src.(*Object)
|
||||
if !ok {
|
||||
fs.Debugf(src, "Can't move - not same remote type")
|
||||
return nil, fs.ErrorCantMove
|
||||
}
|
||||
o, err := f.wr.Features().Move(ctx, src, remote)
|
||||
entries, err := f.actionEntries(o.candidates()...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return f.wrapObject(o), err
|
||||
for _, e := range entries {
|
||||
if e.UpstreamFs().Features().Move == nil {
|
||||
return nil, fs.ErrorCantMove
|
||||
}
|
||||
}
|
||||
objs := make([]*upstream.Object, len(entries))
|
||||
errs := Errors(make([]error, len(entries)))
|
||||
multithread(len(entries), func(i int) {
|
||||
u := entries[i].UpstreamFs()
|
||||
o, ok := entries[i].(*upstream.Object)
|
||||
if !ok {
|
||||
errs[i] = errors.Wrap(fs.ErrorNotAFile, u.Name())
|
||||
return
|
||||
}
|
||||
mo, err := u.Features().Move(ctx, o.UnWrap(), remote)
|
||||
if err != nil || mo == nil {
|
||||
errs[i] = errors.Wrap(err, u.Name())
|
||||
return
|
||||
}
|
||||
objs[i] = u.WrapObject(mo)
|
||||
})
|
||||
var en []upstream.Entry
|
||||
for _, o := range objs {
|
||||
if o != nil {
|
||||
en = append(en, o)
|
||||
}
|
||||
}
|
||||
e, err := f.wrapEntries(en...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return e.(*Object), errs.Err()
|
||||
}
|
||||
|
||||
// DirMove moves src, srcRemote to this remote at dstRemote
|
||||
@ -165,12 +278,46 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
||||
//
|
||||
// If destination exists then return fs.ErrorDirExists
|
||||
func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error {
|
||||
srcFs, ok := src.(*Fs)
|
||||
sfs, ok := src.(*Fs)
|
||||
if !ok {
|
||||
fs.Debugf(srcFs, "Can't move directory - not same remote type")
|
||||
fs.Debugf(src, "Can't move directory - not same remote type")
|
||||
return fs.ErrorCantDirMove
|
||||
}
|
||||
return f.wr.Features().DirMove(ctx, srcFs.wr, srcRemote, dstRemote)
|
||||
upstreams, err := sfs.action(ctx, srcRemote)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, u := range upstreams {
|
||||
if u.Features().DirMove == nil {
|
||||
return fs.ErrorCantDirMove
|
||||
}
|
||||
}
|
||||
errs := Errors(make([]error, len(upstreams)))
|
||||
multithread(len(upstreams), func(i int) {
|
||||
su := upstreams[i]
|
||||
var du *upstream.Fs
|
||||
for _, u := range f.upstreams {
|
||||
if u.RootFs.Root() == su.RootFs.Root() {
|
||||
du = u
|
||||
}
|
||||
}
|
||||
if du == nil {
|
||||
errs[i] = errors.Wrap(fs.ErrorCantDirMove, su.Name()+":"+su.Root())
|
||||
return
|
||||
}
|
||||
err := du.Features().DirMove(ctx, su.Fs, srcRemote, dstRemote)
|
||||
errs[i] = errors.Wrap(err, du.Name()+":"+du.Root())
|
||||
})
|
||||
errs = errs.FilterNil()
|
||||
if len(errs) == 0 {
|
||||
return nil
|
||||
}
|
||||
for _, e := range errs {
|
||||
if errors.Cause(e) != fs.ErrorDirExists {
|
||||
return errs
|
||||
}
|
||||
}
|
||||
return fs.ErrorDirExists
|
||||
}
|
||||
|
||||
// ChangeNotify calls the passed function with a path
|
||||
@ -183,23 +330,23 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string
|
||||
// regularly. When the channel gets closed, the implementation
|
||||
// should stop polling and release resources.
|
||||
func (f *Fs) ChangeNotify(ctx context.Context, fn func(string, fs.EntryType), ch <-chan time.Duration) {
|
||||
var remoteChans []chan time.Duration
|
||||
var uChans []chan time.Duration
|
||||
|
||||
for _, remote := range f.remotes {
|
||||
if ChangeNotify := remote.Features().ChangeNotify; ChangeNotify != nil {
|
||||
for _, u := range f.upstreams {
|
||||
if ChangeNotify := u.Features().ChangeNotify; ChangeNotify != nil {
|
||||
ch := make(chan time.Duration)
|
||||
remoteChans = append(remoteChans, ch)
|
||||
uChans = append(uChans, ch)
|
||||
ChangeNotify(ctx, fn, ch)
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
for i := range ch {
|
||||
for _, c := range remoteChans {
|
||||
for _, c := range uChans {
|
||||
c <- i
|
||||
}
|
||||
}
|
||||
for _, c := range remoteChans {
|
||||
for _, c := range uChans {
|
||||
close(c)
|
||||
}
|
||||
}()
|
||||
@ -208,10 +355,103 @@ func (f *Fs) ChangeNotify(ctx context.Context, fn func(string, fs.EntryType), ch
|
||||
// DirCacheFlush resets the directory cache - used in testing
|
||||
// as an optional interface
|
||||
func (f *Fs) DirCacheFlush() {
|
||||
for _, remote := range f.remotes {
|
||||
if DirCacheFlush := remote.Features().DirCacheFlush; DirCacheFlush != nil {
|
||||
DirCacheFlush()
|
||||
multithread(len(f.upstreams), func(i int) {
|
||||
if do := f.upstreams[i].Features().DirCacheFlush; do != nil {
|
||||
do()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bool, options ...fs.OpenOption) (fs.Object, error) {
|
||||
srcPath := src.Remote()
|
||||
upstreams, err := f.create(ctx, srcPath)
|
||||
if err == fs.ErrorObjectNotFound {
|
||||
if err := f.Mkdir(ctx, parentDir(srcPath)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
upstreams, err = f.create(ctx, srcPath)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(upstreams) == 1 {
|
||||
u := upstreams[0]
|
||||
var o fs.Object
|
||||
var err error
|
||||
if stream {
|
||||
o, err = u.Features().PutStream(ctx, in, src, options...)
|
||||
} else {
|
||||
o, err = u.Put(ctx, in, src, options...)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e, err := f.wrapEntries(u.WrapObject(o))
|
||||
return e.(*Object), err
|
||||
}
|
||||
errs := Errors(make([]error, len(upstreams)+1))
|
||||
// Get multiple reader
|
||||
readers := make([]io.Reader, len(upstreams))
|
||||
writers := make([]io.Writer, len(upstreams))
|
||||
for i := range writers {
|
||||
r, w := io.Pipe()
|
||||
bw := bufio.NewWriter(w)
|
||||
readers[i], writers[i] = r, bw
|
||||
defer func() {
|
||||
err := w.Close()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
go func() {
|
||||
mw := io.MultiWriter(writers...)
|
||||
es := make([]error, len(writers)+1)
|
||||
_, es[len(es)-1] = io.Copy(mw, in)
|
||||
for i, bw := range writers {
|
||||
es[i] = bw.(*bufio.Writer).Flush()
|
||||
}
|
||||
errs[len(upstreams)] = Errors(es).Err()
|
||||
}()
|
||||
// Multi-threading
|
||||
objs := make([]upstream.Entry, len(upstreams))
|
||||
multithread(len(upstreams), func(i int) {
|
||||
u := upstreams[i]
|
||||
var o fs.Object
|
||||
var err error
|
||||
if stream {
|
||||
o, err = u.Features().PutStream(ctx, readers[i], src, options...)
|
||||
} else {
|
||||
o, err = u.Put(ctx, readers[i], src, options...)
|
||||
}
|
||||
if err != nil {
|
||||
errs[i] = errors.Wrap(err, u.Name())
|
||||
return
|
||||
}
|
||||
objs[i] = u.WrapObject(o)
|
||||
})
|
||||
err = errs.Err()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e, err := f.wrapEntries(objs...)
|
||||
return e.(*Object), err
|
||||
}
|
||||
|
||||
// Put in to the remote path with the modTime given of the given size
|
||||
//
|
||||
// May create the object even if it returns an error - if so
|
||||
// will return the object and the error, otherwise will return
|
||||
// nil and the error
|
||||
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||
o, err := f.NewObject(ctx, src.Remote())
|
||||
switch err {
|
||||
case nil:
|
||||
return o, o.Update(ctx, in, src, options...)
|
||||
case fs.ErrorObjectNotFound:
|
||||
return f.put(ctx, in, src, false, options...)
|
||||
default:
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
@ -221,29 +461,64 @@ func (f *Fs) DirCacheFlush() {
|
||||
// will return the object and the error, otherwise will return
|
||||
// nil and the error
|
||||
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||
o, err := f.wr.Features().PutStream(ctx, in, src, options...)
|
||||
if err != nil {
|
||||
o, err := f.NewObject(ctx, src.Remote())
|
||||
switch err {
|
||||
case nil:
|
||||
return o, o.Update(ctx, in, src, options...)
|
||||
case fs.ErrorObjectNotFound:
|
||||
return f.put(ctx, in, src, true, options...)
|
||||
default:
|
||||
return nil, err
|
||||
}
|
||||
return f.wrapObject(o), err
|
||||
}
|
||||
|
||||
// About gets quota information from the Fs
|
||||
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
|
||||
return f.wr.Features().About(ctx)
|
||||
}
|
||||
|
||||
// Put in to the remote path with the modTime given of the given size
|
||||
//
|
||||
// May create the object even if it returns an error - if so
|
||||
// will return the object and the error, otherwise will return
|
||||
// nil and the error
|
||||
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||
o, err := f.wr.Put(ctx, in, src, options...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
usage := &fs.Usage{
|
||||
Total: new(int64),
|
||||
Used: new(int64),
|
||||
Trashed: new(int64),
|
||||
Other: new(int64),
|
||||
Free: new(int64),
|
||||
Objects: new(int64),
|
||||
}
|
||||
return f.wrapObject(o), err
|
||||
for _, u := range f.upstreams {
|
||||
usg, err := u.About(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if usg.Total != nil && usage.Total != nil {
|
||||
*usage.Total += *usg.Total
|
||||
} else {
|
||||
usage.Total = nil
|
||||
}
|
||||
if usg.Used != nil && usage.Used != nil {
|
||||
*usage.Used += *usg.Used
|
||||
} else {
|
||||
usage.Used = nil
|
||||
}
|
||||
if usg.Trashed != nil && usage.Trashed != nil {
|
||||
*usage.Trashed += *usg.Trashed
|
||||
} else {
|
||||
usage.Trashed = nil
|
||||
}
|
||||
if usg.Other != nil && usage.Other != nil {
|
||||
*usage.Other += *usg.Other
|
||||
} else {
|
||||
usage.Other = nil
|
||||
}
|
||||
if usg.Free != nil && usage.Free != nil {
|
||||
*usage.Free += *usg.Free
|
||||
} else {
|
||||
usage.Free = nil
|
||||
}
|
||||
if usg.Objects != nil && usage.Objects != nil {
|
||||
*usage.Objects += *usg.Objects
|
||||
} else {
|
||||
usage.Objects = nil
|
||||
}
|
||||
}
|
||||
return usage, nil
|
||||
}
|
||||
|
||||
// List the objects and directories in dir into entries. The
|
||||
@ -256,60 +531,125 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options .
|
||||
// This should return ErrDirNotFound if the directory isn't
|
||||
// found.
|
||||
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
||||
set := make(map[string]fs.DirEntry)
|
||||
found := false
|
||||
for _, remote := range f.remotes {
|
||||
var remoteEntries, err = remote.List(ctx, dir)
|
||||
if err == fs.ErrorDirNotFound {
|
||||
continue
|
||||
}
|
||||
entriess := make([][]upstream.Entry, len(f.upstreams))
|
||||
errs := Errors(make([]error, len(f.upstreams)))
|
||||
multithread(len(f.upstreams), func(i int) {
|
||||
u := f.upstreams[i]
|
||||
entries, err := u.List(ctx, dir)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "List failed on %v", remote)
|
||||
errs[i] = errors.Wrap(err, u.Name())
|
||||
return
|
||||
}
|
||||
found = true
|
||||
for _, remoteEntry := range remoteEntries {
|
||||
set[remoteEntry.Remote()] = remoteEntry
|
||||
uEntries := make([]upstream.Entry, len(entries))
|
||||
for j, e := range entries {
|
||||
uEntries[j], _ = u.WrapEntry(e)
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return nil, fs.ErrorDirNotFound
|
||||
}
|
||||
for _, entry := range set {
|
||||
if o, ok := entry.(fs.Object); ok {
|
||||
entry = f.wrapObject(o)
|
||||
entriess[i] = uEntries
|
||||
})
|
||||
if len(errs) == len(errs.FilterNil()) {
|
||||
errs = errs.Map(func(e error) error {
|
||||
if errors.Cause(e) == fs.ErrorDirNotFound {
|
||||
return nil
|
||||
}
|
||||
return e
|
||||
})
|
||||
if len(errs) == 0 {
|
||||
return nil, fs.ErrorDirNotFound
|
||||
}
|
||||
entries = append(entries, entry)
|
||||
return nil, errs.Err()
|
||||
}
|
||||
return entries, nil
|
||||
return f.mergeDirEntries(entriess)
|
||||
}
|
||||
|
||||
// NewObject creates a new remote union file object based on the first Object it finds (reverse remote order)
|
||||
func (f *Fs) NewObject(ctx context.Context, path string) (fs.Object, error) {
|
||||
for i := range f.remotes {
|
||||
var remote = f.remotes[len(f.remotes)-i-1]
|
||||
var obj, err = remote.NewObject(ctx, path)
|
||||
if err == fs.ErrorObjectNotFound {
|
||||
continue
|
||||
// NewObject creates a new remote union file object
|
||||
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
||||
objs := make([]*upstream.Object, len(f.upstreams))
|
||||
errs := Errors(make([]error, len(f.upstreams)))
|
||||
multithread(len(f.upstreams), func(i int) {
|
||||
u := f.upstreams[i]
|
||||
o, err := u.NewObject(ctx, remote)
|
||||
if err != nil && err != fs.ErrorObjectNotFound {
|
||||
errs[i] = errors.Wrap(err, u.Name())
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "NewObject failed on %v", remote)
|
||||
objs[i] = u.WrapObject(o)
|
||||
})
|
||||
var entries []upstream.Entry
|
||||
for _, o := range objs {
|
||||
if o != nil {
|
||||
entries = append(entries, o)
|
||||
}
|
||||
return f.wrapObject(obj), nil
|
||||
}
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
if len(entries) == 0 {
|
||||
return nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
e, err := f.wrapEntries(entries...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return e.(*Object), errs.Err()
|
||||
}
|
||||
|
||||
// Precision is the greatest Precision of all remotes
|
||||
// Precision is the greatest Precision of all upstreams
|
||||
func (f *Fs) Precision() time.Duration {
|
||||
var greatestPrecision time.Duration
|
||||
for _, remote := range f.remotes {
|
||||
if remote.Precision() > greatestPrecision {
|
||||
greatestPrecision = remote.Precision()
|
||||
for _, u := range f.upstreams {
|
||||
if u.Precision() > greatestPrecision {
|
||||
greatestPrecision = u.Precision()
|
||||
}
|
||||
}
|
||||
return greatestPrecision
|
||||
}
|
||||
|
||||
func (f *Fs) action(ctx context.Context, path string) ([]*upstream.Fs, error) {
|
||||
return f.actionPolicy.Action(ctx, f.upstreams, path)
|
||||
}
|
||||
|
||||
func (f *Fs) actionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
return f.actionPolicy.ActionEntries(entries...)
|
||||
}
|
||||
|
||||
func (f *Fs) create(ctx context.Context, path string) ([]*upstream.Fs, error) {
|
||||
return f.createPolicy.Create(ctx, f.upstreams, path)
|
||||
}
|
||||
|
||||
func (f *Fs) createEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
|
||||
return f.createPolicy.CreateEntries(entries...)
|
||||
}
|
||||
|
||||
func (f *Fs) search(ctx context.Context, path string) (*upstream.Fs, error) {
|
||||
return f.searchPolicy.Search(ctx, f.upstreams, path)
|
||||
}
|
||||
|
||||
func (f *Fs) searchEntries(entries ...upstream.Entry) (upstream.Entry, error) {
|
||||
return f.searchPolicy.SearchEntries(entries...)
|
||||
}
|
||||
|
||||
func (f *Fs) mergeDirEntries(entriess [][]upstream.Entry) (fs.DirEntries, error) {
|
||||
entryMap := make(map[string]([]upstream.Entry))
|
||||
for _, en := range entriess {
|
||||
if en == nil {
|
||||
continue
|
||||
}
|
||||
for _, entry := range en {
|
||||
remote := entry.Remote()
|
||||
if f.Features().CaseInsensitive {
|
||||
remote = strings.ToLower(remote)
|
||||
}
|
||||
entryMap[remote] = append(entryMap[remote], entry)
|
||||
}
|
||||
}
|
||||
var entries fs.DirEntries
|
||||
for path := range entryMap {
|
||||
e, err := f.wrapEntries(entryMap[path]...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
entries = append(entries, e)
|
||||
}
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
// NewFs constructs an Fs from the path.
|
||||
//
|
||||
// The returned Fs is the actual Fs, referenced by remote in the config
|
||||
@ -320,51 +660,64 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(opt.Remotes) == 0 {
|
||||
return nil, errors.New("union can't point to an empty remote - check the value of the remotes setting")
|
||||
// Backward compatible to old config
|
||||
if len(opt.Upstreams) == 0 && len(opt.Remotes) > 0 {
|
||||
for i := 0; i < len(opt.Remotes)-1; i++ {
|
||||
opt.Remotes[i] = opt.Remotes[i] + ":ro"
|
||||
}
|
||||
opt.Upstreams = opt.Remotes
|
||||
}
|
||||
if len(opt.Remotes) == 1 {
|
||||
return nil, errors.New("union can't point to a single remote - check the value of the remotes setting")
|
||||
if len(opt.Upstreams) == 0 {
|
||||
return nil, errors.New("union can't point to an empty upstream - check the value of the upstreams setting")
|
||||
}
|
||||
for _, remote := range opt.Remotes {
|
||||
if strings.HasPrefix(remote, name+":") {
|
||||
return nil, errors.New("can't point union remote at itself - check the value of the remote setting")
|
||||
if len(opt.Upstreams) == 1 {
|
||||
return nil, errors.New("union can't point to a single upstream - check the value of the upstreams setting")
|
||||
}
|
||||
for _, u := range opt.Upstreams {
|
||||
if strings.HasPrefix(u, name+":") {
|
||||
return nil, errors.New("can't point union remote at itself - check the value of the upstreams setting")
|
||||
}
|
||||
}
|
||||
|
||||
var remotes []fs.Fs
|
||||
for i := range opt.Remotes {
|
||||
// Last remote first so we return the correct (last) matching fs in case of fs.ErrorIsFile
|
||||
var remote = opt.Remotes[len(opt.Remotes)-i-1]
|
||||
_, configName, fsPath, err := fs.ParseRemote(remote)
|
||||
if err != nil {
|
||||
upstreams := make([]*upstream.Fs, len(opt.Upstreams))
|
||||
errs := Errors(make([]error, len(opt.Upstreams)))
|
||||
multithread(len(opt.Upstreams), func(i int) {
|
||||
u := opt.Upstreams[i]
|
||||
upstreams[i], errs[i] = upstream.New(u, root, time.Duration(opt.CacheTime)*time.Second)
|
||||
})
|
||||
var usedUpstreams []*upstream.Fs
|
||||
var fserr error
|
||||
for i, err := range errs {
|
||||
if err != nil && err != fs.ErrorIsFile {
|
||||
return nil, err
|
||||
}
|
||||
var rootString = path.Join(fsPath, filepath.ToSlash(root))
|
||||
if configName != "local" {
|
||||
rootString = configName + ":" + rootString
|
||||
// Only the upstreams returns ErrorIsFile would be used if any
|
||||
if err == fs.ErrorIsFile {
|
||||
usedUpstreams = append(usedUpstreams, upstreams[i])
|
||||
fserr = fs.ErrorIsFile
|
||||
}
|
||||
myFs, err := cache.Get(rootString)
|
||||
if err != nil {
|
||||
if err == fs.ErrorIsFile {
|
||||
return myFs, err
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
remotes = append(remotes, myFs)
|
||||
}
|
||||
|
||||
// Reverse the remotes again so they are in the order as before
|
||||
for i, j := 0, len(remotes)-1; i < j; i, j = i+1, j-1 {
|
||||
remotes[i], remotes[j] = remotes[j], remotes[i]
|
||||
if fserr == nil {
|
||||
usedUpstreams = upstreams
|
||||
}
|
||||
|
||||
f := &Fs{
|
||||
name: name,
|
||||
root: root,
|
||||
opt: *opt,
|
||||
remotes: remotes,
|
||||
wr: remotes[len(remotes)-1],
|
||||
name: name,
|
||||
root: root,
|
||||
opt: *opt,
|
||||
upstreams: usedUpstreams,
|
||||
}
|
||||
f.actionPolicy, err = policy.Get(opt.ActionPolicy)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
f.createPolicy, err = policy.Get(opt.CreatePolicy)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
f.searchPolicy, err = policy.Get(opt.SearchPolicy)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var features = (&fs.Features{
|
||||
CaseInsensitive: true,
|
||||
@ -376,9 +729,14 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||
SetTier: true,
|
||||
GetTier: true,
|
||||
}).Fill(f)
|
||||
features = features.Mask(f.wr) // mask the features just on the writable fs
|
||||
for _, f := range upstreams {
|
||||
if !f.IsWritable() {
|
||||
continue
|
||||
}
|
||||
features = features.Mask(f) // Mask all writable upstream fs
|
||||
}
|
||||
|
||||
// Really need the union of all remotes for these, so
|
||||
// Really need the union of all upstreams for these, so
|
||||
// re-instate and calculate separately.
|
||||
features.ChangeNotify = f.ChangeNotify
|
||||
features.DirCacheFlush = f.DirCacheFlush
|
||||
@ -388,12 +746,12 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||
// Clear ChangeNotify and DirCacheFlush if all are nil
|
||||
clearChangeNotify := true
|
||||
clearDirCacheFlush := true
|
||||
for _, remote := range f.remotes {
|
||||
remoteFeatures := remote.Features()
|
||||
if remoteFeatures.ChangeNotify != nil {
|
||||
for _, u := range f.upstreams {
|
||||
uFeatures := u.Features()
|
||||
if uFeatures.ChangeNotify != nil {
|
||||
clearChangeNotify = false
|
||||
}
|
||||
if remoteFeatures.DirCacheFlush != nil {
|
||||
if uFeatures.DirCacheFlush != nil {
|
||||
clearDirCacheFlush = false
|
||||
}
|
||||
}
|
||||
@ -407,13 +765,34 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||
f.features = features
|
||||
|
||||
// Get common intersection of hashes
|
||||
hashSet := f.remotes[0].Hashes()
|
||||
for _, remote := range f.remotes[1:] {
|
||||
hashSet = hashSet.Overlap(remote.Hashes())
|
||||
hashSet := f.upstreams[0].Hashes()
|
||||
for _, u := range f.upstreams[1:] {
|
||||
hashSet = hashSet.Overlap(u.Hashes())
|
||||
}
|
||||
f.hashSet = hashSet
|
||||
|
||||
return f, nil
|
||||
return f, fserr
|
||||
}
|
||||
|
||||
func parentDir(absPath string) string {
|
||||
parent := path.Dir(strings.TrimRight(filepath.ToSlash(absPath), "/"))
|
||||
if parent == "." {
|
||||
parent = ""
|
||||
}
|
||||
return parent
|
||||
}
|
||||
|
||||
func multithread(num int, fn func(int)) {
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < num; i++ {
|
||||
wg.Add(1)
|
||||
i := i
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
fn(i)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Check the interfaces are satisfied
|
||||
|
@ -2,17 +2,154 @@
|
||||
package union_test
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
_ "github.com/rclone/rclone/backend/local"
|
||||
"github.com/rclone/rclone/fstest"
|
||||
"github.com/rclone/rclone/fstest/fstests"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestIntegration runs integration tests against the remote
|
||||
func TestIntegration(t *testing.T) {
|
||||
if *fstest.RemoteName == "" {
|
||||
t.Skip("Skipping as -remote not set")
|
||||
}
|
||||
fstests.Run(t, &fstests.Opt{
|
||||
RemoteName: "TestUnion:",
|
||||
NilObject: nil,
|
||||
SkipFsMatch: true,
|
||||
RemoteName: *fstest.RemoteName,
|
||||
UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles"},
|
||||
UnimplementableObjectMethods: []string{"MimeType"},
|
||||
})
|
||||
}
|
||||
|
||||
func TestStandard(t *testing.T) {
|
||||
if *fstest.RemoteName != "" {
|
||||
t.Skip("Skipping as -remote set")
|
||||
}
|
||||
tempdir1 := filepath.Join(os.TempDir(), "rclone-union-test-standard1")
|
||||
tempdir2 := filepath.Join(os.TempDir(), "rclone-union-test-standard2")
|
||||
tempdir3 := filepath.Join(os.TempDir(), "rclone-union-test-standard3")
|
||||
require.NoError(t, os.MkdirAll(tempdir1, 0744))
|
||||
require.NoError(t, os.MkdirAll(tempdir2, 0744))
|
||||
require.NoError(t, os.MkdirAll(tempdir3, 0744))
|
||||
upstreams := tempdir1 + " " + tempdir2 + " " + tempdir3
|
||||
name := "TestUnion"
|
||||
fstests.Run(t, &fstests.Opt{
|
||||
RemoteName: name + ":",
|
||||
ExtraConfig: []fstests.ExtraConfigItem{
|
||||
{Name: name, Key: "type", Value: "union"},
|
||||
{Name: name, Key: "upstreams", Value: upstreams},
|
||||
{Name: name, Key: "action_policy", Value: "epall"},
|
||||
{Name: name, Key: "create_policy", Value: "epmfs"},
|
||||
{Name: name, Key: "search_policy", Value: "ff"},
|
||||
},
|
||||
UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles"},
|
||||
UnimplementableObjectMethods: []string{"MimeType"},
|
||||
})
|
||||
}
|
||||
|
||||
func TestRO(t *testing.T) {
|
||||
if *fstest.RemoteName != "" {
|
||||
t.Skip("Skipping as -remote set")
|
||||
}
|
||||
tempdir1 := filepath.Join(os.TempDir(), "rclone-union-test-ro1")
|
||||
tempdir2 := filepath.Join(os.TempDir(), "rclone-union-test-ro2")
|
||||
tempdir3 := filepath.Join(os.TempDir(), "rclone-union-test-ro3")
|
||||
require.NoError(t, os.MkdirAll(tempdir1, 0744))
|
||||
require.NoError(t, os.MkdirAll(tempdir2, 0744))
|
||||
require.NoError(t, os.MkdirAll(tempdir3, 0744))
|
||||
upstreams := tempdir1 + " " + tempdir2 + ":ro " + tempdir3 + ":ro"
|
||||
name := "TestUnionRO"
|
||||
fstests.Run(t, &fstests.Opt{
|
||||
RemoteName: name + ":",
|
||||
ExtraConfig: []fstests.ExtraConfigItem{
|
||||
{Name: name, Key: "type", Value: "union"},
|
||||
{Name: name, Key: "upstreams", Value: upstreams},
|
||||
{Name: name, Key: "action_policy", Value: "epall"},
|
||||
{Name: name, Key: "create_policy", Value: "epmfs"},
|
||||
{Name: name, Key: "search_policy", Value: "ff"},
|
||||
},
|
||||
UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles"},
|
||||
UnimplementableObjectMethods: []string{"MimeType"},
|
||||
})
|
||||
}
|
||||
|
||||
func TestNC(t *testing.T) {
|
||||
if *fstest.RemoteName != "" {
|
||||
t.Skip("Skipping as -remote set")
|
||||
}
|
||||
tempdir1 := filepath.Join(os.TempDir(), "rclone-union-test-nc1")
|
||||
tempdir2 := filepath.Join(os.TempDir(), "rclone-union-test-nc2")
|
||||
tempdir3 := filepath.Join(os.TempDir(), "rclone-union-test-nc3")
|
||||
require.NoError(t, os.MkdirAll(tempdir1, 0744))
|
||||
require.NoError(t, os.MkdirAll(tempdir2, 0744))
|
||||
require.NoError(t, os.MkdirAll(tempdir3, 0744))
|
||||
upstreams := tempdir1 + " " + tempdir2 + ":nc " + tempdir3 + ":nc"
|
||||
name := "TestUnionNC"
|
||||
fstests.Run(t, &fstests.Opt{
|
||||
RemoteName: name + ":",
|
||||
ExtraConfig: []fstests.ExtraConfigItem{
|
||||
{Name: name, Key: "type", Value: "union"},
|
||||
{Name: name, Key: "upstreams", Value: upstreams},
|
||||
{Name: name, Key: "action_policy", Value: "epall"},
|
||||
{Name: name, Key: "create_policy", Value: "epmfs"},
|
||||
{Name: name, Key: "search_policy", Value: "ff"},
|
||||
},
|
||||
UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles"},
|
||||
UnimplementableObjectMethods: []string{"MimeType"},
|
||||
})
|
||||
}
|
||||
|
||||
func TestPolicy1(t *testing.T) {
|
||||
if *fstest.RemoteName != "" {
|
||||
t.Skip("Skipping as -remote set")
|
||||
}
|
||||
tempdir1 := filepath.Join(os.TempDir(), "rclone-union-test-policy11")
|
||||
tempdir2 := filepath.Join(os.TempDir(), "rclone-union-test-policy12")
|
||||
tempdir3 := filepath.Join(os.TempDir(), "rclone-union-test-policy13")
|
||||
require.NoError(t, os.MkdirAll(tempdir1, 0744))
|
||||
require.NoError(t, os.MkdirAll(tempdir2, 0744))
|
||||
require.NoError(t, os.MkdirAll(tempdir3, 0744))
|
||||
upstreams := tempdir1 + " " + tempdir2 + " " + tempdir3
|
||||
name := "TestUnionPolicy1"
|
||||
fstests.Run(t, &fstests.Opt{
|
||||
RemoteName: name + ":",
|
||||
ExtraConfig: []fstests.ExtraConfigItem{
|
||||
{Name: name, Key: "type", Value: "union"},
|
||||
{Name: name, Key: "upstreams", Value: upstreams},
|
||||
{Name: name, Key: "action_policy", Value: "all"},
|
||||
{Name: name, Key: "create_policy", Value: "lus"},
|
||||
{Name: name, Key: "search_policy", Value: "all"},
|
||||
},
|
||||
UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles"},
|
||||
UnimplementableObjectMethods: []string{"MimeType"},
|
||||
})
|
||||
}
|
||||
|
||||
func TestPolicy2(t *testing.T) {
|
||||
if *fstest.RemoteName != "" {
|
||||
t.Skip("Skipping as -remote set")
|
||||
}
|
||||
tempdir1 := filepath.Join(os.TempDir(), "rclone-union-test-policy21")
|
||||
tempdir2 := filepath.Join(os.TempDir(), "rclone-union-test-policy22")
|
||||
tempdir3 := filepath.Join(os.TempDir(), "rclone-union-test-policy23")
|
||||
require.NoError(t, os.MkdirAll(tempdir1, 0744))
|
||||
require.NoError(t, os.MkdirAll(tempdir2, 0744))
|
||||
require.NoError(t, os.MkdirAll(tempdir3, 0744))
|
||||
upstreams := tempdir1 + " " + tempdir2 + " " + tempdir3
|
||||
name := "TestUnionPolicy2"
|
||||
fstests.Run(t, &fstests.Opt{
|
||||
RemoteName: name + ":",
|
||||
ExtraConfig: []fstests.ExtraConfigItem{
|
||||
{Name: name, Key: "type", Value: "union"},
|
||||
{Name: name, Key: "upstreams", Value: upstreams},
|
||||
{Name: name, Key: "action_policy", Value: "all"},
|
||||
{Name: name, Key: "create_policy", Value: "rand"},
|
||||
{Name: name, Key: "search_policy", Value: "ff"},
|
||||
},
|
||||
UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles"},
|
||||
UnimplementableObjectMethods: []string{"MimeType"},
|
||||
})
|
||||
}
|
||||
|
348
backend/union/upstream/upstream.go
Normal file
348
backend/union/upstream/upstream.go
Normal file
@ -0,0 +1,348 @@
|
||||
package upstream
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"math"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/cache"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrUsageFieldNotSupported stats the usage field is not supported by the backend
|
||||
ErrUsageFieldNotSupported = errors.New("this usage field is not supported")
|
||||
)
|
||||
|
||||
// Fs is a wrap of any fs and its configs
|
||||
type Fs struct {
|
||||
fs.Fs
|
||||
RootFs fs.Fs
|
||||
RootPath string
|
||||
writable bool
|
||||
creatable bool
|
||||
usage *fs.Usage // Cache the usage
|
||||
cacheTime time.Duration // cache duration
|
||||
cacheExpiry int64 // usage cache expiry time
|
||||
cacheMutex sync.RWMutex
|
||||
cacheOnce sync.Once
|
||||
cacheUpdate bool // if the cache is updating
|
||||
}
|
||||
|
||||
// Directory describes a wrapped Directory
|
||||
//
|
||||
// This is a wrapped Directory which contains the upstream Fs
|
||||
type Directory struct {
|
||||
fs.Directory
|
||||
f *Fs
|
||||
}
|
||||
|
||||
// Object describes a wrapped Object
|
||||
//
|
||||
// This is a wrapped Object which contains the upstream Fs
|
||||
type Object struct {
|
||||
fs.Object
|
||||
f *Fs
|
||||
}
|
||||
|
||||
// Entry describe a warpped fs.DirEntry interface with the
|
||||
// information of upstream Fs
|
||||
type Entry interface {
|
||||
fs.DirEntry
|
||||
UpstreamFs() *Fs
|
||||
}
|
||||
|
||||
// New creates a new Fs based on the
|
||||
// string formatted `type:root_path(:ro/:nc)`
|
||||
func New(remote, root string, cacheTime time.Duration) (*Fs, error) {
|
||||
_, configName, fsPath, err := fs.ParseRemote(remote)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
f := &Fs{
|
||||
RootPath: root,
|
||||
writable: true,
|
||||
creatable: true,
|
||||
cacheExpiry: time.Now().Unix(),
|
||||
cacheTime: cacheTime,
|
||||
usage: &fs.Usage{},
|
||||
}
|
||||
if strings.HasSuffix(fsPath, ":ro") {
|
||||
f.writable = false
|
||||
f.creatable = false
|
||||
fsPath = fsPath[0 : len(fsPath)-3]
|
||||
} else if strings.HasSuffix(fsPath, ":nc") {
|
||||
f.writable = true
|
||||
f.creatable = false
|
||||
fsPath = fsPath[0 : len(fsPath)-3]
|
||||
}
|
||||
if configName != "local" {
|
||||
fsPath = configName + ":" + fsPath
|
||||
}
|
||||
rFs, err := cache.Get(fsPath)
|
||||
if err != nil && err != fs.ErrorIsFile {
|
||||
return nil, err
|
||||
}
|
||||
f.RootFs = rFs
|
||||
rootString := path.Join(fsPath, filepath.ToSlash(root))
|
||||
myFs, err := cache.Get(rootString)
|
||||
if err != nil && err != fs.ErrorIsFile {
|
||||
return nil, err
|
||||
}
|
||||
f.Fs = myFs
|
||||
return f, err
|
||||
}
|
||||
|
||||
// WrapDirectory wraps a fs.Directory to include the info
|
||||
// of the upstream Fs
|
||||
func (f *Fs) WrapDirectory(e fs.Directory) *Directory {
|
||||
if e == nil {
|
||||
return nil
|
||||
}
|
||||
return &Directory{
|
||||
Directory: e,
|
||||
f: f,
|
||||
}
|
||||
}
|
||||
|
||||
// WrapObject wraps a fs.Object to include the info
|
||||
// of the upstream Fs
|
||||
func (f *Fs) WrapObject(o fs.Object) *Object {
|
||||
if o == nil {
|
||||
return nil
|
||||
}
|
||||
return &Object{
|
||||
Object: o,
|
||||
f: f,
|
||||
}
|
||||
}
|
||||
|
||||
// WrapEntry wraps a fs.DirEntry to include the info
|
||||
// of the upstream Fs
|
||||
func (f *Fs) WrapEntry(e fs.DirEntry) (Entry, error) {
|
||||
switch e.(type) {
|
||||
case fs.Object:
|
||||
return f.WrapObject(e.(fs.Object)), nil
|
||||
case fs.Directory:
|
||||
return f.WrapDirectory(e.(fs.Directory)), nil
|
||||
default:
|
||||
return nil, errors.Errorf("unknown object type %T", e)
|
||||
}
|
||||
}
|
||||
|
||||
// UpstreamFs get the upstream Fs the entry is stored in
|
||||
func (e *Directory) UpstreamFs() *Fs {
|
||||
return e.f
|
||||
}
|
||||
|
||||
// UpstreamFs get the upstream Fs the entry is stored in
|
||||
func (o *Object) UpstreamFs() *Fs {
|
||||
return o.f
|
||||
}
|
||||
|
||||
// UnWrap returns the Object that this Object is wrapping or
|
||||
// nil if it isn't wrapping anything
|
||||
func (o *Object) UnWrap() fs.Object {
|
||||
return o.Object
|
||||
}
|
||||
|
||||
// IsCreatable return if the fs is allowed to create new objects
|
||||
func (f *Fs) IsCreatable() bool {
|
||||
return f.creatable
|
||||
}
|
||||
|
||||
// IsWritable return if the fs is allowed to write
|
||||
func (f *Fs) IsWritable() bool {
|
||||
return f.writable
|
||||
}
|
||||
|
||||
// Put in to the remote path with the modTime given of the given size
|
||||
//
|
||||
// May create the object even if it returns an error - if so
|
||||
// will return the object and the error, otherwise will return
|
||||
// nil and the error
|
||||
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||
o, err := f.Fs.Put(ctx, in, src, options...)
|
||||
if err != nil {
|
||||
return o, err
|
||||
}
|
||||
f.cacheMutex.Lock()
|
||||
defer f.cacheMutex.Unlock()
|
||||
size := src.Size()
|
||||
if f.usage.Used != nil {
|
||||
*f.usage.Used += size
|
||||
}
|
||||
if f.usage.Free != nil {
|
||||
*f.usage.Free -= size
|
||||
}
|
||||
if f.usage.Objects != nil {
|
||||
*f.usage.Objects++
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
||||
// PutStream uploads to the remote path with the modTime given of indeterminate size
|
||||
//
|
||||
// May create the object even if it returns an error - if so
|
||||
// will return the object and the error, otherwise will return
|
||||
// nil and the error
|
||||
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||
do := f.Features().PutStream
|
||||
if do == nil {
|
||||
return nil, fs.ErrorNotImplemented
|
||||
}
|
||||
o, err := do(ctx, in, src, options...)
|
||||
if err != nil {
|
||||
return o, err
|
||||
}
|
||||
f.cacheMutex.Lock()
|
||||
defer f.cacheMutex.Unlock()
|
||||
size := o.Size()
|
||||
if f.usage.Used != nil {
|
||||
*f.usage.Used += size
|
||||
}
|
||||
if f.usage.Free != nil {
|
||||
*f.usage.Free -= size
|
||||
}
|
||||
if f.usage.Objects != nil {
|
||||
*f.usage.Objects++
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
||||
// Update in to the object with the modTime given of the given size
|
||||
//
|
||||
// When called from outside a Fs by rclone, src.Size() will always be >= 0.
|
||||
// But for unknown-sized objects (indicated by src.Size() == -1), Upload should either
|
||||
// return an error or update the object properly (rather than e.g. calling panic).
|
||||
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||
size := o.Size()
|
||||
err := o.Object.Update(ctx, in, src, options...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
o.f.cacheMutex.Lock()
|
||||
defer o.f.cacheMutex.Unlock()
|
||||
delta := o.Size() - size
|
||||
if delta <= 0 {
|
||||
return nil
|
||||
}
|
||||
if o.f.usage.Used != nil {
|
||||
*o.f.usage.Used += size
|
||||
}
|
||||
if o.f.usage.Free != nil {
|
||||
*o.f.usage.Free -= size
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// About gets quota information from the Fs
|
||||
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
|
||||
if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() {
|
||||
err := f.updateUsage()
|
||||
if err != nil {
|
||||
return nil, ErrUsageFieldNotSupported
|
||||
}
|
||||
}
|
||||
f.cacheMutex.RLock()
|
||||
defer f.cacheMutex.RUnlock()
|
||||
return f.usage, nil
|
||||
}
|
||||
|
||||
// GetFreeSpace get the free space of the fs
|
||||
func (f *Fs) GetFreeSpace() (int64, error) {
|
||||
if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() {
|
||||
err := f.updateUsage()
|
||||
if err != nil {
|
||||
return math.MaxInt64, ErrUsageFieldNotSupported
|
||||
}
|
||||
}
|
||||
f.cacheMutex.RLock()
|
||||
defer f.cacheMutex.RUnlock()
|
||||
if f.usage.Free == nil {
|
||||
return math.MaxInt64, ErrUsageFieldNotSupported
|
||||
}
|
||||
return *f.usage.Free, nil
|
||||
}
|
||||
|
||||
// GetUsedSpace get the used space of the fs
|
||||
func (f *Fs) GetUsedSpace() (int64, error) {
|
||||
if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() {
|
||||
err := f.updateUsage()
|
||||
if err != nil {
|
||||
return 0, ErrUsageFieldNotSupported
|
||||
}
|
||||
}
|
||||
f.cacheMutex.RLock()
|
||||
defer f.cacheMutex.RUnlock()
|
||||
if f.usage.Used == nil {
|
||||
return 0, ErrUsageFieldNotSupported
|
||||
}
|
||||
return *f.usage.Used, nil
|
||||
}
|
||||
|
||||
// GetNumObjects get the number of objects of the fs
|
||||
func (f *Fs) GetNumObjects() (int64, error) {
|
||||
if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() {
|
||||
err := f.updateUsage()
|
||||
if err != nil {
|
||||
return 0, ErrUsageFieldNotSupported
|
||||
}
|
||||
}
|
||||
f.cacheMutex.RLock()
|
||||
defer f.cacheMutex.RUnlock()
|
||||
if f.usage.Objects == nil {
|
||||
return 0, ErrUsageFieldNotSupported
|
||||
}
|
||||
return *f.usage.Objects, nil
|
||||
}
|
||||
|
||||
func (f *Fs) updateUsage() (err error) {
|
||||
if do := f.RootFs.Features().About; do == nil {
|
||||
return ErrUsageFieldNotSupported
|
||||
}
|
||||
done := false
|
||||
f.cacheOnce.Do(func() {
|
||||
f.cacheMutex.Lock()
|
||||
err = f.updateUsageCore(false)
|
||||
f.cacheMutex.Unlock()
|
||||
done = true
|
||||
})
|
||||
if done {
|
||||
return err
|
||||
}
|
||||
if !f.cacheUpdate {
|
||||
f.cacheUpdate = true
|
||||
go func() {
|
||||
_ = f.updateUsageCore(true)
|
||||
f.cacheUpdate = false
|
||||
}()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *Fs) updateUsageCore(lock bool) error {
|
||||
// Run in background, should not be cancelled by user
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
defer cancel()
|
||||
usage, err := f.RootFs.Features().About(ctx)
|
||||
if err != nil {
|
||||
f.cacheUpdate = false
|
||||
return err
|
||||
}
|
||||
if lock {
|
||||
f.cacheMutex.Lock()
|
||||
defer f.cacheMutex.Unlock()
|
||||
}
|
||||
// Store usage
|
||||
atomic.StoreInt64(&f.cacheExpiry, time.Now().Add(f.cacheTime).Unix())
|
||||
f.usage = usage
|
||||
return nil
|
||||
}
|
@ -1,7 +1,7 @@
|
||||
---
|
||||
title: "Union"
|
||||
description: "Remote Unification"
|
||||
date: "2018-08-29"
|
||||
date: "2020-01-25"
|
||||
---
|
||||
|
||||
<i class="fa fa-link"></i> Union
|
||||
@ -12,22 +12,90 @@ The `union` remote provides a unification similar to UnionFS using other remotes
|
||||
Paths may be as deep as required or a local path,
|
||||
eg `remote:directory/subdirectory` or `/directory/subdirectory`.
|
||||
|
||||
During the initial setup with `rclone config` you will specify the target
|
||||
remotes as a space separated list. The target remotes can either be a local paths or other remotes.
|
||||
During the initial setup with `rclone config` you will specify the upstream
|
||||
remotes as a space separated list. The upstream remotes can either be a local paths or other remotes.
|
||||
|
||||
The order of the remotes is important as it defines which remotes take precedence over others if there are files with the same name in the same logical path.
|
||||
The last remote is the topmost remote and replaces files with the same name from previous remotes.
|
||||
Attribute `:ro` and `:nc` can be attach to the end of path to tag the remote as **read only** or **no create**,
|
||||
eg `remote:directory/subdirectory:ro` or `remote:directory/subdirectory:nc`.
|
||||
|
||||
Only the last remote is used to write to and delete from, all other remotes are read-only.
|
||||
|
||||
Subfolders can be used in target remote. Assume a union remote named `backup`
|
||||
with the remotes `mydrive:private/backup mydrive2:/backup`. Invoking `rclone mkdir backup:desktop`
|
||||
Subfolders can be used in upstream remotes. Assume a union remote named `backup`
|
||||
with the remotes `mydrive:private/backup`. Invoking `rclone mkdir backup:desktop`
|
||||
is exactly the same as invoking `rclone mkdir mydrive2:/backup/desktop`.
|
||||
|
||||
There will be no special handling of paths containing `..` segments.
|
||||
Invoking `rclone mkdir backup:../desktop` is exactly the same as invoking
|
||||
`rclone mkdir mydrive2:/backup/../desktop`.
|
||||
|
||||
### Behavior / Policies
|
||||
|
||||
The behavior of union backend is inspired by [trapexit/mergerfs](https://github.com/trapexit/mergerfs). All functions are grouped into 3 categories: **action**, **create** and **search**. These functions and categories can be assigned a policy which dictates what file or directory is chosen when performing that behavior. Any policy can be assigned to a function or category though some may not be very useful in practice. For instance: **rand** (random) may be useful for file creation (create) but could lead to very odd behavior if used for `delete` if there were more than one copy of the file.
|
||||
|
||||
#### Function / Category classifications
|
||||
|
||||
| Category | Description | Functions |
|
||||
|----------|--------------------------|-------------------------------------------------------------------------------------|
|
||||
| action | Writing Existing file | move, rmdir, rmdirs, delete, purge and copy, sync (as destination when file exist) |
|
||||
| create | Create non-existing file | copy, sync (as destination when file not exist) |
|
||||
| search | Reading and listing file | ls, lsd, lsl, cat, md5sum, sha1sum and copy, sync (as source) |
|
||||
| N/A | | size, about |
|
||||
|
||||
#### Path Preservation
|
||||
|
||||
Policies, as described below, are of two basic types. `path preserving` and `non-path preserving`.
|
||||
|
||||
All policies which start with `ep` (**epff**, **eplfs**, **eplus**, **epmfs**, **eprand**) are `path preserving`. `ep` stands for `existing path`.
|
||||
|
||||
A path preserving policy will only consider upstreams where the relative path being accessed already exists.
|
||||
|
||||
When using non-path preserving policies paths will be created in target upstreams as necessary.
|
||||
|
||||
#### Quota Relevant Policies
|
||||
|
||||
Some policies rely on quota information. These policies should be used only if your upstreams support the respective quota fields.
|
||||
|
||||
| Policy | Required Field |
|
||||
|------------|----------------|
|
||||
| lfs, eplfs | Free |
|
||||
| mfs, epmfs | Free |
|
||||
| lus, eplus | Used |
|
||||
| lno, eplno | Objects |
|
||||
|
||||
To check if your upstream support the field, run `rclone about remote: [flags]` and see if the reuqired field exists.
|
||||
|
||||
#### Filters
|
||||
|
||||
Policies basically search upstream remotes and create a list of files / paths for functions to work on. The policy is responsible for filtering and sorting. The policy type defines the sorting but filtering is mostly uniform as described below.
|
||||
|
||||
* No **search** policies filter.
|
||||
* All **action** policies will filter out remotes which are tagged as **read-only**.
|
||||
* All **create** policies will filter out remotes which are tagged **read-only** or **no-create**.
|
||||
|
||||
If all remotes are filtered an error will be returned.
|
||||
|
||||
#### Policy descriptions
|
||||
|
||||
THe policies definition are inspired by [trapexit/mergerfs](https://github.com/trapexit/mergerfs) but not exactly the same. Some policy definition could be different due to the much larger latency of remote file systems.
|
||||
|
||||
| Policy | Description |
|
||||
|------------------|------------------------------------------------------------|
|
||||
| all | Search category: same as **epall**. Action category: same as **epall**. Create category: act on all upstreams. |
|
||||
| epall (existing path, all) | Search category: Given this order configured, act on the first one found where the relative path exists. Action category: apply to all found. Create category: act on all upstreams where the relative path exists. |
|
||||
| epff (existing path, first found) | Act on the first one found, by the time upstreams reply, where the relative path exists. |
|
||||
| eplfs (existing path, least free space) | Of all the upstreams on which the relative path exists choose the one with the least free space. |
|
||||
| eplus (existing path, least used space) | Of all the upstreams on which the relative path exists choose the one with the least used space. |
|
||||
| eplno (existing path, least number of objects) | Of all the upstreams on which the relative path exists choose the one with the least number of objects. |
|
||||
| epmfs (existing path, most free space) | Of all the upstreams on which the relative path exists choose the one with the most free space. |
|
||||
| eprand (existing path, random) | Calls **epall** and then randomizes. Returns only one upstream. |
|
||||
| ff (first found) | Search category: same as **epff**. Action category: same as **epff**. Create category: Act on the first one found by the time upstreams reply. |
|
||||
| lfs (least free space) | Search category: same as **eplfs**. Action category: same as **eplfs**. Create category: Pick the upstream with the least available free space. |
|
||||
| lus (least used space) | Search category: same as **eplus**. Action category: same as **eplus**. Create category: Pick the upstream with the least used space. |
|
||||
| lno (least number of objects) | Search category: same as **eplno**. Action category: same as **eplno**. Create category: Pick the upstream with the least number of objects. |
|
||||
| mfs (most free space) | Search category: same as **epmfs**. Action category: same as **epmfs**. Create category: Pick the upstream with the most available free space. |
|
||||
| newest | Pick the file / directory with the largest mtime. |
|
||||
| rand (random) | Calls **all** and then randomizes. Returns only one upstream. |
|
||||
|
||||
### Setup
|
||||
|
||||
Here is an example of how to make a union called `remote` for local folders.
|
||||
First run:
|
||||
|
||||
@ -49,16 +117,27 @@ XX / Union merges the contents of several remotes
|
||||
\ "union"
|
||||
[snip]
|
||||
Storage> union
|
||||
List of space separated remotes.
|
||||
Can be 'remotea:test/dir remoteb:', '"remotea:test/space dir" remoteb:', etc.
|
||||
The last remote is used to write to.
|
||||
List of space separated upstreams.
|
||||
Can be 'upstreama:test/dir upstreamb:', '\"upstreama:test/space:ro dir\" upstreamb:', etc.
|
||||
Enter a string value. Press Enter for the default ("").
|
||||
remotes>
|
||||
upstreams>
|
||||
Policy to choose upstream on ACTION class.
|
||||
Enter a string value. Press Enter for the default ("epall").
|
||||
action_policy>
|
||||
Policy to choose upstream on CREATE class.
|
||||
Enter a string value. Press Enter for the default ("epmfs").
|
||||
create_policy>
|
||||
Policy to choose upstream on SEARCH class.
|
||||
Enter a string value. Press Enter for the default ("ff").
|
||||
search_policy>
|
||||
Cache time of usage and free space (in seconds). This option is only useful when a path preserving policy is used.
|
||||
Enter a signed integer. Press Enter for the default ("120").
|
||||
cache_time>
|
||||
Remote config
|
||||
--------------------
|
||||
[remote]
|
||||
type = union
|
||||
remotes = C:\dir1 C:\dir2 C:\dir3
|
||||
upstreams = C:\dir1 C:\dir2 C:\dir3
|
||||
--------------------
|
||||
y) Yes this is OK
|
||||
e) Edit this remote
|
||||
@ -97,17 +176,53 @@ Copy another local directory to the union directory called source, which will be
|
||||
<!--- autogenerated options start - DO NOT EDIT, instead edit fs.RegInfo in backend/union/union.go then run make backenddocs -->
|
||||
### Standard Options
|
||||
|
||||
Here are the standard options specific to union (Union merges the contents of several remotes).
|
||||
Here are the standard options specific to union (Union merges the contents of several upstream fs).
|
||||
|
||||
#### --union-remotes
|
||||
#### --union-upstreams
|
||||
|
||||
List of space separated remotes.
|
||||
Can be 'remotea:test/dir remoteb:', '"remotea:test/space dir" remoteb:', etc.
|
||||
The last remote is used to write to.
|
||||
List of space separated upstreams.
|
||||
Can be 'upstreama:test/dir upstreamb:', '"upstreama:test/space:ro dir" upstreamb:', etc.
|
||||
|
||||
- Config: remotes
|
||||
- Env Var: RCLONE_UNION_REMOTES
|
||||
|
||||
- Config: upstreams
|
||||
- Env Var: RCLONE_UNION_UPSTREAMS
|
||||
- Type: string
|
||||
- Default: ""
|
||||
|
||||
#### --union-action-policy
|
||||
|
||||
Policy to choose upstream on ACTION class.
|
||||
|
||||
- Config: action_policy
|
||||
- Env Var: RCLONE_UNION_ACTION_POLICY
|
||||
- Type: string
|
||||
- Default: "epall"
|
||||
|
||||
#### --union-create-policy
|
||||
|
||||
Policy to choose upstream on CREATE class.
|
||||
|
||||
- Config: create_policy
|
||||
- Env Var: RCLONE_UNION_CREATE_POLICY
|
||||
- Type: string
|
||||
- Default: "epmfs"
|
||||
|
||||
#### --union-search-policy
|
||||
|
||||
Policy to choose upstream on SEARCH class.
|
||||
|
||||
- Config: search_policy
|
||||
- Env Var: RCLONE_UNION_SEARCH_POLICY
|
||||
- Type: string
|
||||
- Default: "ff"
|
||||
|
||||
#### --union-cache-time
|
||||
|
||||
Cache time of usage and free space (in seconds). This option is only useful when a path preserving policy is used.
|
||||
|
||||
- Config: cache_time
|
||||
- Env Var: RCLONE_UNION_CACHE_TIME
|
||||
- Type: int
|
||||
- Default: 120
|
||||
|
||||
<!--- autogenerated options stop -->
|
||||
|
Loading…
Reference in New Issue
Block a user