1
0
mirror of https://github.com/woodpecker-ci/woodpecker.git synced 2024-11-24 08:02:18 +02:00

Restart tasks on dead agents sooner (#4114)

This commit is contained in:
6543 2024-09-20 09:40:48 +02:00 committed by GitHub
parent cfe6df5add
commit eebaa10d10
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 57 additions and 34 deletions

View File

@ -28,6 +28,7 @@ import (
"go.woodpecker-ci.org/woodpecker/v2/pipeline" "go.woodpecker-ci.org/woodpecker/v2/pipeline"
backend "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types" backend "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc" "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
"go.woodpecker-ci.org/woodpecker/v2/shared/constant"
"go.woodpecker-ci.org/woodpecker/v2/shared/utils" "go.woodpecker-ci.org/woodpecker/v2/shared/utils"
) )
@ -118,7 +119,7 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co
logger.Debug().Msg("pipeline done") logger.Debug().Msg("pipeline done")
return return
case <-time.After(time.Minute): case <-time.After(constant.TaskTimeout / 3):
logger.Debug().Msg("pipeline lease renewed") logger.Debug().Msg("pipeline lease renewed")
if err := r.client.Extend(workflowCtx, workflow.ID); err != nil { if err := r.client.Extend(workflowCtx, workflow.ID); err != nil {
log.Error().Err(err).Msg("extending pipeline deadline failed") log.Error().Err(err).Msg("extending pipeline deadline failed")

View File

@ -24,6 +24,7 @@ import (
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"go.woodpecker-ci.org/woodpecker/v2/server/model" "go.woodpecker-ci.org/woodpecker/v2/server/model"
"go.woodpecker-ci.org/woodpecker/v2/shared/constant"
) )
type entry struct { type entry struct {
@ -43,6 +44,7 @@ type worker struct {
type fifo struct { type fifo struct {
sync.Mutex sync.Mutex
ctx context.Context
workers map[*worker]struct{} workers map[*worker]struct{}
running map[string]*entry running map[string]*entry
pending *list.List pending *list.List
@ -51,18 +53,23 @@ type fifo struct {
paused bool paused bool
} }
// processTimeInterval is the time till the queue rearranges things,
// as the agent pull in 10 milliseconds we should also give them work asap.
const processTimeInterval = 100 * time.Millisecond
// New returns a new fifo queue. // New returns a new fifo queue.
// func New(ctx context.Context) Queue {
//nolint:mnd q := &fifo{
func New(_ context.Context) Queue { ctx: ctx,
return &fifo{
workers: map[*worker]struct{}{}, workers: map[*worker]struct{}{},
running: map[string]*entry{}, running: map[string]*entry{},
pending: list.New(), pending: list.New(),
waitingOnDeps: list.New(), waitingOnDeps: list.New(),
extension: time.Minute * 10, extension: constant.TaskTimeout,
paused: false, paused: false,
} }
go q.process()
return q
} }
// Push pushes a task to the tail of this queue. // Push pushes a task to the tail of this queue.
@ -70,7 +77,6 @@ func (q *fifo) Push(_ context.Context, task *model.Task) error {
q.Lock() q.Lock()
q.pending.PushBack(task) q.pending.PushBack(task)
q.Unlock() q.Unlock()
go q.process()
return nil return nil
} }
@ -81,7 +87,6 @@ func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error {
q.pending.PushBack(task) q.pending.PushBack(task)
} }
q.Unlock() q.Unlock()
go q.process()
return nil return nil
} }
@ -98,7 +103,6 @@ func (q *fifo) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task,
} }
q.workers[w] = struct{}{} q.workers[w] = struct{}{}
q.Unlock() q.Unlock()
go q.process()
for { for {
select { select {
@ -237,7 +241,6 @@ func (q *fifo) Resume() {
q.Lock() q.Lock()
q.paused = false q.paused = false
q.Unlock() q.Unlock()
go q.process()
} }
// KickAgentWorkers kicks all workers for a given agent. // KickAgentWorkers kicks all workers for a given agent.
@ -254,28 +257,36 @@ func (q *fifo) KickAgentWorkers(agentID int64) {
} }
// helper function that loops through the queue and attempts to // helper function that loops through the queue and attempts to
// match the item to a single subscriber. // match the item to a single subscriber until context got cancel.
func (q *fifo) process() { func (q *fifo) process() {
q.Lock() for {
defer q.Unlock() select {
case <-time.After(processTimeInterval):
if q.paused { case <-q.ctx.Done():
return return
}
q.resubmitExpiredPipelines()
q.filterWaiting()
for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() {
task, _ := pending.Value.(*model.Task)
task.AgentID = worker.agentID
delete(q.workers, worker)
q.pending.Remove(pending)
q.running[task.ID] = &entry{
item: task,
done: make(chan bool),
deadline: time.Now().Add(q.extension),
} }
worker.channel <- task
q.Lock()
if q.paused {
q.Unlock()
continue
}
q.resubmitExpiredPipelines()
q.filterWaiting()
for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() {
task, _ := pending.Value.(*model.Task)
task.AgentID = worker.agentID
delete(q.workers, worker)
q.pending.Remove(pending)
q.running[task.ID] = &entry{
item: task,
done: make(chan bool),
deadline: time.Now().Add(q.extension),
}
worker.channel <- task
}
q.Unlock()
} }
} }

View File

@ -52,17 +52,23 @@ func TestFifo(t *testing.T) {
func TestFifoExpire(t *testing.T) { func TestFifoExpire(t *testing.T) {
want := &model.Task{ID: "1"} want := &model.Task{ID: "1"}
ctx, cancel := context.WithCancelCause(context.Background())
q, _ := New(context.Background()).(*fifo) q, _ := New(ctx).(*fifo)
q.extension = 0 q.extension = 0
assert.NoError(t, q.Push(noContext, want)) assert.NoError(t, q.Push(ctx, want))
info := q.Info(noContext) info := q.Info(ctx)
assert.Len(t, info.Pending, 1, "expect task in pending queue") assert.Len(t, info.Pending, 1, "expect task in pending queue")
got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true }) got, err := q.Poll(ctx, 1, func(*model.Task) bool { return true })
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, want, got) assert.Equal(t, want, got)
// cancel the context to let the process func end
go func() {
time.Sleep(time.Millisecond)
cancel(nil)
}()
q.process() q.process()
assert.Len(t, info.Pending, 1, "expect task re-added to pending queue") assert.Len(t, info.Pending, 1, "expect task re-added to pending queue")
} }

View File

@ -14,6 +14,8 @@
package constant package constant
import "time"
// DefaultConfigOrder represent the priority in witch woodpecker search for a pipeline config by default // DefaultConfigOrder represent the priority in witch woodpecker search for a pipeline config by default
// folders are indicated by supplying a trailing slash. // folders are indicated by supplying a trailing slash.
var DefaultConfigOrder = [...]string{ var DefaultConfigOrder = [...]string{
@ -34,3 +36,6 @@ var TrustedClonePlugins = []string{
"docker.io/woodpeckerci/plugin-git", "docker.io/woodpeckerci/plugin-git",
"quay.io/woodpeckerci/plugin-git", "quay.io/woodpeckerci/plugin-git",
} }
// TaskTimeout is the time till a running task is counted as dead.
var TaskTimeout = time.Minute