diff --git a/agent/runner.go b/agent/runner.go index 0024b7888..27363aa66 100644 --- a/agent/runner.go +++ b/agent/runner.go @@ -28,6 +28,7 @@ import ( "go.woodpecker-ci.org/woodpecker/v2/pipeline" backend "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types" "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc" + "go.woodpecker-ci.org/woodpecker/v2/shared/constant" "go.woodpecker-ci.org/woodpecker/v2/shared/utils" ) @@ -118,7 +119,7 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co logger.Debug().Msg("pipeline done") return - case <-time.After(time.Minute): + case <-time.After(constant.TaskTimeout / 3): logger.Debug().Msg("pipeline lease renewed") if err := r.client.Extend(workflowCtx, workflow.ID); err != nil { log.Error().Err(err).Msg("extending pipeline deadline failed") diff --git a/server/queue/fifo.go b/server/queue/fifo.go index d6fe664c9..5e5a17edd 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -24,6 +24,7 @@ import ( "github.com/rs/zerolog/log" "go.woodpecker-ci.org/woodpecker/v2/server/model" + "go.woodpecker-ci.org/woodpecker/v2/shared/constant" ) type entry struct { @@ -43,6 +44,7 @@ type worker struct { type fifo struct { sync.Mutex + ctx context.Context workers map[*worker]struct{} running map[string]*entry pending *list.List @@ -51,18 +53,23 @@ type fifo struct { paused bool } +// processTimeInterval is the time till the queue rearranges things, +// as the agent pull in 10 milliseconds we should also give them work asap. +const processTimeInterval = 100 * time.Millisecond + // New returns a new fifo queue. -// -//nolint:mnd -func New(_ context.Context) Queue { - return &fifo{ +func New(ctx context.Context) Queue { + q := &fifo{ + ctx: ctx, workers: map[*worker]struct{}{}, running: map[string]*entry{}, pending: list.New(), waitingOnDeps: list.New(), - extension: time.Minute * 10, + extension: constant.TaskTimeout, paused: false, } + go q.process() + return q } // Push pushes a task to the tail of this queue. @@ -70,7 +77,6 @@ func (q *fifo) Push(_ context.Context, task *model.Task) error { q.Lock() q.pending.PushBack(task) q.Unlock() - go q.process() return nil } @@ -81,7 +87,6 @@ func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error { q.pending.PushBack(task) } q.Unlock() - go q.process() return nil } @@ -98,7 +103,6 @@ func (q *fifo) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, } q.workers[w] = struct{}{} q.Unlock() - go q.process() for { select { @@ -237,7 +241,6 @@ func (q *fifo) Resume() { q.Lock() q.paused = false q.Unlock() - go q.process() } // KickAgentWorkers kicks all workers for a given agent. @@ -254,28 +257,36 @@ func (q *fifo) KickAgentWorkers(agentID int64) { } // helper function that loops through the queue and attempts to -// match the item to a single subscriber. +// match the item to a single subscriber until context got cancel. func (q *fifo) process() { - q.Lock() - defer q.Unlock() - - if q.paused { - return - } - - q.resubmitExpiredPipelines() - q.filterWaiting() - for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() { - task, _ := pending.Value.(*model.Task) - task.AgentID = worker.agentID - delete(q.workers, worker) - q.pending.Remove(pending) - q.running[task.ID] = &entry{ - item: task, - done: make(chan bool), - deadline: time.Now().Add(q.extension), + for { + select { + case <-time.After(processTimeInterval): + case <-q.ctx.Done(): + return } - worker.channel <- task + + q.Lock() + if q.paused { + q.Unlock() + continue + } + + q.resubmitExpiredPipelines() + q.filterWaiting() + for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() { + task, _ := pending.Value.(*model.Task) + task.AgentID = worker.agentID + delete(q.workers, worker) + q.pending.Remove(pending) + q.running[task.ID] = &entry{ + item: task, + done: make(chan bool), + deadline: time.Now().Add(q.extension), + } + worker.channel <- task + } + q.Unlock() } } diff --git a/server/queue/fifo_test.go b/server/queue/fifo_test.go index d1d3a2ebc..0de92e3f2 100644 --- a/server/queue/fifo_test.go +++ b/server/queue/fifo_test.go @@ -52,17 +52,23 @@ func TestFifo(t *testing.T) { func TestFifoExpire(t *testing.T) { want := &model.Task{ID: "1"} + ctx, cancel := context.WithCancelCause(context.Background()) - q, _ := New(context.Background()).(*fifo) + q, _ := New(ctx).(*fifo) q.extension = 0 - assert.NoError(t, q.Push(noContext, want)) - info := q.Info(noContext) + assert.NoError(t, q.Push(ctx, want)) + info := q.Info(ctx) assert.Len(t, info.Pending, 1, "expect task in pending queue") - got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) + got, err := q.Poll(ctx, 1, func(*model.Task) bool { return true }) assert.NoError(t, err) assert.Equal(t, want, got) + // cancel the context to let the process func end + go func() { + time.Sleep(time.Millisecond) + cancel(nil) + }() q.process() assert.Len(t, info.Pending, 1, "expect task re-added to pending queue") } diff --git a/shared/constant/constant.go b/shared/constant/constant.go index 1d060bfeb..f2849964e 100644 --- a/shared/constant/constant.go +++ b/shared/constant/constant.go @@ -14,6 +14,8 @@ package constant +import "time" + // DefaultConfigOrder represent the priority in witch woodpecker search for a pipeline config by default // folders are indicated by supplying a trailing slash. var DefaultConfigOrder = [...]string{ @@ -34,3 +36,6 @@ var TrustedClonePlugins = []string{ "docker.io/woodpeckerci/plugin-git", "quay.io/woodpeckerci/plugin-git", } + +// TaskTimeout is the time till a running task is counted as dead. +var TaskTimeout = time.Minute