From 2337f1854a2ffcd9d0a60923f3910be130a12718 Mon Sep 17 00:00:00 2001 From: Anbraten Date: Mon, 20 Mar 2023 04:50:56 +0100 Subject: [PATCH] Add queue details UI for admins (#1632) # Changes - Adds an admin view to see the whole work-queue of the server. - The admin can also pause / resume the queue. - The view is reloading data every 5 seconds automatically. - The task model from queue got removed in favor of the one from models. --- docs/docs/91-migrations.md | 1 + server/grpc/filter.go | 3 +- server/grpc/filter_test.go | 16 +- server/model/task.go | 80 +++++++- server/pipeline/queue.go | 7 +- server/queue/fifo.go | 53 +++-- server/queue/fifo_test.go | 193 +++++++++--------- server/queue/persistent.go | 37 +--- server/queue/queue.go | 93 +-------- server/router/api.go | 4 +- server/store/datastore/task_test.go | 4 +- web/components.d.ts | 3 + web/src/assets/locales/en.json | 12 ++ .../admin/settings/AdminQueueTab.vue | 147 +++++++++++++ web/src/components/atomic/Icon.vue | 4 +- web/src/lib/api/index.ts | 13 ++ web/src/lib/api/types/index.ts | 1 + web/src/lib/api/types/queue.ts | 22 ++ web/src/views/admin/AdminSettings.vue | 4 + 19 files changed, 432 insertions(+), 265 deletions(-) create mode 100644 web/src/components/admin/settings/AdminQueueTab.vue create mode 100644 web/src/lib/api/types/queue.ts diff --git a/docs/docs/91-migrations.md b/docs/docs/91-migrations.md index 0dc91b253..82b5b29f1 100644 --- a/docs/docs/91-migrations.md +++ b/docs/docs/91-migrations.md @@ -16,6 +16,7 @@ Some versions need some changes to the server configuration or the pipeline conf - Renamed config env `WOODPECKER_MAX_PROCS` to `WOODPECKER_MAX_WORKFLOWS` (still available as fallback) - The pipelines are now also read from `.yaml` files, the new default order is `.woodpecker/*.yml` and `.woodpecker/*.yaml` (without any prioritization) -> `.woodpecker.yml` -> `.woodpecker.yaml` -> `.drone.yml` - Dropped support for [Coding](https://coding.net/). +- `/api/queue/resume` & `/api/queue/pause` endpoint methods were changed from `GET` to `POST` ## 0.15.0 diff --git a/server/grpc/filter.go b/server/grpc/filter.go index 78ecd227d..b1daab23b 100644 --- a/server/grpc/filter.go +++ b/server/grpc/filter.go @@ -16,11 +16,12 @@ package grpc import ( "github.com/woodpecker-ci/woodpecker/pipeline/rpc" + "github.com/woodpecker-ci/woodpecker/server/model" "github.com/woodpecker-ci/woodpecker/server/queue" ) func createFilterFunc(agentFilter rpc.Filter) (queue.FilterFn, error) { - return func(task *queue.Task) bool { + return func(task *model.Task) bool { for taskLabel, taskLabelValue := range task.Labels { // if a task label is empty it will be ignored if taskLabelValue == "" { diff --git a/server/grpc/filter_test.go b/server/grpc/filter_test.go index 5b8780cd1..23fad5449 100644 --- a/server/grpc/filter_test.go +++ b/server/grpc/filter_test.go @@ -20,7 +20,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/woodpecker-ci/woodpecker/pipeline/rpc" - "github.com/woodpecker-ci/woodpecker/server/queue" + "github.com/woodpecker-ci/woodpecker/server/model" ) func TestCreateFilterFunc(t *testing.T) { @@ -29,13 +29,13 @@ func TestCreateFilterFunc(t *testing.T) { tests := []struct { name string agentLabels map[string]string - task queue.Task + task model.Task exp bool }{ { name: "agent with missing labels", agentLabels: map[string]string{"repo": "test/woodpecker"}, - task: queue.Task{ + task: model.Task{ Labels: map[string]string{"platform": "linux/amd64", "repo": "test/woodpecker"}, }, exp: false, @@ -43,7 +43,7 @@ func TestCreateFilterFunc(t *testing.T) { { name: "agent with wrong labels", agentLabels: map[string]string{"platform": "linux/arm64"}, - task: queue.Task{ + task: model.Task{ Labels: map[string]string{"platform": "linux/amd64"}, }, exp: false, @@ -51,7 +51,7 @@ func TestCreateFilterFunc(t *testing.T) { { name: "agent with correct labels", agentLabels: map[string]string{"platform": "linux/amd64", "location": "europe"}, - task: queue.Task{ + task: model.Task{ Labels: map[string]string{"platform": "linux/amd64", "location": "europe"}, }, exp: true, @@ -59,7 +59,7 @@ func TestCreateFilterFunc(t *testing.T) { { name: "agent with additional labels", agentLabels: map[string]string{"platform": "linux/amd64", "location": "europe"}, - task: queue.Task{ + task: model.Task{ Labels: map[string]string{"platform": "linux/amd64"}, }, exp: true, @@ -67,7 +67,7 @@ func TestCreateFilterFunc(t *testing.T) { { name: "agent with wildcard label", agentLabels: map[string]string{"platform": "linux/amd64", "location": "*"}, - task: queue.Task{ + task: model.Task{ Labels: map[string]string{"platform": "linux/amd64", "location": "america"}, }, exp: true, @@ -75,7 +75,7 @@ func TestCreateFilterFunc(t *testing.T) { { name: "agent with platform label and task without", agentLabels: map[string]string{"platform": "linux/amd64"}, - task: queue.Task{ + task: model.Task{ Labels: map[string]string{"platform": ""}, }, exp: true, diff --git a/server/model/task.go b/server/model/task.go index 1f4cd758f..0ec4ae3e1 100644 --- a/server/model/task.go +++ b/server/model/task.go @@ -14,6 +14,11 @@ package model +import ( + "fmt" + "strings" +) + // TaskStore defines storage for scheduled Tasks. type TaskStore interface { TaskList() ([]*Task, error) @@ -21,17 +26,80 @@ type TaskStore interface { TaskDelete(string) error } +type TaskStatusValue string + +const ( + TaskStatusSkipped TaskStatusValue = "skipped" + TaskStatusSuccess TaskStatusValue = "success" + TaskStatusFailure TaskStatusValue = "failure" +) + // Task defines scheduled pipeline Task. type Task struct { - ID string `xorm:"PK UNIQUE 'task_id'"` - Data []byte `xorm:"'task_data'"` - Labels map[string]string `xorm:"json 'task_labels'"` - Dependencies []string `xorm:"json 'task_dependencies'"` - RunOn []string `xorm:"json 'task_run_on'"` - DepStatus map[string]string `xorm:"json 'task_dep_status'"` + ID string `json:"id" xorm:"PK UNIQUE 'task_id'"` + Data []byte `json:"data" xorm:"'task_data'"` + Labels map[string]string `json:"labels" xorm:"json 'task_labels'"` + Dependencies []string `json:"dependencies" xorm:"json 'task_dependencies'"` + RunOn []string `json:"run_on" xorm:"json 'task_run_on'"` + DepStatus map[string]StatusValue `json:"dep_status" xorm:"json 'task_dep_status'"` } // TableName return database table name for xorm func (Task) TableName() string { return "tasks" } + +func (t *Task) String() string { + var sb strings.Builder + sb.WriteString(fmt.Sprintf("%s (%s) - %s", t.ID, t.Dependencies, t.DepStatus)) + return sb.String() +} + +// ShouldRun tells if a task should be run or skipped, based on dependencies +func (t *Task) ShouldRun() bool { + if t.runsOnFailure() && t.runsOnSuccess() { + return true + } + + if !t.runsOnFailure() && t.runsOnSuccess() { + for _, status := range t.DepStatus { + if status != StatusSuccess { + return false + } + } + return true + } + + if t.runsOnFailure() && !t.runsOnSuccess() { + for _, status := range t.DepStatus { + if status == StatusSuccess { + return false + } + } + return true + } + + return false +} + +func (t *Task) runsOnFailure() bool { + for _, status := range t.RunOn { + if status == string(StatusFailure) { + return true + } + } + return false +} + +func (t *Task) runsOnSuccess() bool { + if len(t.RunOn) == 0 { + return true + } + + for _, status := range t.RunOn { + if status == string(StatusSuccess) { + return true + } + } + return false +} diff --git a/server/pipeline/queue.go b/server/pipeline/queue.go index 947f2947b..5fa359b55 100644 --- a/server/pipeline/queue.go +++ b/server/pipeline/queue.go @@ -23,16 +23,15 @@ import ( "github.com/woodpecker-ci/woodpecker/pipeline/rpc" "github.com/woodpecker-ci/woodpecker/server" "github.com/woodpecker-ci/woodpecker/server/model" - "github.com/woodpecker-ci/woodpecker/server/queue" ) func queuePipeline(repo *model.Repo, pipelineItems []*pipeline.Item) error { - var tasks []*queue.Task + var tasks []*model.Task for _, item := range pipelineItems { if item.Step.State == model.StatusSkipped { continue } - task := new(queue.Task) + task := new(model.Task) task.ID = fmt.Sprint(item.Step.ID) task.Labels = map[string]string{} for k, v := range item.Labels { @@ -42,7 +41,7 @@ func queuePipeline(repo *model.Repo, pipelineItems []*pipeline.Item) error { task.Labels["repo"] = repo.FullName task.Dependencies = taskIds(item.DependsOn, pipelineItems) task.RunOn = item.RunsOn - task.DepStatus = make(map[string]string) + task.DepStatus = make(map[string]model.StatusValue) task.Data, _ = json.Marshal(rpc.Pipeline{ ID: fmt.Sprint(item.Step.ID), diff --git a/server/queue/fifo.go b/server/queue/fifo.go index cd3c78b17..e38425678 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -26,14 +26,8 @@ import ( "github.com/rs/zerolog/log" ) -const ( - StatusSkipped = "skipped" - StatusSuccess = "success" - StatusFailure = "failure" -) - type entry struct { - item *Task + item *model.Task done chan bool error error deadline time.Time @@ -41,7 +35,7 @@ type entry struct { type worker struct { filter FilterFn - channel chan *Task + channel chan *model.Task } type fifo struct { @@ -68,7 +62,7 @@ func New(_ context.Context) Queue { } // Push pushes an item to the tail of this queue. -func (q *fifo) Push(_ context.Context, task *Task) error { +func (q *fifo) Push(_ context.Context, task *model.Task) error { q.Lock() q.pending.PushBack(task) q.Unlock() @@ -77,7 +71,7 @@ func (q *fifo) Push(_ context.Context, task *Task) error { } // Push pushes an item to the tail of this queue. -func (q *fifo) PushAtOnce(_ context.Context, tasks []*Task) error { +func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error { q.Lock() for _, task := range tasks { q.pending.PushBack(task) @@ -88,10 +82,10 @@ func (q *fifo) PushAtOnce(_ context.Context, tasks []*Task) error { } // Poll retrieves and removes the head of this queue. -func (q *fifo) Poll(c context.Context, f FilterFn) (*Task, error) { +func (q *fifo) Poll(c context.Context, f FilterFn) (*model.Task, error) { q.Lock() w := &worker{ - channel: make(chan *Task, 1), + channel: make(chan *model.Task, 1), filter: f, } q.workers[w] = struct{}{} @@ -113,20 +107,20 @@ func (q *fifo) Poll(c context.Context, f FilterFn) (*Task, error) { // Done signals that the item is done executing. func (q *fifo) Done(_ context.Context, id string, exitStatus model.StatusValue) error { - return q.finished([]string{id}, string(exitStatus), nil) + return q.finished([]string{id}, exitStatus, nil) } // Error signals that the item is done executing with error. func (q *fifo) Error(_ context.Context, id string, err error) error { - return q.finished([]string{id}, StatusFailure, err) + return q.finished([]string{id}, model.StatusFailure, err) } // Error signals that the item is done executing with error. func (q *fifo) ErrorAtOnce(_ context.Context, id []string, err error) error { - return q.finished(id, StatusFailure, err) + return q.finished(id, model.StatusFailure, err) } -func (q *fifo) finished(ids []string, exitStatus string, err error) error { +func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) error { q.Lock() for _, id := range ids { @@ -159,7 +153,7 @@ func (q *fifo) EvictAtOnce(_ context.Context, ids []string) error { var next *list.Element for e := q.pending.Front(); e != nil; e = next { next = e.Next() - task, ok := e.Value.(*Task) + task, ok := e.Value.(*model.Task) if ok && task.ID == id { q.pending.Remove(e) return nil @@ -205,12 +199,13 @@ func (q *fifo) Info(_ context.Context) InfoT { stats.Stats.Pending = q.pending.Len() stats.Stats.WaitingOnDeps = q.waitingOnDeps.Len() stats.Stats.Running = len(q.running) + stats.Stats.Complete = 0 // TODO: implement this for e := q.pending.Front(); e != nil; e = e.Next() { - stats.Pending = append(stats.Pending, e.Value.(*Task)) + stats.Pending = append(stats.Pending, e.Value.(*model.Task)) } for e := q.waitingOnDeps.Front(); e != nil; e = e.Next() { - stats.WaitingOnDeps = append(stats.WaitingOnDeps, e.Value.(*Task)) + stats.WaitingOnDeps = append(stats.WaitingOnDeps, e.Value.(*model.Task)) } for _, entry := range q.running { stats.Running = append(stats.Running, entry.item) @@ -258,7 +253,7 @@ func (q *fifo) process() { q.resubmitExpiredPipelines() q.filterWaiting() for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() { - task := pending.Value.(*Task) + task := pending.Value.(*model.Task) delete(q.workers, worker) q.pending.Remove(pending) q.running[task.ID] = &entry{ @@ -275,7 +270,7 @@ func (q *fifo) filterWaiting() { var nextWaiting *list.Element for e := q.waitingOnDeps.Front(); e != nil; e = nextWaiting { nextWaiting = e.Next() - task := e.Value.(*Task) + task := e.Value.(*model.Task) q.pending.PushBack(task) } @@ -285,7 +280,7 @@ func (q *fifo) filterWaiting() { var nextPending *list.Element for e := q.pending.Front(); e != nil; e = nextPending { nextPending = e.Next() - task := e.Value.(*Task) + task := e.Value.(*model.Task) if q.depsInQueue(task) { log.Debug().Msgf("queue: waiting due to unmet dependencies %v", task.ID) q.waitingOnDeps.PushBack(task) @@ -303,7 +298,7 @@ func (q *fifo) assignToWorker() (*list.Element, *worker) { var next *list.Element for e := q.pending.Front(); e != nil; e = next { next = e.Next() - task := e.Value.(*Task) + task := e.Value.(*model.Task) log.Debug().Msgf("queue: trying to assign task: %v with deps %v", task.ID, task.Dependencies) for w := range q.workers { @@ -327,11 +322,11 @@ func (q *fifo) resubmitExpiredPipelines() { } } -func (q *fifo) depsInQueue(task *Task) bool { +func (q *fifo) depsInQueue(task *model.Task) bool { var next *list.Element for e := q.pending.Front(); e != nil; e = next { next = e.Next() - possibleDep, ok := e.Value.(*Task) + possibleDep, ok := e.Value.(*model.Task) log.Debug().Msgf("queue: pending right now: %v", possibleDep.ID) for _, dep := range task.Dependencies { if ok && possibleDep.ID == dep { @@ -350,11 +345,11 @@ func (q *fifo) depsInQueue(task *Task) bool { return false } -func (q *fifo) updateDepStatusInQueue(taskID, status string) { +func (q *fifo) updateDepStatusInQueue(taskID string, status model.StatusValue) { var next *list.Element for e := q.pending.Front(); e != nil; e = next { next = e.Next() - pending, ok := e.Value.(*Task) + pending, ok := e.Value.(*model.Task) for _, dep := range pending.Dependencies { if ok && taskID == dep { pending.DepStatus[dep] = status @@ -372,7 +367,7 @@ func (q *fifo) updateDepStatusInQueue(taskID, status string) { for e := q.waitingOnDeps.Front(); e != nil; e = next { next = e.Next() - waiting, ok := e.Value.(*Task) + waiting, ok := e.Value.(*model.Task) for _, dep := range waiting.Dependencies { if ok && taskID == dep { waiting.DepStatus[dep] = status @@ -386,7 +381,7 @@ func (q *fifo) removeFromPending(taskID string) { var next *list.Element for e := q.pending.Front(); e != nil; e = next { next = e.Next() - task := e.Value.(*Task) + task := e.Value.(*model.Task) if task.ID == taskID { log.Debug().Msgf("queue: %s is removed from pending", taskID) q.pending.Remove(e) diff --git a/server/queue/fifo_test.go b/server/queue/fifo_test.go index 668ae6cbf..595238f58 100644 --- a/server/queue/fifo_test.go +++ b/server/queue/fifo_test.go @@ -9,12 +9,13 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/woodpecker-ci/woodpecker/server/model" ) var noContext = context.Background() func TestFifo(t *testing.T) { - want := &Task{ID: "1"} + want := &model.Task{ID: "1"} q := New(context.Background()) assert.NoError(t, q.Push(noContext, want)) @@ -24,7 +25,7 @@ func TestFifo(t *testing.T) { return } - got, _ := q.Poll(noContext, func(*Task) bool { return true }) + got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) if got != want { t.Errorf("expect task returned form queue") return @@ -40,7 +41,7 @@ func TestFifo(t *testing.T) { return } - assert.NoError(t, q.Done(noContext, got.ID, StatusSuccess)) + assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess)) info = q.Info(noContext) if len(info.Pending) != 0 { t.Errorf("expect task removed from pending queue") @@ -53,7 +54,7 @@ func TestFifo(t *testing.T) { } func TestFifoExpire(t *testing.T) { - want := &Task{ID: "1"} + want := &model.Task{ID: "1"} q := New(context.Background()).(*fifo) q.extension = 0 @@ -64,7 +65,7 @@ func TestFifoExpire(t *testing.T) { return } - got, _ := q.Poll(noContext, func(*Task) bool { return true }) + got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) if got != want { t.Errorf("expect task returned form queue") return @@ -78,12 +79,12 @@ func TestFifoExpire(t *testing.T) { } func TestFifoWait(t *testing.T) { - want := &Task{ID: "1"} + want := &model.Task{ID: "1"} q := New(context.Background()).(*fifo) assert.NoError(t, q.Push(noContext, want)) - got, _ := q.Poll(noContext, func(*Task) bool { return true }) + got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) if got != want { t.Errorf("expect task returned form queue") return @@ -97,12 +98,12 @@ func TestFifoWait(t *testing.T) { }() <-time.After(time.Millisecond) - assert.NoError(t, q.Done(noContext, got.ID, StatusSuccess)) + assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess)) wg.Wait() } func TestFifoEvict(t *testing.T) { - t1 := &Task{ID: "1"} + t1 := &model.Task{ID: "1"} q := New(context.Background()) assert.NoError(t, q.Push(noContext, t1)) @@ -123,28 +124,28 @@ func TestFifoEvict(t *testing.T) { } func TestFifoDependencies(t *testing.T) { - task1 := &Task{ + task1 := &model.Task{ ID: "1", } - task2 := &Task{ + task2 := &model.Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: make(map[string]string), + DepStatus: make(map[string]model.StatusValue), } q := New(context.Background()).(*fifo) - assert.NoError(t, q.PushAtOnce(noContext, []*Task{task2, task1})) + assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task1})) - got, _ := q.Poll(noContext, func(*Task) bool { return true }) + got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) if got != task1 { t.Errorf("expect task1 returned from queue as task2 depends on it") return } - assert.NoError(t, q.Done(noContext, got.ID, StatusSuccess)) + assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess)) - got, _ = q.Poll(noContext, func(*Task) bool { return true }) + got, _ = q.Poll(noContext, func(*model.Task) bool { return true }) if got != task2 { t.Errorf("expect task2 returned from queue") return @@ -152,27 +153,27 @@ func TestFifoDependencies(t *testing.T) { } func TestFifoErrors(t *testing.T) { - task1 := &Task{ + task1 := &model.Task{ ID: "1", } - task2 := &Task{ + task2 := &model.Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: make(map[string]string), + DepStatus: make(map[string]model.StatusValue), } - task3 := &Task{ + task3 := &model.Task{ ID: "3", Dependencies: []string{"1"}, - DepStatus: make(map[string]string), + DepStatus: make(map[string]model.StatusValue), RunOn: []string{"success", "failure"}, } q := New(context.Background()).(*fifo) - assert.NoError(t, q.PushAtOnce(noContext, []*Task{task2, task3, task1})) + assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) - got, _ := q.Poll(noContext, func(*Task) bool { return true }) + got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) if got != task1 { t.Errorf("expect task1 returned from queue as task2 depends on it") return @@ -180,7 +181,7 @@ func TestFifoErrors(t *testing.T) { assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))) - got, _ = q.Poll(noContext, func(*Task) bool { return true }) + got, _ = q.Poll(noContext, func(*model.Task) bool { return true }) if got != task2 { t.Errorf("expect task2 returned from queue") return @@ -191,7 +192,7 @@ func TestFifoErrors(t *testing.T) { return } - got, _ = q.Poll(noContext, func(*Task) bool { return true }) + got, _ = q.Poll(noContext, func(*model.Task) bool { return true }) if got != task3 { t.Errorf("expect task3 returned from queue") return @@ -204,39 +205,39 @@ func TestFifoErrors(t *testing.T) { } func TestFifoErrors2(t *testing.T) { - task1 := &Task{ + task1 := &model.Task{ ID: "1", } - task2 := &Task{ + task2 := &model.Task{ ID: "2", } - task3 := &Task{ + task3 := &model.Task{ ID: "3", Dependencies: []string{"1", "2"}, - DepStatus: make(map[string]string), + DepStatus: make(map[string]model.StatusValue), } q := New(context.Background()).(*fifo) - assert.NoError(t, q.PushAtOnce(noContext, []*Task{task2, task3, task1})) + assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) for i := 0; i < 2; i++ { - got, _ := q.Poll(noContext, func(*Task) bool { return true }) + got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) if got != task1 && got != task2 { t.Errorf("expect task1 or task2 returned from queue as task3 depends on them") return } if got != task1 { - assert.NoError(t, q.Done(noContext, got.ID, StatusSuccess)) + assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess)) } if got != task2 { assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))) } } - got, _ := q.Poll(noContext, func(*Task) bool { return true }) + got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) if got != task3 { t.Errorf("expect task3 returned from queue") return @@ -249,32 +250,32 @@ func TestFifoErrors2(t *testing.T) { } func TestFifoErrorsMultiThread(t *testing.T) { - task1 := &Task{ + task1 := &model.Task{ ID: "1", } - task2 := &Task{ + task2 := &model.Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: make(map[string]string), + DepStatus: make(map[string]model.StatusValue), } - task3 := &Task{ + task3 := &model.Task{ ID: "3", Dependencies: []string{"1", "2"}, - DepStatus: make(map[string]string), + DepStatus: make(map[string]model.StatusValue), } q := New(context.Background()).(*fifo) - assert.NoError(t, q.PushAtOnce(noContext, []*Task{task2, task3, task1})) + assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) - obtainedWorkCh := make(chan *Task) + obtainedWorkCh := make(chan *model.Task) for i := 0; i < 10; i++ { go func(i int) { for { fmt.Printf("Worker %d started\n", i) - got, _ := q.Poll(noContext, func(*Task) bool { return true }) + got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) obtainedWorkCh <- got } }(i) @@ -298,7 +299,7 @@ func TestFifoErrorsMultiThread(t *testing.T) { go func() { for { fmt.Printf("Worker spawned\n") - got, _ := q.Poll(noContext, func(*Task) bool { return true }) + got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) obtainedWorkCh <- got } }() @@ -308,11 +309,11 @@ func TestFifoErrorsMultiThread(t *testing.T) { return } task2Processed = true - assert.NoError(t, q.Done(noContext, got.ID, StatusSuccess)) + assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess)) go func() { for { fmt.Printf("Worker spawned\n") - got, _ := q.Poll(noContext, func(*Task) bool { return true }) + got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) obtainedWorkCh <- got } }() @@ -339,33 +340,33 @@ func TestFifoErrorsMultiThread(t *testing.T) { } func TestFifoTransitiveErrors(t *testing.T) { - task1 := &Task{ + task1 := &model.Task{ ID: "1", } - task2 := &Task{ + task2 := &model.Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: make(map[string]string), + DepStatus: make(map[string]model.StatusValue), } - task3 := &Task{ + task3 := &model.Task{ ID: "3", Dependencies: []string{"2"}, - DepStatus: make(map[string]string), + DepStatus: make(map[string]model.StatusValue), } q := New(context.Background()).(*fifo) - assert.NoError(t, q.PushAtOnce(noContext, []*Task{task2, task3, task1})) + assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) - got, _ := q.Poll(noContext, func(*Task) bool { return true }) + got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) if got != task1 { t.Errorf("expect task1 returned from queue as task2 depends on it") return } assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))) - got, _ = q.Poll(noContext, func(*Task) bool { return true }) + got, _ = q.Poll(noContext, func(*model.Task) bool { return true }) if got != task2 { t.Errorf("expect task2 returned from queue") return @@ -374,9 +375,9 @@ func TestFifoTransitiveErrors(t *testing.T) { t.Errorf("expect task2 should not run, since task1 failed") return } - assert.NoError(t, q.Done(noContext, got.ID, StatusSkipped)) + assert.NoError(t, q.Done(noContext, got.ID, model.StatusSkipped)) - got, _ = q.Poll(noContext, func(*Task) bool { return true }) + got, _ = q.Poll(noContext, func(*model.Task) bool { return true }) if got != task3 { t.Errorf("expect task3 returned from queue") return @@ -388,27 +389,27 @@ func TestFifoTransitiveErrors(t *testing.T) { } func TestFifoCancel(t *testing.T) { - task1 := &Task{ + task1 := &model.Task{ ID: "1", } - task2 := &Task{ + task2 := &model.Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: make(map[string]string), + DepStatus: make(map[string]model.StatusValue), } - task3 := &Task{ + task3 := &model.Task{ ID: "3", Dependencies: []string{"1"}, - DepStatus: make(map[string]string), + DepStatus: make(map[string]model.StatusValue), RunOn: []string{"success", "failure"}, } q := New(context.Background()).(*fifo) - assert.NoError(t, q.PushAtOnce(noContext, []*Task{task2, task3, task1})) + assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) - _, _ = q.Poll(noContext, func(*Task) bool { return true }) + _, _ = q.Poll(noContext, func(*model.Task) bool { return true }) assert.NoError(t, q.Error(noContext, task1.ID, fmt.Errorf("canceled"))) assert.NoError(t, q.Error(noContext, task2.ID, fmt.Errorf("canceled"))) assert.NoError(t, q.Error(noContext, task3.ID, fmt.Errorf("canceled"))) @@ -421,7 +422,7 @@ func TestFifoCancel(t *testing.T) { } func TestFifoPause(t *testing.T) { - task1 := &Task{ + task1 := &model.Task{ ID: "1", } @@ -429,7 +430,7 @@ func TestFifoPause(t *testing.T) { var wg sync.WaitGroup wg.Add(1) go func() { - _, _ = q.Poll(noContext, func(*Task) bool { return true }) + _, _ = q.Poll(noContext, func(*model.Task) bool { return true }) wg.Done() }() @@ -449,11 +450,11 @@ func TestFifoPause(t *testing.T) { q.Pause() assert.NoError(t, q.Push(noContext, task1)) q.Resume() - _, _ = q.Poll(noContext, func(*Task) bool { return true }) + _, _ = q.Poll(noContext, func(*model.Task) bool { return true }) } func TestFifoPauseResume(t *testing.T) { - task1 := &Task{ + task1 := &model.Task{ ID: "1", } @@ -462,31 +463,31 @@ func TestFifoPauseResume(t *testing.T) { assert.NoError(t, q.Push(noContext, task1)) q.Resume() - _, _ = q.Poll(noContext, func(*Task) bool { return true }) + _, _ = q.Poll(noContext, func(*model.Task) bool { return true }) } func TestWaitingVsPending(t *testing.T) { - task1 := &Task{ + task1 := &model.Task{ ID: "1", } - task2 := &Task{ + task2 := &model.Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: make(map[string]string), + DepStatus: make(map[string]model.StatusValue), } - task3 := &Task{ + task3 := &model.Task{ ID: "3", Dependencies: []string{"1"}, - DepStatus: make(map[string]string), + DepStatus: make(map[string]model.StatusValue), RunOn: []string{"success", "failure"}, } q := New(context.Background()).(*fifo) - assert.NoError(t, q.PushAtOnce(noContext, []*Task{task2, task3, task1})) + assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1})) - got, _ := q.Poll(noContext, func(*Task) bool { return true }) + got, _ := q.Poll(noContext, func(*model.Task) bool { return true }) info := q.Info(noContext) if info.Stats.WaitingOnDeps != 2 { @@ -494,7 +495,7 @@ func TestWaitingVsPending(t *testing.T) { } assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))) - got, err := q.Poll(noContext, func(*Task) bool { return true }) + got, err := q.Poll(noContext, func(*model.Task) bool { return true }) assert.NoError(t, err) assert.EqualValues(t, task2, got) @@ -508,11 +509,11 @@ func TestWaitingVsPending(t *testing.T) { } func TestShouldRun(t *testing.T) { - task := &Task{ + task := &model.Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: map[string]string{ - "1": StatusSuccess, + DepStatus: map[string]model.StatusValue{ + "1": model.StatusSuccess, }, RunOn: []string{"failure"}, } @@ -521,11 +522,11 @@ func TestShouldRun(t *testing.T) { return } - task = &Task{ + task = &model.Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: map[string]string{ - "1": StatusSuccess, + DepStatus: map[string]model.StatusValue{ + "1": model.StatusSuccess, }, RunOn: []string{"failure", "success"}, } @@ -534,11 +535,11 @@ func TestShouldRun(t *testing.T) { return } - task = &Task{ + task = &model.Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: map[string]string{ - "1": StatusFailure, + DepStatus: map[string]model.StatusValue{ + "1": model.StatusFailure, }, } if task.ShouldRun() { @@ -546,11 +547,11 @@ func TestShouldRun(t *testing.T) { return } - task = &Task{ + task = &model.Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: map[string]string{ - "1": StatusSuccess, + DepStatus: map[string]model.StatusValue{ + "1": model.StatusSuccess, }, RunOn: []string{"success"}, } @@ -559,11 +560,11 @@ func TestShouldRun(t *testing.T) { return } - task = &Task{ + task = &model.Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: map[string]string{ - "1": StatusFailure, + DepStatus: map[string]model.StatusValue{ + "1": model.StatusFailure, }, RunOn: []string{"failure"}, } @@ -572,23 +573,23 @@ func TestShouldRun(t *testing.T) { return } - task = &Task{ + task = &model.Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: map[string]string{ - "1": StatusSkipped, + DepStatus: map[string]model.StatusValue{ + "1": model.StatusSkipped, }, } if task.ShouldRun() { - t.Errorf("Tasked should not run if dependency is skipped") + t.Errorf("model.Tasked should not run if dependency is skipped") return } - task = &Task{ + task = &model.Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: map[string]string{ - "1": StatusSkipped, + DepStatus: map[string]model.StatusValue{ + "1": model.StatusSkipped, }, RunOn: []string{"failure"}, } diff --git a/server/queue/persistent.go b/server/queue/persistent.go index e92a82642..c7530e518 100644 --- a/server/queue/persistent.go +++ b/server/queue/persistent.go @@ -28,18 +28,7 @@ import ( // ensures the task Queue can be restored when the system starts. func WithTaskStore(q Queue, s model.TaskStore) Queue { tasks, _ := s.TaskList() - var toEnqueue []*Task - for _, task := range tasks { - toEnqueue = append(toEnqueue, &Task{ - ID: task.ID, - Data: task.Data, - Labels: task.Labels, - Dependencies: task.Dependencies, - RunOn: task.RunOn, - DepStatus: task.DepStatus, - }) - } - if err := q.PushAtOnce(context.Background(), toEnqueue); err != nil { + if err := q.PushAtOnce(context.Background(), tasks); err != nil { log.Error().Err(err).Msg("PushAtOnce failed") } return &persistentQueue{q, s} @@ -51,15 +40,8 @@ type persistentQueue struct { } // Push pushes a task to the tail of this queue. -func (q *persistentQueue) Push(c context.Context, task *Task) error { - if err := q.store.TaskInsert(&model.Task{ - ID: task.ID, - Data: task.Data, - Labels: task.Labels, - Dependencies: task.Dependencies, - RunOn: task.RunOn, - DepStatus: task.DepStatus, - }); err != nil { +func (q *persistentQueue) Push(c context.Context, task *model.Task) error { + if err := q.store.TaskInsert(task); err != nil { return err } err := q.Queue.Push(c, task) @@ -72,17 +54,10 @@ func (q *persistentQueue) Push(c context.Context, task *Task) error { } // PushAtOnce pushes multiple tasks to the tail of this queue. -func (q *persistentQueue) PushAtOnce(c context.Context, tasks []*Task) error { +func (q *persistentQueue) PushAtOnce(c context.Context, tasks []*model.Task) error { // TODO: invent store.NewSession who return context including a session and make TaskInsert & TaskDelete use it for _, task := range tasks { - if err := q.store.TaskInsert(&model.Task{ - ID: task.ID, - Data: task.Data, - Labels: task.Labels, - Dependencies: task.Dependencies, - RunOn: task.RunOn, - DepStatus: task.DepStatus, - }); err != nil { + if err := q.store.TaskInsert(task); err != nil { return err } } @@ -98,7 +73,7 @@ func (q *persistentQueue) PushAtOnce(c context.Context, tasks []*Task) error { } // Poll retrieves and removes a task head of this queue. -func (q *persistentQueue) Poll(c context.Context, f FilterFn) (*Task, error) { +func (q *persistentQueue) Poll(c context.Context, f FilterFn) (*model.Task, error) { task, err := q.Queue.Poll(c, f) if task != nil { log.Debug().Msgf("pull queue item: %s: remove from backup", task.ID) diff --git a/server/queue/queue.go b/server/queue/queue.go index c8f3efbed..5ab73fd59 100644 --- a/server/queue/queue.go +++ b/server/queue/queue.go @@ -3,7 +3,6 @@ package queue import ( "context" "errors" - "fmt" "strings" "github.com/woodpecker-ci/woodpecker/server/model" @@ -17,87 +16,11 @@ var ( ErrNotFound = errors.New("queue: task not found") ) -// Task defines a unit of work in the queue. -type Task struct { - // ID identifies this task. - ID string `json:"id,omitempty"` - - // Data is the actual data in the entry. - Data []byte `json:"data"` - - // Labels represents the key-value pairs the entry is labeled with. - Labels map[string]string `json:"labels,omitempty"` - - // Task IDs this task depend - Dependencies []string - - // Dependency's exit status - DepStatus map[string]string - - // RunOn failure or success - RunOn []string -} - -// ShouldRun tells if a task should be run or skipped, based on dependencies -func (t *Task) ShouldRun() bool { - if runsOnFailure(t.RunOn) && runsOnSuccess(t.RunOn) { - return true - } - - if !runsOnFailure(t.RunOn) && runsOnSuccess(t.RunOn) { - for _, status := range t.DepStatus { - if StatusSuccess != status { - return false - } - } - return true - } - - if runsOnFailure(t.RunOn) && !runsOnSuccess(t.RunOn) { - for _, status := range t.DepStatus { - if StatusSuccess == status { - return false - } - } - return true - } - - return false -} - -func (t *Task) String() string { - var sb strings.Builder - sb.WriteString(fmt.Sprintf("%s (%s) - %s", t.ID, t.Dependencies, t.DepStatus)) - return sb.String() -} - -func runsOnFailure(runsOn []string) bool { - for _, status := range runsOn { - if status == "failure" { - return true - } - } - return false -} - -func runsOnSuccess(runsOn []string) bool { - if len(runsOn) == 0 { - return true - } - - for _, status := range runsOn { - if status == "success" { - return true - } - } - return false -} - // InfoT provides runtime information. type InfoT struct { - Pending []*Task `json:"pending"` - WaitingOnDeps []*Task `json:"waiting_on_deps"` - Running []*Task `json:"running"` + Pending []*model.Task `json:"pending"` + WaitingOnDeps []*model.Task `json:"waiting_on_deps"` + Running []*model.Task `json:"running"` Stats struct { Workers int `json:"worker_count"` Pending int `json:"pending_count"` @@ -105,7 +28,7 @@ type InfoT struct { Running int `json:"running_count"` Complete int `json:"completed_count"` } `json:"stats"` - Paused bool + Paused bool `json:"paused"` } func (t *InfoT) String() string { @@ -128,19 +51,19 @@ func (t *InfoT) String() string { // Filter filters tasks in the queue. If the Filter returns false, // the Task is skipped and not returned to the subscriber. -type FilterFn func(*Task) bool +type FilterFn func(*model.Task) bool // Queue defines a task queue for scheduling tasks among // a pool of workers. type Queue interface { // Push pushes a task to the tail of this queue. - Push(c context.Context, task *Task) error + Push(c context.Context, task *model.Task) error // PushAtOnce pushes a task to the tail of this queue. - PushAtOnce(c context.Context, tasks []*Task) error + PushAtOnce(c context.Context, tasks []*model.Task) error // Poll retrieves and removes a task head of this queue. - Poll(c context.Context, f FilterFn) (*Task, error) + Poll(c context.Context, f FilterFn) (*model.Task, error) // Extend extends the deadline for a task. Extend(c context.Context, id string) error diff --git a/server/router/api.go b/server/router/api.go index 23094a6f9..dcb7468bf 100644 --- a/server/router/api.go +++ b/server/router/api.go @@ -145,8 +145,8 @@ func apiRoutes(e *gin.Engine) { { queue.Use(session.MustAdmin()) queue.GET("/info", api.GetQueueInfo) - queue.GET("/pause", api.PauseQueue) - queue.GET("/resume", api.ResumeQueue) + queue.POST("/pause", api.PauseQueue) + queue.POST("/resume", api.ResumeQueue) queue.GET("/norunningpipelines", api.BlockTilQueueHasRunningItem) } diff --git a/server/store/datastore/task_test.go b/server/store/datastore/task_test.go index 64fe66984..0d53b578e 100644 --- a/server/store/datastore/task_test.go +++ b/server/store/datastore/task_test.go @@ -30,7 +30,7 @@ func TestTaskList(t *testing.T) { ID: "some_random_id", Data: []byte("foo"), Labels: map[string]string{"foo": "bar"}, - DepStatus: map[string]string{"test": "dep"}, + DepStatus: map[string]model.StatusValue{"test": "dep"}, })) list, err := store.TaskList() @@ -41,7 +41,7 @@ func TestTaskList(t *testing.T) { assert.Len(t, list, 1, "Expected one task in list") assert.Equal(t, "some_random_id", list[0].ID) assert.Equal(t, "foo", string(list[0].Data)) - assert.EqualValues(t, map[string]string{"test": "dep"}, list[0].DepStatus) + assert.EqualValues(t, map[string]model.StatusValue{"test": "dep"}, list[0].DepStatus) err = store.TaskDelete("some_random_id") if err != nil { diff --git a/web/components.d.ts b/web/components.d.ts index 805089970..38640e0f4 100644 --- a/web/components.d.ts +++ b/web/components.d.ts @@ -12,6 +12,7 @@ declare module '@vue/runtime-core' { ActionsTab: typeof import('./src/components/repo/settings/ActionsTab.vue')['default'] ActivePipelines: typeof import('./src/components/layout/header/ActivePipelines.vue')['default'] AdminAgentsTab: typeof import('./src/components/admin/settings/AdminAgentsTab.vue')['default'] + AdminQueueTab: typeof import('./src/components/admin/settings/AdminQueueTab.vue')['default'] AdminSecretsTab: typeof import('./src/components/admin/settings/AdminSecretsTab.vue')['default'] AdminUsersTab: typeof import('./src/components/admin/settings/AdminUsersTab.vue')['default'] Badge: typeof import('./src/components/atomic/Badge.vue')['default'] @@ -19,6 +20,7 @@ declare module '@vue/runtime-core' { Button: typeof import('./src/components/atomic/Button.vue')['default'] Checkbox: typeof import('./src/components/form/Checkbox.vue')['default'] CheckboxesField: typeof import('./src/components/form/CheckboxesField.vue')['default'] + copy: typeof import('./src/components/admin/settings/AdminAgentsTab copy.vue')['default'] CronTab: typeof import('./src/components/repo/settings/CronTab.vue')['default'] DeployPipelinePopup: typeof import('./src/components/layout/popups/DeployPipelinePopup.vue')['default'] DocsLink: typeof import('./src/components/atomic/DocsLink.vue')['default'] @@ -45,6 +47,7 @@ declare module '@vue/runtime-core' { IIcBaselineFileDownload: typeof import('~icons/ic/baseline-file-download')['default'] IIcBaselineFileDownloadOff: typeof import('~icons/ic/baseline-file-download-off')['default'] IIcBaselineHealing: typeof import('~icons/ic/baseline-healing')['default'] + IIcBaselinePause: typeof import('~icons/ic/baseline-pause')['default'] IIcBaselinePlayArrow: typeof import('~icons/ic/baseline-play-arrow')['default'] IIconoirArrowLeft: typeof import('~icons/iconoir/arrow-left')['default'] IIconParkOutlineAlarmClock: typeof import('~icons/icon-park-outline/alarm-clock')['default'] diff --git a/web/src/assets/locales/en.json b/web/src/assets/locales/en.json index b84949bea..8f4e483b4 100644 --- a/web/src/assets/locales/en.json +++ b/web/src/assets/locales/en.json @@ -366,6 +366,18 @@ "never": "Never", "delete_confirm": "Do you really want to delete this agent? It wont be able to connected to the server anymore." }, + "queue": { + "queue": "Queue", + "desc": "Tasks waiting to be executed by agents", + "pause": "Pause", + "resume": "Resume", + "paused": "Queue is paused", + "resumed": "Queue is resumed", + "tasks": "Tasks", + "task_running": "Task is running", + "task_pending": "Task is pending", + "task_waiting_on_deps": "Task is waiting on dependencies" + }, "users": { "users": "Users", "desc": "Users registered for this server", diff --git a/web/src/components/admin/settings/AdminQueueTab.vue b/web/src/components/admin/settings/AdminQueueTab.vue new file mode 100644 index 000000000..29833f499 --- /dev/null +++ b/web/src/components/admin/settings/AdminQueueTab.vue @@ -0,0 +1,147 @@ + + + diff --git a/web/src/components/atomic/Icon.vue b/web/src/components/atomic/Icon.vue index bf4395ec1..0ef345fa9 100644 --- a/web/src/components/atomic/Icon.vue +++ b/web/src/components/atomic/Icon.vue @@ -43,6 +43,7 @@ +
@@ -92,7 +93,8 @@ export type IconNames = | 'auto-scroll' | 'auto-scroll-off' | 'refresh' - | 'play'; + | 'play' + | 'pause'; defineProps<{ name: IconNames; diff --git a/web/src/lib/api/index.ts b/web/src/lib/api/index.ts index 6eb1e6cc7..600ad8490 100644 --- a/web/src/lib/api/index.ts +++ b/web/src/lib/api/index.ts @@ -9,6 +9,7 @@ import { PipelineLog, PipelineStep, PullRequest, + QueueInfo, Registry, Repo, RepoPermissions, @@ -256,6 +257,18 @@ export default class WoodpeckerClient extends ApiClient { return this._delete(`/api/agents/${agent.id}`); } + getQueueInfo(): Promise { + return this._get('/api/queue/info') as Promise; + } + + pauseQueue(): Promise { + return this._post('/api/queue/pause'); + } + + resumeQueue(): Promise { + return this._post('/api/queue/resume'); + } + getUsers(): Promise { return this._get('/api/users') as Promise; } diff --git a/web/src/lib/api/types/index.ts b/web/src/lib/api/types/index.ts index b827e4123..7918c24d7 100644 --- a/web/src/lib/api/types/index.ts +++ b/web/src/lib/api/types/index.ts @@ -4,6 +4,7 @@ export * from './org'; export * from './pipeline'; export * from './pipelineConfig'; export * from './pull_request'; +export * from './queue'; export * from './registry'; export * from './repo'; export * from './secret'; diff --git a/web/src/lib/api/types/queue.ts b/web/src/lib/api/types/queue.ts new file mode 100644 index 000000000..c7ee269ba --- /dev/null +++ b/web/src/lib/api/types/queue.ts @@ -0,0 +1,22 @@ +export type Task = { + id: number; + data: string; + labels: { [key: string]: string }; + dependencies: string[]; + dep_status: { [key: string]: string }; + run_on: string[]; +}; + +export type QueueInfo = { + pending: Task[]; + waiting_on_deps: Task[]; + running: Task[]; + stats: { + worker_count: number; + pending_count: number; + waiting_on_deps_count: number; + running_count: number; + completed_count: number; + }; + paused: boolean; +}; diff --git a/web/src/views/admin/AdminSettings.vue b/web/src/views/admin/AdminSettings.vue index d011e731c..07c9f228d 100644 --- a/web/src/views/admin/AdminSettings.vue +++ b/web/src/views/admin/AdminSettings.vue @@ -12,6 +12,9 @@ + + + @@ -21,6 +24,7 @@ import { useI18n } from 'vue-i18n'; import { useRouter } from 'vue-router'; import AdminAgentsTab from '~/components/admin/settings/AdminAgentsTab.vue'; +import AdminQueueTab from '~/components/admin/settings/AdminQueueTab.vue'; import AdminSecretsTab from '~/components/admin/settings/AdminSecretsTab.vue'; import AdminUsersTab from '~/components/admin/settings/AdminUsersTab.vue'; import Scaffold from '~/components/layout/scaffold/Scaffold.vue';