diff --git a/fs/rc/jobs/job.go b/fs/rc/jobs/job.go
index 0df3cb7b1..deacd9105 100644
--- a/fs/rc/jobs/job.go
+++ b/fs/rc/jobs/job.go
@@ -5,8 +5,10 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"net/http"
 	"runtime/debug"
 	"slices"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -17,6 +19,7 @@ import (
 	"github.com/rclone/rclone/fs/cache"
 	"github.com/rclone/rclone/fs/filter"
 	"github.com/rclone/rclone/fs/rc"
+	"golang.org/x/sync/errgroup"
 )
 
 // Fill in these to avoid circular dependencies
@@ -475,3 +478,192 @@ func rcGroupStop(ctx context.Context, in rc.Params) (out rc.Params, err error) {
 	out = make(rc.Params)
 	return out, nil
 }
+
+// NewJobFromParams creates an rc job from rc.Params.
+//
+// The in parameters should contain a _path entry naming the rc call to
+// run. Note that in is modified: the _path entry is removed from it and
+// a _group entry is added if the context carries a stats group and in
+// doesn't already have one.
+//
+// It returns a rc.Params as output which may be an error blob as made
+// by rc.Error.
+func NewJobFromParams(ctx context.Context, in rc.Params) (out rc.Params) {
+	path := "unknown"
+
+	// Return an rc error blob
+	rcError := func(err error, status int) rc.Params {
+		fs.Errorf(nil, "rc: %q: error: %v", path, err)
+		out, _ = rc.Error(path, in, err, status)
+		return out
+	}
+
+	// Find the call
+	//
+	// This assigns to the path declared above rather than shadowing it
+	// so rcError reports the path read from the input.
+	path, err := in.GetString("_path")
+	if err != nil {
+		return rcError(err, http.StatusBadRequest)
+	}
+	delete(in, "_path")
+	call := rc.Calls.Get(path)
+	if call == nil {
+		return rcError(fmt.Errorf("couldn't find path %q", path), http.StatusNotFound)
+	}
+	if call.NeedsRequest {
+		return rcError(fmt.Errorf("can't run path %q as it needs the request", path), http.StatusBadRequest)
+	}
+	if call.NeedsResponse {
+		return rcError(fmt.Errorf("can't run path %q as it needs the response", path), http.StatusBadRequest)
+	}
+
+	// Pass on the group if one is set in the context and it isn't set in the input.
+	if _, found := in["_group"]; !found {
+		if group, ok := accounting.StatsGroupFromContext(ctx); ok {
+			in["_group"] = group
+		}
+	}
+
+	fs.Debugf(nil, "rc: %q: with parameters %+v", path, in)
+	_, out, err = NewJob(ctx, call.Fn, in)
+	if err != nil {
+		return rcError(err, http.StatusInternalServerError)
+	}
+	if out == nil {
+		out = make(rc.Params)
+	}
+
+	fs.Debugf(nil, "rc: %q: reply %+v", path, out)
+	return out
+}
+
+// Register the job/batch rc call
+func init() {
+	rc.Add(rc.Call{
+		Path:         "job/batch",
+		AuthRequired: true, // require auth always since sub commands may require it
+		Fn:           rcBatch,
+		Title:        "Run a batch of rclone rc commands concurrently.",
+		Help: strings.ReplaceAll(`
+This takes the following parameters:
+
+- concurrency - int - do this many commands concurrently. Defaults to |--transfers| if not set.
+- inputs - a list of inputs to the commands with an extra |_path| parameter
+
+|||json
+{
+    "_path": "rc/path",
+    "param1": "parameter for the path as documented",
+    "param2": "parameter for the path as documented, etc"
+}
+|||
+
+The inputs may use |_async|, |_group|, |_config| and |_filter| as normal when using the rc.
+
+Returns:
+
+- results - a list of results from the commands with one entry for each in inputs.
+
+For example:
+
+|||sh
+rclone rc job/batch --json '{
+  "inputs": [
+    {
+      "_path": "rc/noop",
+      "parameter": "OK"
+    },
+    {
+      "_path": "rc/error",
+      "parameter": "BAD"
+    }
+  ]
+}
+'
+|||
+
+Gives the result:
+
+|||json
+{
+  "results": [
+    {
+      "parameter": "OK"
+    },
+    {
+      "error": "arbitrary error on input map[parameter:BAD]",
+      "input": {
+        "parameter": "BAD"
+      },
+      "path": "rc/error",
+      "status": 500
+    }
+  ]
+}
+|||
+`, "|", "`"),
+	})
+}
+
+// rcBatch runs a batch of rc commands, concurrently if requested.
+//
+// It reads the list of commands from the "inputs" parameter and the
+// optional "concurrency" parameter which defaults to the Transfers
+// config value. Each input is run with NewJobFromParams and its output,
+// or error blob, is stored in the "results" list at the same index as
+// its input.
+//
+// Failures of individual commands are reported in their entry in
+// "results". An error is only returned if the parameters of the batch
+// call itself can't be read.
+func rcBatch(ctx context.Context, in rc.Params) (out rc.Params, err error) {
+	// Read inputs
+	inputsAny, err := in.Get("inputs")
+	if err != nil {
+		return nil, err
+	}
+	inputs, ok := inputsAny.([]any)
+	if !ok {
+		return nil, rc.NewErrParamInvalid(fmt.Errorf("expecting list key %q (was %T)", "inputs", inputsAny))
+	}
+
+	// Read concurrency
+	concurrency, err := in.GetInt64("concurrency")
+	if rc.IsErrParamNotFound(err) {
+		concurrency = int64(fs.GetConfig(ctx).Transfers)
+	} else if err != nil {
+		return nil, err
+	}
+
+	// Prepare outputs
+	results := make([]rc.Params, len(inputs))
+
+	// The goroutines never return an error so a plain errgroup.Group is
+	// enough to limit the concurrency and to wait for them to finish.
+	var g errgroup.Group
+	if concurrency > 1 {
+		g.SetLimit(int(concurrency))
+	}
+	for i, inputAny := range inputs {
+		input, ok := inputAny.(map[string]any)
+		if !ok {
+			results[i], _ = rc.Error("unknown", nil, fmt.Errorf("\"inputs\" items must be objects not %T", inputAny), http.StatusBadRequest)
+			continue
+		}
+		params := rc.Params(input)
+		if concurrency <= 1 {
+			// Run the commands one at a time in input order
+			results[i] = NewJobFromParams(ctx, params)
+			continue
+		}
+		// Each goroutine writes only to its own element of results
+		g.Go(func() error {
+			results[i] = NewJobFromParams(ctx, params)
+			return nil
+		})
+	}
+	_ = g.Wait() // always nil as the goroutines never return an error
+
+	return rc.Params{"results": results}, nil
+}
diff --git a/fs/rc/jobs/job_test.go b/fs/rc/jobs/job_test.go
index de90e26a4..d29252534 100644
--- a/fs/rc/jobs/job_test.go
+++ b/fs/rc/jobs/job_test.go
@@ -2,6 +2,7 @@ package jobs
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
 	"runtime"
 	"testing"
@@ -602,3 +603,242 @@ func TestOnFinishDataRace(t *testing.T) {
 		}
 	}
 }
+
+// Register some test rc calls
+func init() {
+	rc.Add(rc.Call{
+		Path:         "test/needs_request",
+		NeedsRequest: true,
+	})
+	rc.Add(rc.Call{
+		Path:          "test/needs_response",
+		NeedsResponse: true,
+	})
+
+}
+
+func TestNewJobFromParams(t *testing.T) {
+	ctx := context.Background()
+	for _, test := range []struct {
+		in   rc.Params
+		want rc.Params
+	}{{
+		in: rc.Params{
+			"_path": "rc/noop",
+			"a":     "potato",
+		},
+		want: rc.Params{
+			"a": "potato",
+		},
+	}, {
+		in: rc.Params{
+			"_path": "rc/noop",
+			"b":     "sausage",
+		},
+		want: rc.Params{
+			"b": "sausage",
+		},
+	}, {
+		in: rc.Params{
+			"_path": "rc/error",
+			"e":     "sausage",
+		},
+		want: rc.Params{
+			"error": "arbitrary error on input map[e:sausage]",
+			"input": rc.Params{
+				"e": "sausage",
+			},
+			"path":   "rc/error",
+			"status": 500,
+		},
+	}, {
+		in: rc.Params{
+			"_path": "bad/path",
+			"param": "sausage",
+		},
+		want: rc.Params{
+			"error": "couldn't find path \"bad/path\"",
+			"input": rc.Params{
+				"param": "sausage",
+			},
+			"path":   "bad/path",
+			"status": 404,
+		},
+	}, {
+		in: rc.Params{
+			"_path": "test/needs_request",
+		},
+		want: rc.Params{
+			"error":  "can't run path \"test/needs_request\" as it needs the request",
+			"input":  rc.Params{},
+			"path":   "test/needs_request",
+			"status": 400,
+		},
+	}, {
+		in: rc.Params{
+			"_path": "test/needs_response",
+		},
+		want: rc.Params{
+			"error":  "can't run path \"test/needs_response\" as it needs the response",
+			"input":  rc.Params{},
+			"path":   "test/needs_response",
+			"status": 400,
+		},
+	}, {
+		in: rc.Params{
+			"nopath": "BOOM",
+		},
+		want: rc.Params{
+			"error": "Didn't find key \"_path\" in input",
+			"input": rc.Params{
+				"nopath": "BOOM",
+			},
+			"path":   "",
+			"status": 400,
+		},
+	}} {
+		got := NewJobFromParams(ctx, test.in)
+		assert.Equal(t, test.want, got)
+	}
+}
+
+func TestJobsBatch(t *testing.T) {
+	ctx := context.Background()
+
+	call := rc.Calls.Get("job/batch")
+	require.NotNil(t, call)
+
+	inJSON := `{
+	"inputs": [
+		{
+			"_path": "rc/noop",
+			"a": "potato"
+		},
+		"bad string",
+		{
+			"_path": "rc/noop",
+			"b": "sausage"
+		},
+		{
+			"_path": "rc/error",
+			"e": "sausage"
+		},
+		{
+			"_path": "bad/path",
+			"param": "sausage"
+		},
+		{
+			"_path": "test/needs_request"
+		},
+		{
+			"_path": "test/needs_response"
+		},
+		{
+			"nopath": "BOOM"
+		}
+	]
+}
+`
+	var in rc.Params
+	require.NoError(t, json.Unmarshal([]byte(inJSON), &in))
+
+	wantJSON := `{
+	"results": [
+		{
+			"a": "potato"
+		},
+		{
+			"error": "\"inputs\" items must be objects not string",
+			"input": null,
+			"path": "unknown",
+			"status": 400
+		},
+		{
+			"b": "sausage"
+		},
+		{
+			"error": "arbitrary error on input map[e:sausage]",
+			"input": {
+				"e": "sausage"
+			},
+			"path": "rc/error",
+			"status": 500
+		},
+		{
+			"error": "couldn't find path \"bad/path\"",
+			"input": {
+				"param": "sausage"
+			},
+			"path": "bad/path",
+			"status": 404
+		},
+		{
+			"error": "can't run path \"test/needs_request\" as it needs the request",
+			"input": {},
+			"path": "test/needs_request",
+			"status": 400
+		},
+		{
+			"error": "can't run path \"test/needs_response\" as it needs the response",
+			"input": {},
+			"path": "test/needs_response",
+			"status": 400
+		},
+		{
+			"error": "Didn't find key \"_path\" in input",
+			"input": {
+				"nopath": "BOOM"
+			},
+			"path": "",
+			"status": 400
+		}
+	]
+}
+`
+
+	var want rc.Params
+	require.NoError(t, json.Unmarshal([]byte(wantJSON), &want))
+
+	out, err := call.Fn(ctx, in)
+	require.NoError(t, err)
+
+	var got rc.Params
+	require.NoError(t, rc.Reshape(&got, out))
+
+	assert.Equal(t, want, got)
+}
+
+func TestJobsBatchConcurrent(t *testing.T) {
+	ctx := context.Background()
+	for concurrency := range 10 {
+		in := rc.Params{}
+		var inputs []any
+		var results []rc.Params
+		for i := range 100 {
+			input := map[string]any{
+				"_path": "rc/noop",
+				"i":     i,
+			}
+			inputs = append(inputs, input)
+			results = append(results, rc.Params{
+				"i": i,
+			})
+		}
+		in["inputs"] = inputs
+		want := rc.Params{
+			"results": results,
+		}
+
+		if concurrency > 0 {
+			in["concurrency"] = concurrency
+		}
+		call := rc.Calls.Get("job/batch")
+		require.NotNil(t, call)
+
+		got, err := call.Fn(ctx, in)
+		require.NoError(t, err)
+
+		assert.Equal(t, want, got)
+	}
+
+}