From 8bd0740604fe375eb7e5bbaf05cf1a87ed87fe37 Mon Sep 17 00:00:00 2001 From: qwerty287 <80460567+qwerty287@users.noreply.github.com> Date: Thu, 21 Aug 2025 16:17:11 +0200 Subject: [PATCH] Simplify queue interface (#5449) --- go.mod | 2 +- server/queue/fifo.go | 13 ------------- server/queue/fifo_test.go | 36 ++++++------------------------------ server/queue/persistent.go | 24 ------------------------ server/queue/queue.go | 6 ------ 5 files changed, 7 insertions(+), 74 deletions(-) diff --git a/go.mod b/go.mod index 6f1a23c78b..75a24e910b 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,6 @@ require ( github.com/muesli/termenv v0.16.0 github.com/neticdk/go-bitbucket v1.0.3 github.com/oklog/ulid/v2 v2.1.1 - github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.23.0 github.com/rs/zerolog v1.34.0 github.com/stretchr/testify v1.10.0 @@ -173,6 +172,7 @@ require ( github.com/opencontainers/image-spec v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/perimeterx/marshmallow v1.1.5 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.65.0 // indirect diff --git a/server/queue/fifo.go b/server/queue/fifo.go index 1ee07370de..0977e3dae2 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -74,14 +74,6 @@ func NewMemoryQueue(ctx context.Context) Queue { return q } -// Push pushes a task to the tail of this queue. -func (q *fifo) Push(_ context.Context, task *model.Task) error { - q.Lock() - q.pending.PushBack(task) - q.Unlock() - return nil -} - // PushAtOnce pushes multiple tasks to the tail of this queue. func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error { q.Lock() @@ -153,11 +145,6 @@ func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) e return nil } -// Evict removes a pending task from the queue. -func (q *fifo) Evict(ctx context.Context, taskID string) error { - return q.EvictAtOnce(ctx, []string{taskID}) -} - // EvictAtOnce removes multiple pending tasks from the queue. func (q *fifo) EvictAtOnce(_ context.Context, taskIDs []string) error { q.Lock() diff --git a/server/queue/fifo_test.go b/server/queue/fifo_test.go index a2d37d1523..4bfceb0386 100644 --- a/server/queue/fifo_test.go +++ b/server/queue/fifo_test.go @@ -45,7 +45,7 @@ func TestFifo(t *testing.T) { q := NewMemoryQueue(ctx) dummyTask := genDummyTask() - assert.NoError(t, q.Push(ctx, dummyTask)) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) waitForProcess() info := q.Info(ctx) assert.Len(t, info.Pending, 1, "expect task in pending queue") @@ -77,7 +77,7 @@ func TestFifoExpire(t *testing.T) { dummyTask := genDummyTask() q.extension = 0 - assert.NoError(t, q.Push(ctx, dummyTask)) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) waitForProcess() info := q.Info(ctx) assert.Len(t, info.Pending, 1, "expect task in pending queue") @@ -100,7 +100,7 @@ func TestFifoWait(t *testing.T) { dummyTask := genDummyTask() - assert.NoError(t, q.Push(ctx, dummyTask)) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) waitForProcess() got, err := q.Poll(ctx, 1, filterFnTrue) @@ -119,30 +119,6 @@ func TestFifoWait(t *testing.T) { wg.Wait() } -func TestFifoEvict(t *testing.T) { - ctx, cancel := context.WithCancelCause(t.Context()) - t.Cleanup(func() { cancel(nil) }) - - q := NewMemoryQueue(ctx) - dummyTask := genDummyTask() - - assert.NoError(t, q.Push(ctx, dummyTask)) - - waitForProcess() - info := q.Info(ctx) - assert.Len(t, info.Pending, 1, "expect task in pending queue") - - err := q.Evict(ctx, dummyTask.ID) - assert.NoError(t, err) - - waitForProcess() - info = q.Info(ctx) - assert.Len(t, info.Pending, 0) - - err = q.Evict(ctx, dummyTask.ID) - assert.ErrorIs(t, err, ErrNotFound) -} - func TestFifoDependencies(t *testing.T) { ctx, cancel := context.WithCancelCause(t.Context()) t.Cleanup(func() { cancel(nil) }) @@ -442,7 +418,7 @@ func TestFifoPause(t *testing.T) { q.Pause() t0 := time.Now() - assert.NoError(t, q.Push(ctx, dummyTask)) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) waitForProcess() q.Resume() @@ -452,7 +428,7 @@ func TestFifoPause(t *testing.T) { assert.Greater(t, t1.Sub(t0), 20*time.Millisecond, "should have waited til resume") q.Pause() - assert.NoError(t, q.Push(ctx, dummyTask)) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) q.Resume() _, _ = q.Poll(ctx, 1, filterFnTrue) } @@ -467,7 +443,7 @@ func TestFifoPauseResume(t *testing.T) { dummyTask := genDummyTask() q.Pause() - assert.NoError(t, q.Push(ctx, dummyTask)) + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) q.Resume() _, _ = q.Poll(ctx, 1, filterFnTrue) diff --git a/server/queue/persistent.go b/server/queue/persistent.go index 0f2ae4ae28..b5a887ed5f 100644 --- a/server/queue/persistent.go +++ b/server/queue/persistent.go @@ -18,7 +18,6 @@ package queue import ( "context" - "github.com/pkg/errors" "github.com/rs/zerolog/log" "go.woodpecker-ci.org/woodpecker/v3/server/model" @@ -40,20 +39,6 @@ type persistentQueue struct { store store.Store } -// Push pushes a task to the tail of this queue. -func (q *persistentQueue) Push(c context.Context, task *model.Task) error { - if err := q.store.TaskInsert(task); err != nil { - return err - } - err := q.Queue.Push(c, task) - if err != nil { - if err2 := q.store.TaskDelete(task.ID); err2 != nil { - err = errors.Wrapf(err, "delete task '%s' failed: %v", task.ID, err2) - } - } - return err -} - // PushAtOnce pushes multiple tasks to the tail of this queue. func (q *persistentQueue) PushAtOnce(c context.Context, tasks []*model.Task) error { // TODO: invent store.NewSession who return context including a session and make TaskInsert & TaskDelete use it @@ -87,15 +72,6 @@ func (q *persistentQueue) Poll(c context.Context, agentID int64, f FilterFn) (*m return task, err } -// Evict removes a pending task from the queue. -func (q *persistentQueue) Evict(c context.Context, id string) error { - err := q.Queue.Evict(c, id) - if err == nil { - return q.store.TaskDelete(id) - } - return err -} - // EvictAtOnce removes multiple pending tasks from the queue. func (q *persistentQueue) EvictAtOnce(c context.Context, ids []string) error { if err := q.Queue.EvictAtOnce(c, ids); err != nil { diff --git a/server/queue/queue.go b/server/queue/queue.go index a7f4865ac3..645ff2d9c2 100644 --- a/server/queue/queue.go +++ b/server/queue/queue.go @@ -77,9 +77,6 @@ type FilterFn func(*model.Task) (bool, int) // Queue defines a task queue for scheduling tasks among // a pool of workers. type Queue interface { - // Push pushes a task to the tail of this queue. - Push(c context.Context, task *model.Task) error - // PushAtOnce pushes multiple tasks to the tail of this queue. PushAtOnce(c context.Context, tasks []*model.Task) error @@ -98,9 +95,6 @@ type Queue interface { // ErrorAtOnce signals multiple done are complete with an error. ErrorAtOnce(c context.Context, ids []string, err error) error - // Evict removes a pending task from the queue. - Evict(c context.Context, id string) error - // EvictAtOnce removes multiple pending tasks from the queue. EvictAtOnce(c context.Context, ids []string) error