From 4a4379b3128de7ace692120e1c5ae84754d42047 Mon Sep 17 00:00:00 2001 From: Evan Spensley Date: Sun, 10 Jul 2022 10:42:04 -0400 Subject: [PATCH] jobs: add ability to stop group Adds new rc call to stop all running jobs in a group. Fixes #5561 --- fs/rc/jobs/job.go | 31 +++++++++++++++++++++++++++++++ fs/rc/jobs/job_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/fs/rc/jobs/job.go b/fs/rc/jobs/job.go index e452fe241..cb3aaa833 100644 --- a/fs/rc/jobs/job.go +++ b/fs/rc/jobs/job.go @@ -406,3 +406,34 @@ func rcJobStop(ctx context.Context, in rc.Params) (out rc.Params, err error) { job.Stop() return out, nil } + +func init() { + rc.Add(rc.Call{ + Path: "job/stopgroup", + Fn: rcGroupStop, + Title: "Stop all running jobs in a group", + Help: `Parameters: + +- group - name of the group (string). +`, + }) +} + +// Stops all running jobs in a group +func rcGroupStop(ctx context.Context, in rc.Params) (out rc.Params, err error) { + group, err := in.GetString("group") + if err != nil { + return nil, err + } + running.mu.RLock() + defer running.mu.RUnlock() + for _, job := range running.jobs { + if job.Group == group { + job.mu.Lock() + job.Stop() + job.mu.Unlock() + } + } + out = make(rc.Params) + return out, nil +} diff --git a/fs/rc/jobs/job_test.go b/fs/rc/jobs/job_test.go index cb92e602a..7552465de 100644 --- a/fs/rc/jobs/job_test.go +++ b/fs/rc/jobs/job_test.go @@ -452,6 +452,48 @@ func TestRcSyncJobStop(t *testing.T) { assert.Equal(t, false, out["success"]) } +func TestRcJobStopGroup(t *testing.T) { + ctx := context.Background() + jobID = 0 + _, _, err := NewJob(ctx, ctxFn, rc.Params{ + "_async": true, + "_group": "myparty", + }) + require.NoError(t, err) + _, _, err = NewJob(ctx, ctxFn, rc.Params{ + "_async": true, + "_group": "myparty", + }) + require.NoError(t, err) + + call := rc.Calls.Get("job/stopgroup") + assert.NotNil(t, call) + in := rc.Params{"group": "myparty"} + out, err := call.Fn(context.Background(), in) + require.NoError(t, err) + require.Empty(t, out) + + in = rc.Params{} + _, err = call.Fn(context.Background(), in) + require.Error(t, err) + assert.Contains(t, err.Error(), "Didn't find key") + + time.Sleep(10 * time.Millisecond) + + call = rc.Calls.Get("job/status") + assert.NotNil(t, call) + for i := 1; i <= 2; i++ { + in = rc.Params{"jobid": i} + out, err = call.Fn(context.Background(), in) + require.NoError(t, err) + require.NotNil(t, out) + assert.Equal(t, "myparty", out["group"]) + assert.Equal(t, "context canceled", out["error"]) + assert.Equal(t, true, out["finished"]) + assert.Equal(t, false, out["success"]) + } +} + func TestOnFinish(t *testing.T) { jobID = 0 done := make(chan struct{})