You've already forked woodpecker
mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2026-06-03 16:35:37 +02:00
Fix workflow being skipped and marked as failed when agent starts before server (#6361)
When the agent started before the server was available, it retried the connection as expected. However, once the server came up and a workflow was picked up, the pipeline would immediately fail without running any steps — the agent logs showed `workflow context done` firing instantly after `received execution`. The root cause was a package-level `shutdownCtx` shared across retry iterations. On each failed attempt, `stopAgentFunc` stamped it with a 5-second timeout — starting the clock immediately. By the time the agent successfully connected and received a workflow, `workflowCtx` was derived from this already-expired context, so execution failed before Docker even started a container. The fix removes the global mutable shutdown context and the `stopAgentFunc` indirection. Instead, `runner.Run()` no longer accepts a `shutdownCtx` parameter — it creates a fresh one locally only when needed for the `Done()` fallback call. The healthcheck server shutdown does the same. This makes the lifetime of each shutdown window explicit and local.
This commit is contained in:
+11
-3
@@ -33,6 +33,8 @@ import (
|
||||
"go.woodpecker-ci.org/woodpecker/v3/shared/utils"
|
||||
)
|
||||
|
||||
const shutdownTimeout = time.Second * 5
|
||||
|
||||
type Runner struct {
|
||||
client rpc.Peer
|
||||
filter rpc.Filter
|
||||
@@ -51,15 +53,19 @@ func NewRunner(workEngine rpc.Peer, f rpc.Filter, h string, state *State, backen
|
||||
}
|
||||
}
|
||||
|
||||
// GetShutdownContext returns a new context derived from context.Background()
// that expires after shutdownTimeout, together with its cancel function.
// Every call creates an independent context, so the timeout starts counting
// at the moment of the call rather than at some earlier point.
func GetShutdownContext() (context.Context, context.CancelFunc) {
|
||||
return context.WithTimeout(context.Background(), shutdownTimeout)
|
||||
}
|
||||
|
||||
// TODO: refactor this big function into subfunctions in its own subpackage
|
||||
|
||||
// Run executes a workflow using a backend, tracks its state and reports the state back to the server.
|
||||
func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
|
||||
func (r *Runner) Run(runnerCtx context.Context) error {
|
||||
log.Debug().Msg("request next execution")
|
||||
|
||||
// Preserve metadata AND cancellation from runnerCtx.
|
||||
meta, _ := metadata.FromOutgoingContext(runnerCtx)
|
||||
ctxMeta := metadata.NewOutgoingContext(shutdownCtx, meta)
|
||||
ctxMeta := metadata.NewOutgoingContext(runnerCtx, meta)
|
||||
|
||||
// Fetch next workflow from the queue
|
||||
workflow, err := r.client.Next(runnerCtx, r.filter)
|
||||
@@ -192,8 +198,10 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
|
||||
logger.Debug().Msg("logs and traces uploaded")
|
||||
|
||||
// Update workflow state
|
||||
doneCtx := runnerCtx
|
||||
doneCtx := runnerCtx //nolint:contextcheck
|
||||
if doneCtx.Err() != nil {
|
||||
shutdownCtx, shutdownCtxCancel := GetShutdownContext()
|
||||
defer shutdownCtxCancel()
|
||||
doneCtx = shutdownCtx
|
||||
}
|
||||
|
||||
|
||||
+9
-24
@@ -54,33 +54,14 @@ const (
|
||||
authInterceptorRefreshInterval = time.Minute * 30
|
||||
)
|
||||
|
||||
const (
|
||||
shutdownTimeout = time.Second * 5
|
||||
)
|
||||
|
||||
var (
|
||||
stopAgentFunc context.CancelCauseFunc = func(error) {}
|
||||
shutdownCancelFunc context.CancelFunc = func() {}
|
||||
shutdownCtx = context.Background()
|
||||
)
|
||||
|
||||
func run(ctx context.Context, c *cli.Command, backends []types.Backend) error {
|
||||
log.Info().Str("version", version.String()).Msg("Starting Woodpecker agent")
|
||||
|
||||
agentCtx, ctxCancel := context.WithCancelCause(ctx)
|
||||
stopAgentFunc = func(err error) {
|
||||
msg := "shutdown of whole agent"
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg(msg)
|
||||
} else {
|
||||
log.Info().Msg(msg)
|
||||
}
|
||||
stopAgentFunc = func(error) {}
|
||||
shutdownCtx, shutdownCancelFunc = context.WithTimeout(shutdownCtx, shutdownTimeout)
|
||||
ctxCancel(err)
|
||||
}
|
||||
defer stopAgentFunc(nil)
|
||||
defer shutdownCancelFunc()
|
||||
defer func() {
|
||||
log.Info().Msg("shutdown of whole agent")
|
||||
ctxCancel(nil)
|
||||
}()
|
||||
|
||||
serviceWaitingGroup := errgroup.Group{}
|
||||
|
||||
@@ -107,6 +88,10 @@ func run(ctx context.Context, c *cli.Command, backends []types.Backend) error {
|
||||
go func() {
|
||||
<-agentCtx.Done()
|
||||
log.Info().Msg("shutdown healthcheck server ...")
|
||||
|
||||
shutdownCtx, shutdownCtxCancel := agent.GetShutdownContext()
|
||||
defer shutdownCtxCancel()
|
||||
|
||||
if err := server.Shutdown(shutdownCtx); err != nil { //nolint:contextcheck
|
||||
log.Error().Err(err).Msg("shutdown healthcheck server failed")
|
||||
} else {
|
||||
@@ -302,7 +287,7 @@ func run(ctx context.Context, c *cli.Command, backends []types.Backend) error {
|
||||
}
|
||||
|
||||
log.Debug().Msg("polling new workflow")
|
||||
if err := runner.Run(agentCtx, shutdownCtx); err != nil {
|
||||
if err := runner.Run(agentCtx); err != nil {
|
||||
if singleWorkflow {
|
||||
log.Error().Err(err).Msg("runner done with error")
|
||||
ctxCancel(nil)
|
||||
|
||||
Reference in New Issue
Block a user