
cluster: implement --cluster and related flags FIXME WIP

Needs
- tests
This commit is contained in:
Nick Craig-Wood
2025-07-25 15:40:02 +01:00
parent e54a4f8557
commit 53dab15c0b
5 changed files with 973 additions and 0 deletions
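
Usage is not documented in this commit, but from the cmd wiring below every instance would presumably be started with the same command plus the new flags, for example (illustrative only) rclone copy src: dst: --cluster remote:work --cluster-id N. The instance with an empty or zero --cluster-id acts as the controller and runs the command; the others ignore the command line and poll the shared remote as workers.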


@@ -23,6 +23,7 @@ import (
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/cluster"
"github.com/rclone/rclone/fs/config/configfile"
"github.com/rclone/rclone/fs/config/configflags"
"github.com/rclone/rclone/fs/config/flags"
@@ -481,6 +482,22 @@ func initConfig() {
}
})
}
// Run as a cluster worker if configured, ignoring the command
// given on the command line; otherwise carry on as the controller.
if ci.Cluster != "" {
if ci.ClusterID == "" || ci.ClusterID == "0" {
fs.Infof(nil, "Running in cluster mode %q as controller", ci.ClusterID)
} else {
fs.Infof(nil, "Running in cluster mode %q as worker with id %q", ci.ClusterID, ci.ClusterID)
worker, err := cluster.NewWorker(ctx)
if err != nil || worker == nil {
fs.Fatalf(nil, "Failed to start cluster worker: %v", err)
}
// Do not continue with the main thread
select {}
}
}
}
func resolveExitCode(err error) {

fs/cluster/cluster.go (new file, 498 lines)

@@ -0,0 +1,498 @@
// Package cluster implements a mechanism to distribute work over a
// cluster of rclone instances.
package cluster
import (
"context"
"errors"
"fmt"
"path"
"strings"
"sync"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/filter"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/lib/atexit"
)
// ErrClusterNotConfigured is returned from creation functions.
var ErrClusterNotConfigured = errors.New("cluster is not configured")
// Cluster describes the workings of the current cluster.
type Cluster struct {
jobs *Jobs
id string
batchFiles int
batchSize fs.SizeSuffix
cleanup fs.ClusterCleanup // how we cleanup cluster files
_config rc.Params // for rc
_filter rc.Params // for rc
cancel func() // stop bg job
wg sync.WaitGroup // bg job finished
quit chan struct{} // signal graceful stop
sync chan chan<- struct{} // sync the current jobs
quitWorkers bool // if set, send workers a stop signal on Shutdown
mu sync.Mutex
currentBatch Batch
inflight map[string]Batch
shutdown bool
}
// Batch is a collection of rc tasks to do
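//
// A batch is serialized as JSON on the shared cluster remote. As an
// illustration only (example values, not from this commit), a pending
// batch might marshal roughly as:
//
//	{
//	    "_path": "job/batch",
//	    "inputs": [
//	        {
//	            "_path": "operations/copyfile",
//	            "srcFs": "src:", "srcRemote": "file.txt",
//	            "dstFs": "dst:", "dstRemote": "file.txt"
//	        }
//	    ]
//	}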
type Batch struct {
size int64 // size in batch
Path string `json:"_path"`
Inputs []rc.Params `json:"inputs"`
Config rc.Params `json:"_config,omitempty"`
Filter rc.Params `json:"_filter,omitempty"`
trs []*accounting.Transfer // transfer for each Input
sizes []int64 // sizes for each Input
}
// BatchResult has the results of the batch as received.
type BatchResult struct {
Results []rc.Params `json:"results"`
// Error returns
Error string `json:"error"`
Status int `json:"status"`
Input string `json:"input"`
Path string `json:"path"`
}
// NewCluster creates a new cluster from the config in ctx.
//
// It may return nil if no cluster is configured.
func NewCluster(ctx context.Context) (*Cluster, error) {
ci := fs.GetConfig(ctx)
if ci.Cluster == "" {
return nil, nil
}
jobs, err := NewJobs(ctx)
if err != nil {
return nil, err
}
c := &Cluster{
jobs: jobs,
id: ci.ClusterID,
quitWorkers: ci.ClusterQuitWorkers,
batchFiles: ci.ClusterBatchFiles,
batchSize: ci.ClusterBatchSize,
cleanup: ci.ClusterCleanup,
quit: make(chan struct{}),
sync: make(chan chan<- struct{}),
inflight: make(map[string]Batch),
}
// Configure _config
configParams, err := fs.ConfigOptionsInfo.NonDefaultRC(ci)
if err != nil {
return nil, fmt.Errorf("failed to read global config: %w", err)
}
// Remove any global cluster config
for k := range configParams {
if strings.HasPrefix(k, "Cluster") {
delete(configParams, k)
}
}
if len(configParams) != 0 {
fs.Debugf(nil, "Overridden global config: %#v", configParams)
}
c._config = rc.Params(configParams)
// Configure _filter
fi := filter.GetConfig(ctx)
if !fi.InActive() {
filterParams, err := filter.OptionsInfo.NonDefaultRC(fi)
if err != nil {
return nil, fmt.Errorf("failed to read filter config: %w", err)
}
fs.Debugf(nil, "Overridden filter config: %#v", filterParams)
c._filter = rc.Params(filterParams)
}
err = c.jobs.createDirectoryStructure(ctx)
if err != nil {
return nil, err
}
// Start the background worker
bgCtx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
c.wg.Add(1)
go c.run(bgCtx)
fs.Logf(c.jobs.f, "Started cluster master")
return c, nil
}
var (
globalClusterMu sync.Mutex
globalCluster *Cluster
)
// GetCluster starts or gets a cluster.
//
// If no cluster is configured or the cluster can't be started then it
// returns nil.
func GetCluster(ctx context.Context) *Cluster {
globalClusterMu.Lock()
defer globalClusterMu.Unlock()
if globalCluster != nil {
return globalCluster
}
cluster, err := NewCluster(ctx)
if err != nil {
fs.Errorf(nil, "Failed to start cluster: %v", err)
return nil
}
if cluster != nil {
atexit.Register(func() {
err := cluster.Shutdown(context.Background())
if err != nil {
fs.Errorf(nil, "Failed to shutdown cluster: %v", err)
}
})
}
globalCluster = cluster
return globalCluster
}
// Send the current batch for processing
//
// call with c.mu held
func (c *Cluster) sendBatch(ctx context.Context) (err error) {
// Do nothing if the batch is empty
if len(c.currentBatch.Inputs) == 0 {
return nil
}
// Get and reset current batch
b := c.currentBatch
c.currentBatch = Batch{}
b.Path = "job/batch"
b.Config = c._config
b.Filter = c._filter
// write the pending job
name, err := c.jobs.writeJob(ctx, clusterPending, &b)
if err != nil {
return err
}
fs.Infof(name, "written cluster batch file")
c.inflight[name] = b
return nil
}
// Add the command to the current batch
func (c *Cluster) addToBatch(ctx context.Context, obj fs.Object, in rc.Params, size int64, tr *accounting.Transfer) (err error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.shutdown {
return errors.New("internal error: can't add file to Shutdown cluster")
}
c.currentBatch.Inputs = append(c.currentBatch.Inputs, in)
c.currentBatch.size += size
c.currentBatch.trs = append(c.currentBatch.trs, tr)
c.currentBatch.sizes = append(c.currentBatch.sizes, size)
if c.currentBatch.size >= int64(c.batchSize) || len(c.currentBatch.Inputs) >= c.batchFiles {
err = c.sendBatch(ctx)
if err != nil {
return err
}
}
return nil
}
// Move does operations.Move via the cluster.
//
// It moves src to dst, or into fdst if dst is nil, in which case
// remote is used as the name of the new object.
func (c *Cluster) Move(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.Object) (err error) {
tr := accounting.Stats(ctx).NewTransfer(src, fdst)
if operations.SkipDestructive(ctx, src, "cluster move") {
in := tr.Account(ctx, nil)
in.DryRun(src.Size())
tr.Done(ctx, nil)
return nil
}
fsrc, ok := src.Fs().(fs.Fs)
if !ok {
err = errors.New("internal error: cluster move: can't cast src.Fs() to fs.Fs")
tr.Done(ctx, err)
return err
}
in := rc.Params{
"_path": "operations/movefile",
"dstFs": fs.ConfigStringFull(fdst),
"dstRemote": remote,
"srcFs": fs.ConfigStringFull(fsrc),
"srcRemote": src.Remote(),
}
if dst != nil {
in["dstRemote"] = dst.Remote()
}
return c.addToBatch(ctx, src, in, src.Size(), tr)
}
// Copy does operations.Copy via the cluster.
//
// It copies src to dst, or into fdst if dst is nil, in which case
// remote is used as the name of the new object.
func (c *Cluster) Copy(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.Object) (err error) {
tr := accounting.Stats(ctx).NewTransfer(src, fdst)
if operations.SkipDestructive(ctx, src, "cluster copy") {
in := tr.Account(ctx, nil)
in.DryRun(src.Size())
tr.Done(ctx, nil)
return nil
}
fsrc, ok := src.Fs().(fs.Fs)
if !ok {
err = errors.New("internal error: cluster copy: can't cast src.Fs() to fs.Fs")
tr.Done(ctx, err)
return err
}
in := rc.Params{
"_path": "operations/copyfile",
"dstFs": fs.ConfigStringFull(fdst),
"dstRemote": remote,
"srcFs": fs.ConfigStringFull(fsrc),
"srcRemote": src.Remote(),
}
if dst != nil {
in["dstRemote"] = dst.Remote()
}
return c.addToBatch(ctx, src, in, src.Size(), tr)
}
// DeleteFile does operations.DeleteFile via the cluster
//
// It queues the deletion of dst as a batch job, respecting --dry-run.
func (c *Cluster) DeleteFile(ctx context.Context, dst fs.Object) (err error) {
tr := accounting.Stats(ctx).NewCheckingTransfer(dst, "deleting")
err = accounting.Stats(ctx).DeleteFile(ctx, dst.Size())
if err != nil {
tr.Done(ctx, err)
return err
}
if operations.SkipDestructive(ctx, dst, "cluster delete") {
tr.Done(ctx, nil)
return nil
}
fdst, ok := dst.Fs().(fs.Fs)
if !ok {
err = errors.New("internal error: cluster delete: can't cast dst.Fs() to fs.Fs")
tr.Done(ctx, err)
return err
}
in := rc.Params{
"_path": "operations/deletefile",
"fs": fs.ConfigStringFull(fdst),
"remote": dst.Remote(),
}
return c.addToBatch(ctx, dst, in, 0, tr)
}
// processCompletedJob loads the job and checks it off
func (c *Cluster) processCompletedJob(ctx context.Context, obj fs.Object) error {
name := path.Base(obj.Remote())
name, _ = strings.CutSuffix(name, ".json")
fs.Debugf(nil, "cluster: processing completed job %q", name)
var output BatchResult
err := c.jobs.readJob(ctx, obj, &output)
if err != nil {
return fmt.Errorf("check jobs read: %w", err)
}
c.mu.Lock()
input, ok := c.inflight[name]
// FIXME delete or save job
if !ok {
for k := range c.inflight {
fs.Debugf(nil, "key %q", k)
}
c.mu.Unlock()
return fmt.Errorf("check jobs: job %q not found", name)
}
c.mu.Unlock()
// Delete the inflight entry when batch is processed
defer func() {
c.mu.Lock()
delete(c.inflight, name)
c.mu.Unlock()
}()
// Check job
if output.Error != "" {
return fmt.Errorf("cluster: failed to run batch job: %s (%d)", output.Error, output.Status)
}
if len(input.Inputs) != len(output.Results) {
return fmt.Errorf("cluster: input had %d jobs but output had %d", len(input.Inputs), len(output.Results))
}
// Run through the batch and mark operations as successful or not
for i := range input.Inputs {
in := input.Inputs[i]
tr := input.trs[i]
size := input.sizes[i]
out := output.Results[i]
errorString, hasError := out["error"]
var err error
if hasError && errorString != "" {
err = fmt.Errorf("cluster: worker error: %s (%v)", errorString, out["status"])
}
if err == nil && in["_path"] == "operations/movefile" {
accounting.Stats(ctx).Renames(1)
}
acc := tr.Account(ctx, nil)
acc.AccountReadN(size)
tr.Done(ctx, err)
remote, ok := in["dstRemote"]
if !ok {
remote = in["remote"]
}
if err == nil {
fs.Infof(remote, "cluster %s successful", in["_path"])
} else {
fs.Errorf(remote, "cluster %s failed: %v", in["_path"], err)
}
}
return nil
}
// checkJobs sees if there are any completed jobs
func (c *Cluster) checkJobs(ctx context.Context) {
objs, err := c.jobs.listDir(ctx, clusterDone)
if err != nil {
fs.Errorf(nil, "cluster: get completed job list failed: %v", err)
return
}
for _, obj := range objs {
err := c.processCompletedJob(ctx, obj)
status := "output-ok"
ok := true
if err != nil {
status = "output-failed"
ok = false
fs.Errorf(nil, "cluster: process completed job failed: %v", err)
}
c.jobs.finish(ctx, obj, status, ok)
}
}
// Run the background process
func (c *Cluster) run(ctx context.Context) {
defer c.wg.Done()
checkJobs := time.NewTicker(clusterCheckJobsInterval)
defer checkJobs.Stop()
var syncedChans []chan<- struct{}
for {
select {
case <-ctx.Done():
return
case <-c.quit:
fs.Debugf(nil, "cluster: quit request received")
return
case synced := <-c.sync:
syncedChans = append(syncedChans, synced)
fs.Debugf(nil, "cluster: sync request received")
case <-checkJobs.C:
}
c.checkJobs(ctx)
if len(syncedChans) > 0 {
c.mu.Lock()
n := len(c.inflight)
c.mu.Unlock()
if n == 0 {
fs.Debugf(nil, "cluster: synced")
for _, synced := range syncedChans {
synced <- struct{}{}
}
syncedChans = nil
}
}
}
}
// Sync the cluster.
//
// Call this when all job items have been added to the cluster.
//
// This will wait for any outstanding jobs to finish regardless of who
// put them in.
func (c *Cluster) Sync(ctx context.Context) error {
// Flush any outstanding
c.mu.Lock()
err := c.sendBatch(ctx)
c.mu.Unlock()
// Wait for the cluster to be empty
done := make(chan struct{})
c.sync <- done
<-done
return err
}
// Shutdown the cluster.
//
// Call this when all job items have been added to the cluster.
//
// This will wait for any outstanding jobs to finish.
func (c *Cluster) Shutdown(ctx context.Context) (err error) {
c.mu.Lock()
inBatch := len(c.currentBatch.Inputs)
inFlight := len(c.inflight)
shutdown := c.shutdown
c.shutdown = true
c.mu.Unlock()
if inBatch > 0 {
err = errors.Join(nil, fmt.Errorf("%d items batched on cluster shutdown", inBatch))
}
if inFlight > 0 {
err = errors.Join(nil, fmt.Errorf("%d items in flight on cluster shutdown", inFlight))
}
if shutdown {
fs.Debugf(nil, "cluster: already shutdown")
return nil
}
c.quit <- struct{}{}
fs.Debugf(nil, "Waiting for cluster to finish")
c.wg.Wait()
// Send a quit job
if c.quitWorkers {
fs.Logf(nil, "Sending quit to workers")
quitErr := c.jobs.writeQuitJob(ctx, clusterPending)
if quitErr != nil {
err = errors.Join(err, fmt.Errorf("shutdown quit: %w", quitErr))
}
}
return err
}
// Abort the cluster and any outstanding jobs.
func (c *Cluster) Abort() {
c.cancel()
c.wg.Wait()
}
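
Not part of the commit: a minimal sketch of how calling code might drive the public API above (GetCluster, Copy, Sync). The helper clusterCopyDir and the use of operations.ListFn to enumerate source objects are assumptions of this sketch; the real integration with the sync machinery is left for later work.

// A hypothetical caller (not in this commit) queueing copies on the cluster.
package example

import (
	"context"
	"errors"

	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/cluster"
	"github.com/rclone/rclone/fs/operations"
)

// clusterCopyDir queues a copy of every object in fsrc to fdst via the
// cluster and then waits for the workers to drain the queue.
func clusterCopyDir(ctx context.Context, fdst, fsrc fs.Fs) error {
	c := cluster.GetCluster(ctx) // nil unless --cluster is configured
	if c == nil {
		return errors.New("cluster not configured")
	}
	// Each Copy is added to the current batch; a batch file is written to
	// queue/pending once --cluster-batch-files or --cluster-batch-size is reached.
	err := operations.ListFn(ctx, fsrc, func(o fs.Object) {
		if err := c.Copy(ctx, fdst, nil, o.Remote(), o); err != nil {
			fs.Errorf(o, "failed to queue cluster copy: %v", err)
		}
	})
	if err != nil {
		return err
	}
	// Flush the final partial batch and wait for all inflight batches to complete.
	return c.Sync(ctx)
}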

fs/cluster/jobs.go (new file, 293 lines)

@@ -0,0 +1,293 @@
package cluster
import (
"bytes"
"cmp"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path"
"slices"
"strings"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/object"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/random"
)
// Batches flow from queue/pending to queue/processing to queue/done,
// ending up in queue/finished or removed depending on --cluster-cleanup
const (
clusterQueue = "queue"
clusterPending = clusterQueue + "/pending"
clusterProcessing = clusterQueue + "/processing"
clusterDone = clusterQueue + "/done"
clusterFinished = clusterQueue + "/finished"
minSleep = 10 * time.Millisecond
maxSleep = 2 * time.Second
decayConstant = 2 // bigger for slower decay, exponential
// Read the queue this often
clusterCheckJobsInterval = time.Second
// Name of job which signals to the workers to quit
quitJob = "QUIT"
)
// Jobs is a container for sending and receiving jobs to the cluster.
type Jobs struct {
remote string // remote for job storage
f fs.Fs // cluster remote storage
partial bool // do we need to write and rename
hasMove bool // set if f has server side move otherwise has server side copy
cleanup fs.ClusterCleanup // how we cleanup the cluster files
pacer *fs.Pacer // To pace the API calls
}
// NewJobs creates a Jobs source from the config in ctx.
//
// It may return nil if no cluster is configured.
func NewJobs(ctx context.Context) (*Jobs, error) {
ci := fs.GetConfig(ctx)
if ci.Cluster == "" {
return nil, nil
}
f, err := cache.Get(ctx, ci.Cluster)
if err != nil {
return nil, fmt.Errorf("cluster remote creation: %w", err)
}
features := f.Features()
if features.Move == nil && features.Copy == nil {
return nil, fmt.Errorf("cluster remote must have server side move and %q doesn't", ci.Cluster)
}
jobs := &Jobs{
remote: ci.Cluster,
f: f,
partial: features.PartialUploads,
hasMove: features.Move != nil,
pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
cleanup: ci.ClusterCleanup,
}
return jobs, nil
}
// Create the cluster directory structure
func (jobs *Jobs) createDirectoryStructure(ctx context.Context) (err error) {
for _, dir := range []string{clusterPending, clusterProcessing, clusterDone, clusterFinished} {
err = jobs.f.Mkdir(ctx, dir)
if err != nil {
return fmt.Errorf("cluster mkdir %q: %w", dir, err)
}
}
return nil
}
// rename a file
//
// if this returns fs.ErrorObjectNotFound then the file has already been renamed.
func (jobs *Jobs) rename(ctx context.Context, src fs.Object, dstRemote string) (dst fs.Object, err error) {
features := jobs.f.Features()
if jobs.hasMove {
dst, err = features.Move(ctx, src, dstRemote)
if err != nil {
return nil, fmt.Errorf("cluster: failed to rename job file: %w", err)
}
} else {
dst, err = features.Copy(ctx, src, dstRemote)
if err != nil {
return nil, fmt.Errorf("cluster: failed to rename (copy phase) job file: %w", err)
}
err = src.Remove(ctx)
if err != nil {
return nil, fmt.Errorf("cluster: failed to rename (delete phase) job file: %w", err)
}
}
return dst, nil
}
// Finish with a jobs file
func (jobs *Jobs) finish(ctx context.Context, obj fs.Object, status string, ok bool) {
var err error
if (ok && jobs.cleanup == fs.ClusterCleanupCompleted) || jobs.cleanup == fs.ClusterCleanupFull {
err = obj.Remove(ctx)
} else {
name := path.Join(clusterFinished, status, path.Base(obj.Remote()))
_, err = jobs.rename(ctx, obj, name)
}
if err != nil {
fs.Errorf(nil, "cluster: removing completed job failed: %v", err)
}
}
// write buf into remote
func (jobs *Jobs) writeFile(ctx context.Context, remote string, modTime time.Time, buf []byte) error {
partialRemote := remote
if jobs.partial {
partialRemote = remote + ".partial"
}
// Calculate hashes
w, err := hash.NewMultiHasherTypes(jobs.f.Hashes())
if err != nil {
return err
}
_, err = w.Write(buf)
if err != nil {
return err
}
obji := object.NewStaticObjectInfo(partialRemote, modTime, int64(len(buf)), true, w.Sums(), jobs.f)
var obj fs.Object
err = jobs.pacer.Call(func() (bool, error) {
in := bytes.NewBuffer(buf)
obj, err = jobs.f.Put(ctx, in, obji)
if err != nil {
return true, fmt.Errorf("cluster: failed to write %q: %q", remote, err)
}
return false, nil
})
if err != nil {
return err
}
if jobs.partial {
obj, err = jobs.rename(ctx, obj, remote)
if err != nil {
return err
}
}
return nil
}
// write a job to a file returning the name
func (jobs *Jobs) writeJob(ctx context.Context, where string, job any) (name string, err error) {
now := time.Now().UTC()
name = fmt.Sprintf("%s-%s", now.Format(time.RFC3339Nano), random.String(20))
remote := path.Join(where, name+".json")
buf, err := json.MarshalIndent(job, "", "\t")
if err != nil {
return "", fmt.Errorf("cluster: job json: %w", err)
}
err = jobs.writeFile(ctx, remote, now, buf)
if err != nil {
return "", fmt.Errorf("cluster: job write: %w", err)
}
return name, nil
}
// write a quit job to a file
func (jobs *Jobs) writeQuitJob(ctx context.Context, where string) (err error) {
now := time.Now().UTC()
remote := path.Join(where, quitJob+".json")
err = jobs.writeFile(ctx, remote, now, []byte("{}"))
if err != nil {
return fmt.Errorf("cluster: quit job write: %w", err)
}
return nil
}
// read buf from object
func (jobs *Jobs) readFile(ctx context.Context, o fs.Object) (buf []byte, err error) {
err = jobs.pacer.Call(func() (bool, error) {
in, err := operations.Open(ctx, o)
if err != nil {
return true, fmt.Errorf("cluster: failed to open %q: %w", o, err)
}
buf, err = io.ReadAll(in)
if err != nil {
return true, fmt.Errorf("cluster: failed to read %q: %w", o, err)
}
err = in.Close()
if err != nil {
return true, fmt.Errorf("cluster: failed to close %q: %w", o, err)
}
return false, nil
})
if err != nil {
return nil, err
}
return buf, nil
}
// read a job from a file
//
// job should be a pointer to something to be unmarshalled
func (jobs *Jobs) readJob(ctx context.Context, obj fs.Object, job any) error {
buf, err := jobs.readFile(ctx, obj)
if err != nil {
return fmt.Errorf("cluster: job read: %w", err)
}
err = json.Unmarshal(buf, job)
if err != nil {
return fmt.Errorf("cluster: job read json: %w", err)
}
return nil
}
// lists the json files in a cluster directory
func (jobs *Jobs) listDir(ctx context.Context, dir string) (objects []fs.Object, err error) {
entries, err := jobs.f.List(ctx, dir)
if err != nil {
return nil, fmt.Errorf("cluster: failed to list %q: %w", dir, err)
}
entries.ForObject(func(o fs.Object) {
if strings.HasSuffix(o.Remote(), ".json") {
objects = append(objects, o)
}
})
slices.SortStableFunc(objects, func(a, b fs.Object) int {
return cmp.Compare(a.Remote(), b.Remote())
})
return objects, nil
}
// get a job from pending if there is one available.
//
// Returns a nil object if no jobs are available.
//
// FIXME should mark jobs as error jobs in here if they can't be read properly?
func (jobs *Jobs) getJob(ctx context.Context, id string) (name string, obj fs.Object, err error) {
objs, err := jobs.listDir(ctx, clusterPending)
if err != nil {
return "", nil, fmt.Errorf("get job list: %w", err)
}
quit := false
for len(objs) > 0 {
obj = objs[0]
objs = objs[1:]
name = path.Base(obj.Remote())
name, _ = strings.CutSuffix(name, ".json")
// See if we have been asked to quit
if name == quitJob {
quit = true
continue
}
// claim the job
newName := fmt.Sprintf("%s-%s.json", name, id)
newRemote := path.Join(clusterProcessing, newName)
obj, err = jobs.rename(ctx, obj, newRemote)
if errors.Is(err, fs.ErrorObjectNotFound) {
// claim failed - try again
continue
}
if err != nil {
return "", nil, fmt.Errorf("get job claim: %w", err)
}
return name, obj, nil
}
// No jobs found
if quit {
fs.Logf(nil, "Exiting cluster worker on command")
atexit.Run()
os.Exit(0)
}
return "", nil, nil
}
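
Not part of the commit (the commit message notes tests are still needed): a hypothetical test sketch of the job lifecycle above, assuming the local backend can stand in as the shared cluster remote and using testify for assertions.

package cluster

import (
	"context"
	"testing"

	_ "github.com/rclone/rclone/backend/local"
	"github.com/rclone/rclone/fs"
	"github.com/stretchr/testify/require"
)

// TestJobLifecycleSketch walks a single job from queue/pending into
// queue/processing and then finishes it off.
func TestJobLifecycleSketch(t *testing.T) {
	ctx, ci := fs.AddConfig(context.Background())
	ci.Cluster = t.TempDir() // a local directory stands in for the shared remote

	j, err := NewJobs(ctx)
	require.NoError(t, err)
	require.NoError(t, j.createDirectoryStructure(ctx))

	// Controller side: write a pending batch file.
	name, err := j.writeJob(ctx, clusterPending, &Batch{Path: "job/batch"})
	require.NoError(t, err)

	// Worker side: claim the job, which renames it into queue/processing.
	gotName, obj, err := j.getJob(ctx, "worker-1")
	require.NoError(t, err)
	require.Equal(t, name, gotName)

	// Mark the claimed input as handled; depending on --cluster-cleanup this
	// deletes it or moves it under queue/finished/input-ok.
	j.finish(ctx, obj, "input-ok", true)
}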

fs/cluster/worker.go (new file, 101 lines)

@@ -0,0 +1,101 @@
package cluster
import (
"context"
"path"
"sync"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/rc/jobs"
"github.com/rclone/rclone/lib/random"
)
// Worker describes a single instance of a cluster worker.
type Worker struct {
jobs *Jobs
cancel func() // stop bg job
wg sync.WaitGroup // bg job finished
id string // id of this worker
}
// NewWorker creates a new cluster worker from the config in ctx.
//
// It may return nil if no cluster is configured.
func NewWorker(ctx context.Context) (*Worker, error) {
ci := fs.GetConfig(ctx)
if ci.Cluster == "" {
return nil, nil
}
jobs, err := NewJobs(ctx)
if err != nil {
return nil, err
}
w := &Worker{
jobs: jobs,
id: ci.ClusterID,
}
if w.id == "" {
w.id = random.String(10)
}
// Start the background worker
bgCtx, cancel := context.WithCancel(context.Background())
w.cancel = cancel
w.wg.Add(1)
go w.run(bgCtx)
fs.Logf(w.jobs.f, "Started cluster worker")
return w, nil
}
// Check to see if a job exists and run it if available
func (w *Worker) checkJobs(ctx context.Context) {
name, obj, err := w.jobs.getJob(ctx, w.id)
if err != nil {
fs.Errorf(nil, "check jobs get: %v", err)
return
}
if obj == nil {
return // no jobs available
}
fs.Debugf(nil, "cluster: processing pending job %q", name)
inBuf, err := w.jobs.readFile(ctx, obj)
if err != nil {
fs.Errorf(nil, "check jobs read: %v", err)
w.jobs.finish(ctx, obj, "input-error", false)
return
}
w.jobs.finish(ctx, obj, "input-ok", true)
outBuf := jobs.NewJobFromBytes(ctx, inBuf)
remote := path.Join(clusterDone, name+".json")
err = w.jobs.writeFile(ctx, remote, time.Now(), outBuf)
if err != nil {
fs.Errorf(nil, "check jobs failed to write output: %v", err)
return
}
fs.Debugf(nil, "cluster: processed pending job %q", name)
}
// Run the background process
func (w *Worker) run(ctx context.Context) {
defer w.wg.Done()
checkJobs := time.NewTicker(clusterCheckJobsInterval)
defer checkJobs.Stop()
for {
select {
case <-ctx.Done():
return
case <-checkJobs.C:
w.checkJobs(ctx)
}
}
}
// Shutdown the worker regardless of whether it has work to process or not.
func (w *Worker) Shutdown(ctx context.Context) error {
w.cancel()
w.wg.Wait()
return nil
}


@@ -50,6 +50,34 @@ var (
ConfigEdit = "config_fs_edit"
)
// ClusterCleanup describes the cluster cleanup choices.
type ClusterCleanup = Enum[clusterCleanupChoices]
// Cluster cleanup choices.
//
// ClusterCleanupNone don't remove any cluster files
// ClusterCleanupCompleted remove successfully completed jobs
// ClusterCleanupFull remove everything on exit
const (
ClusterCleanupNone ClusterCleanup = iota
ClusterCleanupCompleted
ClusterCleanupFull
)
type clusterCleanupChoices struct{}
func (clusterCleanupChoices) Choices() []string {
return []string{
ClusterCleanupNone: "none",
ClusterCleanupCompleted: "completed",
ClusterCleanupFull: "full",
}
}
func (clusterCleanupChoices) Type() string {
return "ClusterCleanup"
}
// ConfigOptionsInfo describes the Options in use
var ConfigOptionsInfo = Options{{
Name: "modify_window",
@@ -566,6 +594,36 @@ var ConfigOptionsInfo = Options{{
Default: "",
Help: "HTTP proxy URL.",
Groups: "Networking",
}, {
Name: "cluster",
Default: "",
Help: "Enable cluster mode with remote to use as shared storage.",
Groups: "Networking",
}, {
Name: "cluster_id",
Default: "",
Help: "Set to an ID for the cluster. An ID of 0 or empty becomes the controller.",
Groups: "Networking",
}, {
Name: "cluster_quit_workers",
Default: false,
Help: "Set to cause the controller to quit the workers when it finished.",
Groups: "Networking",
}, {
Name: "cluster_batch_files",
Default: 1000,
Help: "Max number of files for a cluster batch.",
Groups: "Networking",
}, {
Name: "cluster_batch_size",
Default: Tebi,
Help: "Max size of files for a cluster batch.",
Groups: "Networking",
}, {
Name: "cluster_cleanup",
Default: ClusterCleanupFull,
Help: "Control which cluster files get cleaned up.",
Groups: "Networking",
}}
// ConfigInfo is filesystem config options
@@ -680,6 +738,12 @@ type ConfigInfo struct {
MaxConnections int `config:"max_connections"`
NameTransform []string `config:"name_transform"`
HTTPProxy string `config:"http_proxy"`
Cluster string `config:"cluster"`
ClusterID string `config:"cluster_id"`
ClusterQuitWorkers bool `config:"cluster_quit_workers"`
ClusterBatchFiles int `config:"cluster_batch_files"`
ClusterBatchSize SizeSuffix `config:"cluster_batch_size"`
ClusterCleanup ClusterCleanup `config:"cluster_cleanup"`
}
func init() {