1
0
mirror of https://github.com/rclone/rclone.git synced 2025-11-23 21:44:49 +02:00

rc: add job/batch for sending batches of rc commands to run concurrently

This commit is contained in:
Nick Craig-Wood
2025-07-22 16:10:51 +01:00
parent 31adc7d89f
commit a522c056fe
2 changed files with 463 additions and 0 deletions

View File

@@ -5,8 +5,10 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"net/http"
"runtime/debug" "runtime/debug"
"slices" "slices"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -17,6 +19,7 @@ import (
"github.com/rclone/rclone/fs/cache" "github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/filter" "github.com/rclone/rclone/fs/filter"
"github.com/rclone/rclone/fs/rc" "github.com/rclone/rclone/fs/rc"
"golang.org/x/sync/errgroup"
) )
// Fill in these to avoid circular dependencies // Fill in these to avoid circular dependencies
@@ -475,3 +478,223 @@ func rcGroupStop(ctx context.Context, in rc.Params) (out rc.Params, err error) {
out = make(rc.Params) out = make(rc.Params)
return out, nil return out, nil
} }
// NewJobFromParams runs the rc call described by in as a job.
//
// in must contain a _path entry naming the rc call to run. The rest of
// the entries are the parameters for that call. Note that in is
// modified: the _path entry is removed and, if in has no _group entry
// but ctx carries a stats group, that group is added as _group.
//
// Calls which need the HTTP request or response can't be run this way
// and are rejected.
//
// No Go error is returned. Any failure is logged and returned in out as
// the error blob produced by rc.Error.
func NewJobFromParams(ctx context.Context, in rc.Params) (out rc.Params) {
	path := "unknown"

	// fail logs err and turns it into an rc error blob for path.
	//
	// status is passed to rc.Error as given. NOTE(review): the tests
	// expect 400 for a missing _path even though 404 is passed here, so
	// rc.Error presumably adjusts the status for some errors.
	fail := func(err error, status int) rc.Params {
		fs.Errorf(nil, "rc: %q: error: %v", path, err)
		out, _ = rc.Error(path, in, err, status)
		return out
	}

	// Work out which call to run. path is assigned even when the lookup
	// fails so the error blob carries whatever GetString returned.
	var err error
	if path, err = in.GetString("_path"); err != nil {
		return fail(err, http.StatusNotFound)
	}
	delete(in, "_path")

	// Check the call exists and can be run without an HTTP request/response
	call := rc.Calls.Get(path)
	switch {
	case call == nil:
		return fail(fmt.Errorf("couldn't find path %q", path), http.StatusNotFound)
	case call.NeedsRequest:
		return fail(fmt.Errorf("can't run path %q as it needs the request", path), http.StatusBadRequest)
	case call.NeedsResponse:
		return fail(fmt.Errorf("can't run path %q as it needs the response", path), http.StatusBadRequest)
	}

	// Inherit the stats group from the context unless the input chose one
	if _, hasGroup := in["_group"]; !hasGroup {
		if group, ok := accounting.StatsGroupFromContext(ctx); ok {
			in["_group"] = group
		}
	}

	fs.Debugf(nil, "rc: %q: with parameters %+v", path, in)
	if _, out, err = NewJob(ctx, call.Fn, in); err != nil {
		return fail(err, http.StatusInternalServerError)
	}
	// Make sure a successful call always gives the caller a map
	if out == nil {
		out = make(rc.Params)
	}
	fs.Debugf(nil, "rc: %q: reply %+v: %v", path, out, err)
	return out
}
// Register the job/batch rc command.
//
// The help text is markdown. It is written with | standing in for ` so
// that it can live in a raw string literal; the strings.ReplaceAll call
// swaps the backticks back in.
func init() {
	rc.Add(rc.Call{
		Path:         "job/batch",
		AuthRequired: true, // require auth always since sub commands may require it
		Fn:           rcBatch,
		Title:        "Run a batch of rclone rc commands concurrently.",
		Help: strings.ReplaceAll(`
This takes the following parameters:

- concurrency - int - do this many commands concurrently. Defaults to |--transfers| if not set.
- inputs - a list of inputs to the commands with an extra |_path| parameter

|||json
{
    "_path": "rc/path",
    "param1": "parameter for the path as documented",
    "param2": "parameter for the path as documented, etc"
}
|||

The inputs may use |_async|, |_group|, |_config| and |_filter| as normal when using the rc.

Returns:

- results - a list of results from the commands with one entry for each in inputs.

For example:

|||sh
rclone rc job/batch --json '{
  "inputs": [
    {
      "_path": "rc/noop",
      "parameter": "OK"
    },
    {
      "_path": "rc/error",
      "parameter": "BAD"
    }
  ]
}
'
|||

Gives the result:

|||json
{
  "results": [
    {
      "parameter": "OK"
    },
    {
      "error": "arbitrary error on input map[parameter:BAD]",
      "input": {
        "parameter": "BAD"
      },
      "path": "rc/error",
      "status": 500
    }
  ]
}
|||
`, "|", "`"),
	})
}
/*
// NOTE: runBatchJob is disabled - rcBatch runs each input with NewJobFromParams instead.
//
// Run a single batch job
func runBatchJob(ctx context.Context, inputAny any) (out rc.Params, err error) {
var in rc.Params
path := "unknown"
defer func() {
if err != nil {
out, _ = rc.Error(path, in, err, http.StatusInternalServerError)
}
}()
// get the inputs to the job
input, ok := inputAny.(map[string]any)
if !ok {
return nil, rc.NewErrParamInvalid(fmt.Errorf("\"inputs\" items must be objects not %T", inputAny))
}
in = rc.Params(input)
path, err = in.GetString("_path")
if err != nil {
return nil, err
}
delete(in, "_path")
call := rc.Calls.Get(path)
// Check call
if call == nil {
return nil, rc.NewErrParamInvalid(fmt.Errorf("path %q does not exist", path))
}
path = call.Path
if call.NeedsRequest {
return nil, rc.NewErrParamInvalid(fmt.Errorf("can't run path %q as it needs the request", path))
}
if call.NeedsResponse {
return nil, rc.NewErrParamInvalid(fmt.Errorf("can't run path %q as it needs the response", path))
}
// Run the job
_, out, err = NewJob(ctx, call.Fn, in)
if err != nil {
return nil, err
}
// Reshape (serialize then deserialize) the data so it is in the form expected
err = rc.Reshape(&out, out)
if err != nil {
return nil, err
}
return out, nil
}
*/
// rcBatch implements the job/batch rc command.
//
// It reads the list "inputs" from in and runs each item with
// NewJobFromParams, storing the reply at the same index of the "results"
// list in out. Items which aren't objects are not run and get an error
// blob as their result instead.
//
// The optional "concurrency" parameter sets how many items are run at
// once. It defaults to the Transfers config value. With a concurrency of
// 1 or less the items are run one at a time in input order.
//
// An error is returned only if the parameters can't be read. Failures of
// individual items are reported in their entry in "results".
func rcBatch(ctx context.Context, in rc.Params) (out rc.Params, err error) {
	// Read inputs
	inputsAny, err := in.Get("inputs")
	if err != nil {
		return nil, err
	}
	inputs, ok := inputsAny.([]any)
	if !ok {
		return nil, rc.NewErrParamInvalid(fmt.Errorf("expecting list key %q (was %T)", "inputs", inputsAny))
	}

	// Read concurrency
	concurrency, err := in.GetInt64("concurrency")
	if rc.IsErrParamNotFound(err) {
		concurrency = int64(fs.GetConfig(ctx).Transfers)
	} else if err != nil {
		return nil, err
	}
	// There is no point running more commands at once than there are
	// inputs. Clamping here also keeps the int conversion below in range:
	// where int is 32 bits a large value could truncate to zero, and an
	// errgroup limit of zero blocks every call to Go forever.
	concurrency = min(concurrency, int64(len(inputs)))
	sequential := concurrency <= 1

	// Prepare outputs - each command writes only to its own slot so no
	// locking is needed
	results := make([]rc.Params, len(inputs))
	out = rc.Params{"results": results}

	// The group is used only to limit concurrency and to wait for the
	// commands. No command returns an error to it, so a cancelling
	// context from errgroup.WithContext would add nothing and both the
	// sequential and concurrent paths run with the caller's ctx.
	var g errgroup.Group
	if !sequential {
		g.SetLimit(int(concurrency))
	}
	for i, inputAny := range inputs {
		input, ok := inputAny.(map[string]any)
		if !ok {
			results[i], _ = rc.Error("unknown", nil, fmt.Errorf("\"inputs\" items must be objects not %T", inputAny), http.StatusBadRequest)
			continue
		}
		params := rc.Params(input)
		if sequential {
			results[i] = NewJobFromParams(ctx, params)
			continue
		}
		g.Go(func() error {
			results[i] = NewJobFromParams(ctx, params)
			return nil
		})
	}
	_ = g.Wait() // can't fail as the functions above always return nil
	return out, nil
}

View File

@@ -2,6 +2,7 @@ package jobs
import ( import (
"context" "context"
"encoding/json"
"errors" "errors"
"runtime" "runtime"
"testing" "testing"
@@ -602,3 +603,242 @@ func TestOnFinishDataRace(t *testing.T) {
} }
} }
} }
// Register rc calls which need the HTTP request or response so the tests
// can check that running them as jobs is refused
func init() {
	for _, call := range []rc.Call{
		{Path: "test/needs_request", NeedsRequest: true},
		{Path: "test/needs_response", NeedsResponse: true},
	} {
		rc.Add(call)
	}
}
// TestNewJobFromParams checks the output of NewJobFromParams for calls
// which succeed, calls which fail, and inputs which can't be run at all.
func TestNewJobFromParams(t *testing.T) {
	type testCase struct {
		params   rc.Params // input, including the _path to run
		expected rc.Params // reply or error blob
	}
	cases := []testCase{
		// Successful calls echo their parameters
		{
			params: rc.Params{
				"_path": "rc/noop",
				"a":     "potato",
			},
			expected: rc.Params{
				"a": "potato",
			},
		},
		{
			params: rc.Params{
				"_path": "rc/noop",
				"b":     "sausage",
			},
			expected: rc.Params{
				"b": "sausage",
			},
		},
		// A call which returns an error
		{
			params: rc.Params{
				"_path": "rc/error",
				"e":     "sausage",
			},
			expected: rc.Params{
				"error": "arbitrary error on input map[e:sausage]",
				"input": rc.Params{
					"e": "sausage",
				},
				"path":   "rc/error",
				"status": 500,
			},
		},
		// A path which isn't registered
		{
			params: rc.Params{
				"_path": "bad/path",
				"param": "sausage",
			},
			expected: rc.Params{
				"error": "couldn't find path \"bad/path\"",
				"input": rc.Params{
					"param": "sausage",
				},
				"path":   "bad/path",
				"status": 404,
			},
		},
		// Calls which need the HTTP request or response are refused
		{
			params: rc.Params{
				"_path": "test/needs_request",
			},
			expected: rc.Params{
				"error":  "can't run path \"test/needs_request\" as it needs the request",
				"input":  rc.Params{},
				"path":   "test/needs_request",
				"status": 400,
			},
		},
		{
			params: rc.Params{
				"_path": "test/needs_response",
			},
			expected: rc.Params{
				"error":  "can't run path \"test/needs_response\" as it needs the response",
				"input":  rc.Params{},
				"path":   "test/needs_response",
				"status": 400,
			},
		},
		// No _path at all
		{
			params: rc.Params{
				"nopath": "BOOM",
			},
			expected: rc.Params{
				"error": "Didn't find key \"_path\" in input",
				"input": rc.Params{
					"nopath": "BOOM",
				},
				"path":   "",
				"status": 400,
			},
		},
	}

	ctx := context.Background()
	for i := range cases {
		got := NewJobFromParams(ctx, cases[i].params)
		assert.Equal(t, cases[i].expected, got)
	}
}
// TestJobsBatch runs job/batch over a mixture of good and bad inputs and
// checks that each input gets its reply or error blob at the matching
// position in "results".
//
// The input and the expected output are written as JSON, and the actual
// output is passed through rc.Reshape before comparing so both sides
// have been through JSON and hold the same Go types.
func TestJobsBatch(t *testing.T) {
	ctx := context.Background()
	call := rc.Calls.Get("job/batch")
	// Stop here if the call isn't registered - carrying on would only
	// panic with a nil pointer dereference at call.Fn below
	require.NotNil(t, call)

	inJSON := `{
	"inputs": [
		{
			"_path": "rc/noop",
			"a": "potato"
		},
		"bad string",
		{
			"_path": "rc/noop",
			"b": "sausage"
		},
		{
			"_path": "rc/error",
			"e": "sausage"
		},
		{
			"_path": "bad/path",
			"param": "sausage"
		},
		{
			"_path": "test/needs_request"
		},
		{
			"_path": "test/needs_response"
		},
		{
			"nopath": "BOOM"
		}
	]
}
`
	var in rc.Params
	require.NoError(t, json.Unmarshal([]byte(inJSON), &in))

	wantJSON := `{
	"results": [
		{
			"a": "potato"
		},
		{
			"error": "\"inputs\" items must be objects not string",
			"input": null,
			"path": "unknown",
			"status": 400
		},
		{
			"b": "sausage"
		},
		{
			"error": "arbitrary error on input map[e:sausage]",
			"input": {
				"e": "sausage"
			},
			"path": "rc/error",
			"status": 500
		},
		{
			"error": "couldn't find path \"bad/path\"",
			"input": {
				"param": "sausage"
			},
			"path": "bad/path",
			"status": 404
		},
		{
			"error": "can't run path \"test/needs_request\" as it needs the request",
			"input": {},
			"path": "test/needs_request",
			"status": 400
		},
		{
			"error": "can't run path \"test/needs_response\" as it needs the response",
			"input": {},
			"path": "test/needs_response",
			"status": 400
		},
		{
			"error": "Didn't find key \"_path\" in input",
			"input": {
				"nopath": "BOOM"
			},
			"path": "",
			"status": 400
		}
	]
}
`
	var want rc.Params
	require.NoError(t, json.Unmarshal([]byte(wantJSON), &want))

	out, err := call.Fn(ctx, in)
	require.NoError(t, err)

	var got rc.Params
	require.NoError(t, rc.Reshape(&got, out))

	assert.Equal(t, want, got)
}
func TestJobsBatchConcurrent(t *testing.T) {
ctx := context.Background()
for concurrency := range 10 {
in := rc.Params{}
var inputs []any
var results []rc.Params
for i := range 100 {
in := map[string]any{
"_path": "rc/noop",
"i": i,
}
inputs = append(inputs, in)
results = append(results, rc.Params{
"i": i,
})
}
in["inputs"] = inputs
want := rc.Params{
"results": results,
}
if concurrency > 0 {
in["concurrency"] = concurrency
}
call := rc.Calls.Get("job/batch")
assert.NotNil(t, call)
got, err := call.Fn(ctx, in)
require.NoError(t, err)
assert.Equal(t, want, got)
}
}