1
0
mirror of https://github.com/rclone/rclone.git synced 2025-11-23 21:44:49 +02:00
Files
rclone/fs/list/sorter.go
2025-08-13 17:54:45 +01:00

343 lines
9.4 KiB
Go

package list
import (
"cmp"
"context"
"errors"
"fmt"
"slices"
"strings"
"sync"
"time"
"github.com/lanrat/extsort"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/lib/errcount"
"golang.org/x/sync/errgroup"
)
// NewObjecter is the minimum set of facilities we need from the fs.Fs
// passed into NewSorter.
//
// The Sorter uses it in keyToEntry to turn a remote name back into
// an fs.Object once the keys have come back from the external sort.
type NewObjecter interface {
	// NewObject finds the Object at remote. If it can't be found
	// it returns the error ErrorObjectNotFound.
	NewObject(ctx context.Context, remote string) (fs.Object, error)
}
// Sorter implements an efficient mechanism for sorting list entries.
//
// If there are a large number of entries (`--list-cutoff` or more),
// this may be done on disk instead of in memory.
//
// Supply entries with the Add method, call Send at the end to deliver
// the sorted entries and finalise with CleanUp regardless of whether
// you called Add or Send.
//
// Sorted entries are delivered to the callback supplied to NewSorter
// when the Send method is called.
type Sorter struct {
	ctx        context.Context       // cancellable context used for the ext sort and for re-creating objects
	ci         *fs.ConfigInfo        // config we are using
	cancel     func()                // cancels ctx, called from CleanUp
	mu         sync.Mutex            // protect the below
	f          NewObjecter           // fs that we are listing
	callback   fs.ListRCallback      // where to send the sorted entries to
	entries    fs.DirEntries         // entries accumulated in memory (unused once ext sorting)
	keyFn      KeyFn                 // transform an entry into a sort key
	cutoff     int                   // number of entries at or above which we start extsort
	extSort    bool                  // true if we are ext sorting
	inputChan  chan string           // for sending data to the ext sort
	outputChan <-chan string         // for receiving data from the ext sort
	errChan    <-chan error          // for getting errors from the ext sort
	sorter     *extsort.StringSorter // external string sort
	errs       *errcount.ErrCount    // accumulates errors from re-creating entries in Send
}
// KeyFn turns an entry into a sort key.
//
// The Sorter orders entries by the string returned.
type KeyFn func(entry fs.DirEntry) string
// identityKeyFn maps an entry to its Remote.
//
// It is the KeyFn that NewSorter falls back to when it is passed nil.
func identityKeyFn(entry fs.DirEntry) string {
	return entry.Remote()
}
// NewSorter creates a new Sorter which will deliver its sorted
// entries to callback.
//
// f is used to re-create objects from their remote names if the sort
// has to be done on disk.
//
// keyFn is applied to each entry to produce its sort key. If it is
// nil then entry.Remote() is used as the key.
//
// The Sorter works on a cancellable child of ctx which is cancelled
// by CleanUp. The error returned is always nil.
func NewSorter(ctx context.Context, f NewObjecter, callback fs.ListRCallback, keyFn KeyFn) (*Sorter, error) {
	if keyFn == nil {
		keyFn = identityKeyFn
	}
	// Read the config from the caller's context before deriving ours
	ci := fs.GetConfig(ctx)
	sortCtx, cancel := context.WithCancel(ctx)
	ls := &Sorter{
		ctx:      sortCtx,
		ci:       ci,
		cancel:   cancel,
		f:        f,
		callback: callback,
		keyFn:    keyFn,
		cutoff:   ci.ListCutoff,
		errs:     errcount.New(),
	}
	return ls, nil
}
// entryToKey serializes a directory entry into a string for extsort.
//
// The result is the sort key from keyFn, a NUL byte, then the
// entry's remote. Any trailing slashes are trimmed from the remote
// and a single "/" is appended if the entry is a directory, which is
// how keyToEntry tells directories from objects.
//
// For now only the remote is stored and the object is looked up
// again later. Serializing the whole object here would be more
// efficient.
func (ls *Sorter) entryToKey(entry fs.DirEntry) string {
	name := strings.TrimRight(entry.Remote(), "/")
	if _, ok := entry.(fs.Directory); ok {
		name += "/"
	}
	return ls.keyFn(entry) + "\x00" + name
}
// keyToEntry turns an extsort key made by entryToKey back into a
// directory entry.
//
// Everything after the first NUL byte in key is taken to be the
// remote. A remote ending in "/" becomes a directory, anything else
// is looked up with NewObject on the fs being listed.
//
// It returns an error if key has no NUL byte or if the object can't
// be re-created. The latter is also logged against the fs.
func (ls *Sorter) keyToEntry(ctx context.Context, key string) (entry fs.DirEntry, err error) {
	_, remote, found := strings.Cut(key, "\x00")
	if !found {
		return nil, errors.New("sorter: failed to deserialize: missing null")
	}
	if dirName, isDir := strings.CutSuffix(remote, "/"); isDir {
		// Is a directory
		//
		// Note this creates a very minimal directory entry which should be fine for the
		// bucket based remotes this code will be run on.
		return fs.NewDir(dirName, time.Time{}), nil
	}
	obj, err := ls.f.NewObject(ctx, remote)
	if err != nil {
		fs.Errorf(ls.f, "sorter: failed to re-create object %q: %v", remote, err)
		return nil, fmt.Errorf("sorter: failed to re-create object: %w", err)
	}
	return obj, nil
}
// sendEntriesToExtSort serializes entries with entryToKey and feeds
// them into the ext sort input channel.
//
// While waiting to send each key it also listens on the error
// channel and returns the first non-nil error it receives. Once all
// the entries have been offered it makes one last non-blocking check
// of the error channel and returns what it finds (nil if nothing was
// pending).
//
// The callers in this file (Add and startExtSort) hold ls.mu and only
// call it once the channels have been set up.
func (ls *Sorter) sendEntriesToExtSort(entries fs.DirEntries) (err error) {
	for _, entry := range entries {
		select {
		case ls.inputChan <- ls.entryToKey(entry):
		case err = <-ls.errChan:
			if err != nil {
				return err
			}
			// NOTE(review): if a nil error is received here the loop
			// carries on to the next entry without having sent this
			// one - confirm the error channel can't yield nil (eg by
			// being closed) while we are still sending.
		}
	}
	// Pick up an error which has already been reported, without blocking
	select {
	case err = <-ls.errChan:
	default:
	}
	return err
}
// startExtSort switches the Sorter from accumulating entries in
// memory to sorting them with extsort.
//
// It creates the input channel, starts the external sorter in a
// background goroutine running under ls.ctx, marks the Sorter as ext
// sorting and then hands over all the entries accumulated so far,
// releasing the in memory copy afterwards.
//
// It returns any error from sending the accumulated entries. Note
// that ls.extSort is set and ls.entries is released even if an error
// is returned.
//
// Called from Add with ls.mu held.
func (ls *Sorter) startExtSort() (err error) {
	fs.Logf(ls.f, "Switching to on disk sorting as more than %d entries in one directory detected", ls.cutoff)
	ls.inputChan = make(chan string, 100)
	// Options to control the extsort
	opt := extsort.Config{
		NumWorkers:         8,         // small effect
		ChanBuffSize:       1024,      // small effect
		SortedChanBuffSize: 1024,      // makes a lot of difference
		ChunkSize:          32 * 1024, // tuned for 50 char records (UUID sized)
		// Defaults
		// ChunkSize:          int(1e6), // amount of records to store in each chunk which will be written to disk
		// NumWorkers:         2,        // maximum number of workers to use for parallel sorting
		// ChanBuffSize:       1,        // buffer size for merging chunks
		// SortedChanBuffSize: 10,       // buffer size for passing records to output
		// TempFilesDir:       "",       // empty for use OS default ex: /tmp
	}
	ls.sorter, ls.outputChan, ls.errChan = extsort.Strings(ls.inputChan, &opt)
	// The sort runs in the background reading from ls.inputChan.
	// NOTE(review): presumably it stops when ls.ctx is cancelled by
	// CleanUp - confirm against the extsort documentation.
	go ls.sorter.Sort(ls.ctx)
	// Show we are extsorting now
	ls.extSort = true
	// Send the accumulated entries to the sorter
	fs.Debugf(ls.f, "Sending accumulated directory entries to disk")
	err = ls.sendEntriesToExtSort(ls.entries)
	fs.Debugf(ls.f, "Done sending accumulated directory entries to disk")
	// Zero the slice elements before dropping the slice itself
	clear(ls.entries)
	ls.entries = nil
	return err
}
// Add entries to the list sorter.
//
// Entries are kept in memory until there are ls.cutoff or more of
// them, at which point the Sorter switches to sorting on disk and all
// entries from then on are sent straight to the external sorter.
//
// Does not call the callback.
//
// Safe to call from concurrent go routines
func (ls *Sorter) Add(entries fs.DirEntries) error {
	ls.mu.Lock()
	defer ls.mu.Unlock()

	// Already sorting on disk so pass the entries straight on
	if ls.extSort {
		return ls.sendEntriesToExtSort(entries)
	}

	ls.entries = append(ls.entries, entries...)
	if len(ls.entries) < ls.cutoff {
		return nil
	}
	// Reached the cutoff so move everything to disk
	return ls.startExtSort()
}
// listHelperBatchSize is the number of entries to batch in the list
// helper. It is the length of the entries and errs slices allocated
// by newListHelper.
const listHelperBatchSize = 100
// listHelper is used to turn keys into entries concurrently.
//
// Keys are collected with Add and converted a batch at a time by
// send, which then passes the resulting entries to the parent's
// callback.
type listHelper struct {
	ls      *Sorter       // parent
	keys    []string      // keys being built up for the next batch
	entries fs.DirEntries // entries processed concurrently as a batch, indexed as keys
	errs    []error       // errors processed concurrently, indexed as keys
}
// newListHelper makes a listHelper attached to this Sorter.
//
// The entries and errs slices are allocated up front with
// listHelperBatchSize elements so they can be written to by index
// while a batch is being converted.
func (ls *Sorter) newListHelper() *listHelper {
	lh := new(listHelper)
	lh.ls = ls
	lh.entries = make(fs.DirEntries, listHelperBatchSize)
	lh.errs = make([]error, listHelperBatchSize)
	return lh
}
// send converts the stored keys into entries and passes them to the
// callback, provided there are at least max keys stored. If there
// are fewer it does nothing and returns nil.
//
// The keys are converted with keyToEntry in parallel, limited to the
// configured number of checkers. Keys which fail to convert are
// recorded in the parent's error count and left out of the batch
// rather than stopping it.
//
// The stored keys, entries and errors are reset afterwards whether
// or not the callback returned an error, and that error is returned.
func (lh *listHelper) send(max int) (err error) {
	if len(lh.keys) < max {
		return nil
	}

	// Convert the batch, each go routine writing only to its own slot
	group, groupCtx := errgroup.WithContext(lh.ls.ctx)
	group.SetLimit(lh.ls.ci.Checkers)
	for idx := range lh.keys {
		idx, k := idx, lh.keys[idx] // can remove when go1.22 is minimum version
		group.Go(func() error {
			lh.entries[idx], lh.errs[idx] = lh.ls.keyToEntry(groupCtx, k)
			return nil
		})
	}
	if err = group.Wait(); err != nil {
		return err
	}

	// Filter the good entries in place, counting the failures
	good := lh.entries[:0]
	for idx := range lh.keys {
		item, itemErr := lh.entries[idx], lh.errs[idx]
		if itemErr != nil {
			lh.ls.errs.Add(itemErr)
			continue
		}
		if item != nil {
			good = append(good, item)
		}
	}

	err = lh.ls.callback(good)

	// Get ready for the next batch
	clear(lh.entries)
	clear(lh.errs)
	lh.keys = lh.keys[:0]
	return err
}
// Add a key to the stored keys and, once listHelperBatchSize of them
// have built up, convert them into entries and send them to the
// callback.
//
// The threshold has to be listHelperBatchSize as that is the length
// of the entries and errs slices which send indexes by key position,
// so more keys than that must never be allowed to accumulate.
//
// It returns any error from send.
func (lh *listHelper) Add(key string) error {
	lh.keys = append(lh.keys, key)
	return lh.send(listHelperBatchSize)
}
// Flush the stored entries (if any) sending them to the callback.
//
// It calls send with a threshold of 1 so any partial batch is sent
// and nothing happens if there are no keys stored.
func (lh *listHelper) Flush() error {
	return lh.send(1)
}
// Send the sorted entries to the callback.
//
// If the entries are still in memory they are sorted by their keyFn
// key and passed to the callback in a single call.
//
// If the Sorter has switched to sorting on disk then the input to the
// external sort is closed, the sorted keys are read back, turned into
// entries again and passed to the callback in batches. The first
// error from the external sort or from the callback is returned
// straight away. Entries which could not be re-created don't stop the
// listing but are reported as a combined error at the end.
func (ls *Sorter) Send() (err error) {
	ls.mu.Lock()
	defer ls.mu.Unlock()

	if !ls.extSort {
		// Sort the directory entries by their sort key
		//
		// We use a stable sort here just in case there are
		// duplicates. Assuming the remote delivers the entries in a
		// consistent order, this will give the best user experience
		// in syncing as it will use the first entry for the sync
		// comparison.
		slices.SortStableFunc(ls.entries, func(x, y fs.DirEntry) int {
			return cmp.Compare(ls.keyFn(x), ls.keyFn(y))
		})
		return ls.callback(ls.entries)
	}

	// Tell the external sort there is no more input
	close(ls.inputChan)

	helper := ls.newListHelper()
drain:
	for {
		select {
		case key, more := <-ls.outputChan:
			if !more {
				// All the sorted keys have been delivered
				break drain
			}
			if addErr := helper.Add(key); addErr != nil {
				return addErr
			}
		case sortErr := <-ls.errChan:
			if sortErr != nil {
				return sortErr
			}
		}
	}

	// Send whatever is left in the final partial batch
	if err = helper.Flush(); err != nil {
		return err
	}
	return ls.errs.Err("sorter")
}
// CleanUp the Sorter, cleaning up any memory / files.
//
// It cancels the Sorter's context, releases the entries held in
// memory and resets the on disk sorting flag.
//
// It is safe and encouraged to call this regardless of whether you
// called Send or not.
//
// This does not call the callback
func (ls *Sorter) CleanUp() {
	ls.mu.Lock()
	defer ls.mu.Unlock()

	// Stop all background operations first
	ls.cancel()

	// Then drop our state, zeroing the entries so they can be collected
	ls.extSort = false
	clear(ls.entries)
	ls.entries = nil
}
// SortToChan makes a callback for the Sorter which sends the output
// to the channel provided.
//
// Each entry is sent to out in order with a blocking send so the
// callback only returns once they have all been accepted. It always
// returns nil and it does not close out.
func SortToChan(out chan<- fs.DirEntry) fs.ListRCallback {
	forward := func(entries fs.DirEntries) error {
		for i := range entries {
			out <- entries[i]
		}
		return nil
	}
	return forward
}