From 3ab579c03f30a6ad9babcf6a32ad5f0b594f5850 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Tue, 5 Nov 2024 04:03:40 +0100 Subject: [PATCH] Move Queue creation behind new func that evaluates queue type (#4252) --- cmd/server/setup.go | 18 +++++++++++------- server/queue/fifo.go | 4 ++-- server/queue/fifo_test.go | 28 ++++++++++++++-------------- server/queue/queue.go | 32 ++++++++++++++++++++++++++++++++ 4 files changed, 59 insertions(+), 23 deletions(-) diff --git a/cmd/server/setup.go b/cmd/server/setup.go index b46f1bd5c..c408e4023 100644 --- a/cmd/server/setup.go +++ b/cmd/server/setup.go @@ -103,8 +103,11 @@ func checkSqliteFileExist(path string) error { return err } -func setupQueue(ctx context.Context, s store.Store) queue.Queue { - return queue.WithTaskStore(ctx, queue.New(ctx), s) +func setupQueue(ctx context.Context, s store.Store) (queue.Queue, error) { + return queue.New(ctx, queue.Config{ + Backend: queue.TypeMemory, + Store: s, + }) } func setupMembershipService(_ context.Context, _store store.Store) cache.MembershipService { @@ -143,18 +146,19 @@ func setupJWTSecret(_store store.Store) (string, error) { return jwtSecret, nil } -func setupEvilGlobals(ctx context.Context, c *cli.Command, s store.Store) error { +func setupEvilGlobals(ctx context.Context, c *cli.Command, s store.Store) (err error) { // services - server.Config.Services.Queue = setupQueue(ctx, s) server.Config.Services.Logs = logging.New() server.Config.Services.Pubsub = pubsub.New() server.Config.Services.Membership = setupMembershipService(ctx, s) - serviceManager, err := services.NewManager(c, s, setup.Forge) + server.Config.Services.Queue, err = setupQueue(ctx, s) + if err != nil { + return fmt.Errorf("could not setup queue: %w", err) + } + server.Config.Services.Manager, err = services.NewManager(c, s, setup.Forge) if err != nil { return fmt.Errorf("could not setup service manager: %w", err) } - server.Config.Services.Manager = serviceManager - server.Config.Services.LogStore, err = setupLogStore(c, s) if err != nil { return fmt.Errorf("could not setup log store: %w", err) diff --git a/server/queue/fifo.go b/server/queue/fifo.go index f924055ad..d9acc66a4 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -59,8 +59,8 @@ const processTimeInterval = 100 * time.Millisecond var ErrWorkerKicked = fmt.Errorf("worker was kicked") -// New returns a new fifo queue. -func New(ctx context.Context) Queue { +// NewMemoryQueue returns a new fifo queue. +func NewMemoryQueue(ctx context.Context) Queue { q := &fifo{ ctx: ctx, workers: map[*worker]struct{}{}, diff --git a/server/queue/fifo_test.go b/server/queue/fifo_test.go index 01a790607..6c6d4252d 100644 --- a/server/queue/fifo_test.go +++ b/server/queue/fifo_test.go @@ -32,7 +32,7 @@ func TestFifo(t *testing.T) { want := &model.Task{ID: "1"} ctx := context.Background() - q := New(ctx) + q := NewMemoryQueue(ctx) assert.NoError(t, q.Push(ctx, want)) info := q.Info(ctx) assert.Len(t, info.Pending, 1, "expect task in pending queue") @@ -55,7 +55,7 @@ func TestFifoExpire(t *testing.T) { want := &model.Task{ID: "1"} ctx, cancel := context.WithCancelCause(context.Background()) - q, _ := New(ctx).(*fifo) + q, _ := NewMemoryQueue(ctx).(*fifo) q.extension = 0 assert.NoError(t, q.Push(ctx, want)) info := q.Info(ctx) @@ -78,7 +78,7 @@ func TestFifoWait(t *testing.T) { want := &model.Task{ID: "1"} ctx := context.Background() - q, _ := New(ctx).(*fifo) + q, _ := NewMemoryQueue(ctx).(*fifo) assert.NoError(t, q.Push(ctx, want)) got, err := q.Poll(ctx, 1, filterFnTrue) @@ -101,7 +101,7 @@ func TestFifoEvict(t *testing.T) { t1 := &model.Task{ID: "1"} ctx := context.Background() - q := New(ctx) + q := NewMemoryQueue(ctx) assert.NoError(t, q.Push(ctx, t1)) info := q.Info(ctx) assert.Len(t, info.Pending, 1, "expect task in pending queue") @@ -125,7 +125,7 @@ func TestFifoDependencies(t *testing.T) { DepStatus: make(map[string]model.StatusValue), } - q, _ := New(ctx).(*fifo) + q, _ := NewMemoryQueue(ctx).(*fifo) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task1})) got, err := q.Poll(ctx, 1, filterFnTrue) @@ -158,7 +158,7 @@ func TestFifoErrors(t *testing.T) { RunOn: []string{"success", "failure"}, } - q, _ := New(ctx).(*fifo) + q, _ := NewMemoryQueue(ctx).(*fifo) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) got, err := q.Poll(ctx, 1, filterFnTrue) @@ -194,7 +194,7 @@ func TestFifoErrors2(t *testing.T) { DepStatus: make(map[string]model.StatusValue), } - q, _ := New(ctx).(*fifo) + q, _ := NewMemoryQueue(ctx).(*fifo) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) for i := 0; i < 2; i++ { @@ -234,7 +234,7 @@ func TestFifoErrorsMultiThread(t *testing.T) { DepStatus: make(map[string]model.StatusValue), } - q, _ := New(ctx).(*fifo) + q, _ := NewMemoryQueue(ctx).(*fifo) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) obtainedWorkCh := make(chan *model.Task) @@ -314,7 +314,7 @@ func TestFifoTransitiveErrors(t *testing.T) { DepStatus: make(map[string]model.StatusValue), } - q, _ := New(ctx).(*fifo) + q, _ := NewMemoryQueue(ctx).(*fifo) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) got, err := q.Poll(ctx, 1, filterFnTrue) @@ -353,7 +353,7 @@ func TestFifoCancel(t *testing.T) { RunOn: []string{"success", "failure"}, } - q, _ := New(ctx).(*fifo) + q, _ := NewMemoryQueue(ctx).(*fifo) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) _, _ = q.Poll(ctx, 1, filterFnTrue) @@ -371,7 +371,7 @@ func TestFifoPause(t *testing.T) { ID: "1", } - q, _ := New(ctx).(*fifo) + q, _ := NewMemoryQueue(ctx).(*fifo) var wg sync.WaitGroup wg.Add(1) go func() { @@ -402,7 +402,7 @@ func TestFifoPauseResume(t *testing.T) { ID: "1", } - q, _ := New(ctx).(*fifo) + q, _ := NewMemoryQueue(ctx).(*fifo) q.Pause() assert.NoError(t, q.Push(ctx, task1)) q.Resume() @@ -429,7 +429,7 @@ func TestWaitingVsPending(t *testing.T) { RunOn: []string{"success", "failure"}, } - q, _ := New(ctx).(*fifo) + q, _ := NewMemoryQueue(ctx).(*fifo) assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) got, _ := q.Poll(ctx, 1, filterFnTrue) @@ -519,7 +519,7 @@ func TestShouldRun(t *testing.T) { func TestFifoWithScoring(t *testing.T) { ctx := context.Background() - q := New(ctx) + q := NewMemoryQueue(ctx) // Create tasks with different labels tasks := []*model.Task{ diff --git a/server/queue/queue.go b/server/queue/queue.go index 682d0e0ac..3301f7e95 100644 --- a/server/queue/queue.go +++ b/server/queue/queue.go @@ -17,9 +17,11 @@ package queue import ( "context" "errors" + "fmt" "strings" "go.woodpecker-ci.org/woodpecker/v2/server/model" + "go.woodpecker-ci.org/woodpecker/v2/server/store" ) var ( @@ -115,3 +117,33 @@ type Queue interface { // KickAgentWorkers kicks all workers for a given agent. KickAgentWorkers(agentID int64) } + +// Config holds the configuration for the queue. +type Config struct { + Backend Type + Store store.Store +} + +// Queue type. +type Type string + +const ( + TypeMemory Type = "memory" +) + +// New creates a new queue based on the provided configuration. +func New(ctx context.Context, config Config) (Queue, error) { + var q Queue + + switch config.Backend { + case TypeMemory: + q = NewMemoryQueue(ctx) + if config.Store != nil { + q = WithTaskStore(ctx, q, config.Store) + } + default: + return nil, fmt.Errorf("unsupported queue backend: %s", config.Backend) + } + + return q, nil +}