1
0
mirror of https://github.com/woodpecker-ci/woodpecker.git synced 2026-06-03 16:35:37 +02:00

Refactor pipeline runtime code (#6166)

Co-authored-by: Anbraten <6918444+anbraten@users.noreply.github.com>
This commit is contained in:
6543
2026-03-30 10:24:36 +02:00
committed by GitHub
parent a8f803435b
commit fde1d917e2
8 changed files with 1752 additions and 306 deletions
@@ -86,3 +86,13 @@ Each subcommand is organized into its own package under `cli/<subcommand>/`.
The `cli/exec` subcommand allows local pipeline execution for testing and development by combining pipeline parsing and execution without requiring a running server or agent.
- `../` = `cli/`
### Engine
The engine is the shared kernel that validates, parses frontend facing config files, enrich it by the provided forge metadata and produce config for the backends to execute on based on that. It also contains the default backend implementations.
#### Runtime
The runtime is the package controlling how a workflow is executed, and can be found at `pipeline/runtime`.
<img src="/svg/woodpecker-workflow-run-flowchart.svg" alt="Pipeline/runtime flow diagram" style="max-width: 600px; width: 100%;" />
File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 54 KiB

-302
View File
@@ -1,302 +0,0 @@
// Copyright 2023 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package runtime
import (
"context"
"errors"
"strings"
"sync"
"time"
"golang.org/x/sync/errgroup"
backend_types "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
pipeline_errors "go.woodpecker-ci.org/woodpecker/v3/pipeline/errors"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/frontend/metadata"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/state"
)
// Run starts the execution of a workflow and waits for it to complete.
func (r *Runtime) Run(runnerCtx context.Context) error {
logger := r.makeLogger()
logger.Debug().Msgf("executing %d stages, in order of:", len(r.spec.Stages))
for stagePos, stage := range r.spec.Stages {
stepNames := []string{}
for _, step := range stage.Steps {
stepNames = append(stepNames, step.Name)
}
logger.Debug().
Int("StagePos", stagePos).
Str("Steps", strings.Join(stepNames, ",")).
Msg("stage")
}
defer func() {
ctx := runnerCtx //nolint:contextcheck
if ctx.Err() != nil {
ctx = GetShutdownCtx()
}
if err := r.engine.DestroyWorkflow(ctx, r.spec, r.taskUUID); err != nil {
logger.Error().Err(err).Msg("could not destroy engine")
}
}()
r.started = time.Now().Unix()
if err := r.engine.SetupWorkflow(runnerCtx, r.spec, r.taskUUID); err != nil {
var stepErr *pipeline_errors.ErrInvalidWorkflowSetup
if errors.As(err, &stepErr) {
state := new(state.State)
state.CurrStep = stepErr.Step
state.Workflow.Error = stepErr.Err
state.CurrStepState = backend_types.State{
Error: stepErr.Err,
Exited: true,
ExitCode: 1,
}
// Trace the error if we have a tracer
if r.tracer != nil {
if err := r.tracer.Trace(state); err != nil {
logger.Error().Err(err).Msg("failed to trace step error")
}
}
}
return err
}
for _, stage := range r.spec.Stages {
select {
case <-r.ctx.Done():
return pipeline_errors.ErrCancel
case err := <-r.execAll(runnerCtx, stage.Steps):
if err != nil {
r.err.Set(err)
}
}
}
return r.err.Get()
}
// Updates the current status of a step.
// If processState is nil, we assume the step did not start.
// If step did not started and err exists, it's a step start issue and step is done.
func (r *Runtime) traceStep(processState *backend_types.State, err error, step *backend_types.Step) error {
if r.tracer == nil {
// no tracer nothing to trace :)
return nil
}
state := new(state.State)
state.Workflow.Started = r.started
state.CurrStep = step
state.Workflow.Error = r.err.Get()
// We have an error while starting the step
if processState == nil && err != nil {
state.CurrStepState = backend_types.State{
Error: err,
Exited: true,
OOMKilled: false,
}
} else if processState != nil {
state.CurrStepState = *processState
}
if traceErr := r.tracer.Trace(state); traceErr != nil {
return traceErr
}
return err
}
// Executes a set of parallel steps.
func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend_types.Step) <-chan error {
var g errgroup.Group
done := make(chan error)
logger := r.makeLogger()
for _, step := range steps {
// Required since otherwise the loop variable
// will be captured by the function. This will
// recreate the step "variable"
step := step
g.Go(func() error {
// Case the pipeline was already complete.
logger.Debug().
Str("step", step.Name).
Msg("prepare")
rErr := r.err.Get()
if rErr != nil && !step.OnFailure {
logger.Debug().
Str("step", step.Name).
Msgf("skipped due to OnFailure=%t", step.OnFailure)
return r.traceStep(&backend_types.State{Skipped: true}, nil, step)
}
if rErr == nil && !step.OnSuccess {
logger.Debug().
Str("step", step.Name).
Msgf("skipped due to OnSuccess=%t", step.OnSuccess)
return r.traceStep(&backend_types.State{Skipped: true}, nil, step)
}
// Trace started.
err := r.traceStep(nil, nil, step)
if err != nil {
return err
}
// Add compatibility environment variables for drone-ci plugins.
if step.Type == backend_types.StepTypePlugin {
metadata.SetDroneEnviron(step.Environment)
}
logger.Debug().
Str("step", step.Name).
Msg("executing")
// setup exec func in a way it can be detached if needed
// wg will signal once
execAndTrace := func(wg *sync.WaitGroup) error {
processState, err := r.exec(runnerCtx, step, wg)
logger.Debug().
Str("step", step.Name).
Msg("complete")
// normalize context cancel error
if errors.Is(err, context.Canceled) {
err = pipeline_errors.ErrCancel
}
// Return the error after tracing it.
err = r.traceStep(processState, err, step)
if err != nil && step.Failure == metadata.FailureIgnore {
return nil
}
return err
}
// Report all errors until the setup happened.
// Afterwards errors will be dropped.
if step.Detached {
var wg sync.WaitGroup
wg.Add(1)
var setupErr error
go func() {
setupErr = execAndTrace(&wg)
}()
wg.Wait()
return setupErr
}
// run blocking
return execAndTrace(nil)
})
}
go func() {
done <- g.Wait()
close(done)
}()
return done
}
// Executes the step and returns the state and error.
func (r *Runtime) exec(runnerCtx context.Context, step *backend_types.Step, setupWg *sync.WaitGroup) (*backend_types.State, error) {
defer func() {
if setupWg != nil {
setupWg.Done()
}
}()
if err := r.engine.StartStep(r.ctx, step, r.taskUUID); err != nil { //nolint:contextcheck
return nil, err
}
startTime := time.Now().Unix()
logger := r.makeLogger()
var wg sync.WaitGroup
if r.logger != nil {
rc, err := r.engine.TailStep(r.ctx, step, r.taskUUID) //nolint:contextcheck
if err != nil {
return nil, err
}
wg.Add(1)
go func() {
defer wg.Done()
if err := r.logger(step, rc); err != nil {
logger.Error().Err(err).Msg("process logging failed")
}
_ = rc.Close()
}()
}
// nothing else to block for detached process.
if setupWg != nil {
setupWg.Done()
// set to nil so the setupWg.Done in defer does not call it a second time
setupWg = nil
}
// We wait until all data was logged. (Needed for some backends like local as WaitStep kills the log stream)
wg.Wait()
waitState, err := r.engine.WaitStep(r.ctx, step, r.taskUUID) //nolint:contextcheck
if err != nil {
if errors.Is(err, context.Canceled) {
if waitState == nil {
waitState = &backend_types.State{}
}
waitState.Error = pipeline_errors.ErrCancel
} else {
return nil, err
}
}
// It is important to use the runnerCtx here because
// in case the workflow was canceled we still have the docker daemon to stop the container.
if err := r.engine.DestroyStep(runnerCtx, step, r.taskUUID); err != nil {
return nil, err
}
// we update with our start time here
waitState.Started = startTime
// we handle cancel case
if ctxErr := r.ctx.Err(); ctxErr != nil && errors.Is(ctxErr, context.Canceled) {
waitState.Error = pipeline_errors.ErrCancel
}
if waitState.OOMKilled {
return waitState, &pipeline_errors.OomError{
UUID: step.UUID,
Code: waitState.ExitCode,
}
} else if waitState.ExitCode != 0 {
return waitState, &pipeline_errors.ExitError{
UUID: step.UUID,
Code: waitState.ExitCode,
}
}
return waitState, nil
}
+18 -4
View File
@@ -271,6 +271,11 @@ func TestWorkflowBuildFailSkipsSubsequentStages(t *testing.T) {
buildTrace := findLastTraceByName(traces, "build")
require.NotNil(t, buildTrace, "build step should fail")
assert.EqualValues(t, 1, buildTrace.CurrStepState.ExitCode)
assert.True(t, buildTrace.CurrStepState.Exited, "build should have started")
buildTrace = findLastTraceByName(traces, "build")
require.NotNil(t, buildTrace, "build step should fail")
assert.EqualValues(t, 1, buildTrace.CurrStepState.ExitCode)
deployTrace := findLastTraceByName(traces, "deploy")
require.NotNil(t, deployTrace, "deploy step should still be traced")
@@ -386,6 +391,7 @@ func TestWorkflowFailureIgnoreDoesNotSetWorkflowError(t *testing.T) {
func TestWorkflowPluginStep(t *testing.T) {
t.Parallel()
tracer := newTestTracer(t)
r := New(
&backend_types.Config{
Stages: []*backend_types.Stage{
@@ -394,11 +400,21 @@ func TestWorkflowPluginStep(t *testing.T) {
},
},
dummy.New(),
WithTracer(newTestTracer(t)),
WithTracer(tracer),
WithLogger(newTestLogger(t)),
)
assert.NoError(t, r.Run(t.Context()))
lastPluginTrace := findLastTraceByName(getTracerStates(tracer), "publish")
if assert.NotNil(t, lastPluginTrace) {
assert.EqualValues(t, map[string]string{
"DRONE_BUILD_STATUS": "success",
"DRONE_REPO_SCM": "git",
"EXPECT_TYPE": "plugin",
"PULLREQUEST_DRONE_PULL_REQUEST": "0",
}, lastPluginTrace.CurrStep.Environment)
}
}
func TestWorkflowOOMKilledStep(t *testing.T) {
@@ -591,6 +607,7 @@ func TestWorkflowServiceWithParallelBuildAndOnFailure(t *testing.T) {
assert.Error(t, err)
traces := getTracerStates(tracer)
assert.NotNil(t, findStartedTrace(traces, "notify"), "notify (OnFailure) should have started")
notifyTrace := findLastTraceByName(traces, "notify")
require.NotNil(t, notifyTrace)
assert.True(t, notifyTrace.CurrStepState.Exited, "notify should exited")
@@ -604,9 +621,6 @@ func TestWorkflowServiceWithParallelBuildAndOnFailure(t *testing.T) {
deployTrace := findFirstTraceByName(traces, "deploy")
require.NotNil(t, deployTrace)
assert.True(t, deployTrace.CurrStepState.Skipped, "deploy should be skipped after lint failure")
assert.NotNil(t, findStartedTrace(traces, "notify"),
"notify (OnFailure) should have started")
}
func TestWorkflowIgnoredFailureFollowedByOnFailureStep(t *testing.T) {
+260
View File
@@ -0,0 +1,260 @@
// Copyright 2026 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package runtime
import (
"context"
"errors"
"sync"
"time"
backend_types "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
pipeline_errors "go.woodpecker-ci.org/woodpecker/v3/pipeline/errors"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/frontend/metadata"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/state"
)
// executeStep is the single entry point called per step from runStage.
// It checks whether the step should be skipped, emits a "started" trace,
// sets up drone-compat env vars, then hands off to blocking or detached execution.
func (r *Runtime) executeStep(runnerCtx context.Context, step *backend_types.Step) error {
logger := r.makeLogger()
logger.Debug().Str("step", step.Name).Msg("prepare")
if r.shouldSkipStep(step) {
// Trace the skip so the server marks the step as skipped immediately,
// rather than leaving it in "pending" until workflow Done.
return r.traceStep(&backend_types.State{Skipped: true}, nil, step)
}
// Emit a "step started" trace before doing any real work.
if err := r.traceStep(nil, nil, step); err != nil {
return err
}
// Add compatibility environment variables for drone-ci plugins.
if step.Type == backend_types.StepTypePlugin {
metadata.SetDroneEnviron(step.Environment)
}
logger.Debug().Str("step", step.Name).Msg("executing")
if step.Detached {
return r.runDetachedStep(runnerCtx, step)
}
return r.runBlockingStep(runnerCtx, step)
}
// shouldSkipStep returns true when the step should not run based on the current
// pipeline error state and the step's OnSuccess / OnFailure flags.
// It logs the reason for skipping before returning.
func (r *Runtime) shouldSkipStep(step *backend_types.Step) bool {
logger := r.makeLogger()
currentErr := r.err.Get()
if currentErr != nil && !step.OnFailure {
logger.Debug().
Str("step", step.Name).
Err(currentErr).
Msgf("skipped due to OnFailure=%t", step.OnFailure)
return true
}
if currentErr == nil && !step.OnSuccess {
logger.Debug().
Str("step", step.Name).
Msgf("skipped due to OnSuccess=%t", step.OnSuccess)
return true
}
return false
}
// startStep starts the step container and spawns a goroutine to stream its logs.
// It returns:
// - waitForLogs: must be called before WaitStep — it blocks until the log stream
// is fully drained. Some backends (e.g. local) close the log stream when
// WaitStep is called, so draining first is required.
// - startTime: unix timestamp recorded right after the container started, used
// later to fill waitState.Started.
//
// If StartStep or TailStep fail, startStep returns a non-nil error and the caller
// must not call waitForLogs.
func (r *Runtime) startStep(step *backend_types.Step) (func(), int64, error) {
if err := r.engine.StartStep(r.ctx, step, r.taskUUID); err != nil {
return nil, 0, err
}
startTime := time.Now().Unix()
rc, err := r.engine.TailStep(r.ctx, step, r.taskUUID)
if err != nil {
return nil, 0, err
}
var wg sync.WaitGroup
wg.Go(func() {
logger := r.makeLogger()
if err := r.logger(step, rc); err != nil {
logger.Error().Err(err).Str("step", step.Name).Msg("step log streaming failed")
}
_ = rc.Close()
})
return wg.Wait, startTime, nil
}
// completeStep drains the log stream, waits for the process to exit, destroys
// the container, and maps exit conditions (OOM kill, non-zero exit code, context
// cancellation) to typed errors.
//
// The runnerCtx is intentionally used for DestroyStep so that container cleanup can
// still reach the backend even after the workflow context (r.ctx) is canceled.
func (r *Runtime) completeStep(runnerCtx context.Context, step *backend_types.Step, waitForLogs func(), startTime int64) (*backend_types.State, error) {
// Drain the log stream before waiting on the process exit.
waitForLogs()
waitState, err := r.engine.WaitStep(r.ctx, step, r.taskUUID) //nolint:contextcheck
if err != nil {
if errors.Is(err, context.Canceled) {
if waitState == nil {
waitState = &backend_types.State{}
}
waitState.Error = pipeline_errors.ErrCancel
} else {
return nil, err
}
}
// Use runnerCtx here: the workflow context may already be canceled but we
// still need to reach the backend to stop/remove the container.
if err := r.engine.DestroyStep(runnerCtx, step, r.taskUUID); err != nil {
return nil, err
}
waitState.Started = startTime
// Re-check context cancellation: the wait may have raced with cancellation.
if ctxErr := r.ctx.Err(); ctxErr != nil && errors.Is(ctxErr, context.Canceled) {
waitState.Error = pipeline_errors.ErrCancel
}
if waitState.OOMKilled {
return waitState, &pipeline_errors.OomError{
UUID: step.UUID,
Code: waitState.ExitCode,
}
}
if waitState.ExitCode != 0 {
return waitState, &pipeline_errors.ExitError{
UUID: step.UUID,
Code: waitState.ExitCode,
}
}
return waitState, nil
}
// runBlockingStep starts the step and blocks until it fully completes.
// The error is traced and returned to runStage, which feeds it into the
// stage error group.
func (r *Runtime) runBlockingStep(runnerCtx context.Context, step *backend_types.Step) error {
logger := r.makeLogger()
waitForLogs, startTime, err := r.startStep(step)
if err != nil {
// The step never ran — trace the start failure and surface it.
return r.traceStep(nil, err, step)
}
processState, err := r.completeStep(runnerCtx, step, waitForLogs, startTime)
logger.Debug().Str("step", step.Name).Msg("complete")
if errors.Is(err, context.Canceled) {
err = pipeline_errors.ErrCancel
}
err = r.traceStep(processState, err, step)
if err != nil && step.Failure == metadata.FailureIgnore {
return nil
}
return err
}
// runDetachedStep starts the step and returns as soon as the container is running
// and log streaming is set up. The rest of the step lifecycle runs in the background.
//
// Any error that occurs after setup is logged but not propagated — it cannot
// influence the pipeline outcome at that point.
func (r *Runtime) runDetachedStep(runnerCtx context.Context, step *backend_types.Step) error {
waitForLogs, startTime, err := r.startStep(step)
if err != nil {
// Setup failed before the container was running — treat it like a
// blocking failure so the pipeline is aware.
return r.traceStep(nil, err, step)
}
// Container is up and logging is streaming — hand off to background.
go func() {
logger := r.makeLogger()
processState, err := r.completeStep(runnerCtx, step, waitForLogs, startTime)
logger.Debug().Str("step", step.Name).Msg("complete")
if errors.Is(err, context.Canceled) {
err = pipeline_errors.ErrCancel
}
if err != nil {
logger.Error().Err(err).Str("step", step.Name).Msg("detached step failed after while running")
}
if traceErr := r.traceStep(processState, err, step); traceErr != nil {
logger.Error().Err(traceErr).Str("step", step.Name).Msg("failed to trace detached step result")
}
}()
return nil
}
// traceStep reports the current state of a step to the tracer.
//
// - processState == nil, err == nil → step is being marked as started
// - processState == nil, err != nil → step failed to start
// - processState != nil → step has finished (err may or may not be set)
//
// Always returns err unchanged so callers can write: return r.traceStep(state, err, step).
func (r *Runtime) traceStep(processState *backend_types.State, err error, step *backend_types.Step) error {
s := new(state.State)
s.Workflow.Started = r.started
s.CurrStep = step
s.Workflow.Error = r.err.Get()
switch {
case processState == nil && err != nil:
// Step failed to start — create an dummy exited process state.
s.CurrStepState = backend_types.State{
Error: err,
Exited: true,
OOMKilled: false,
}
case processState != nil:
s.CurrStepState = *processState
// processState == nil && err == nil: step just started, leave s.CurrStepState zero-valued.
}
if traceErr := r.tracer.Trace(s); traceErr != nil {
return traceErr
}
return err
}
+705
View File
@@ -0,0 +1,705 @@
// Copyright 2026 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build test
package runtime
import (
"context"
"errors"
"io"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/dummy"
backend_types "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types/mocks"
pipeline_errors "go.woodpecker-ci.org/woodpecker/v3/pipeline/errors"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/frontend/metadata"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/logging"
tracer_mocks "go.woodpecker-ci.org/woodpecker/v3/pipeline/tracing/mocks"
)
const testWorkflowID = "WID_test"
// newDummyRuntime creates a Runtime backed by the dummy engine with a pre-setup
// workflow so individual step methods can be tested in isolation.
func newDummyRuntime(t *testing.T, tracer *tracer_mocks.MockTracer) *Runtime {
t.Helper()
engine := dummy.New()
r := New(&backend_types.Config{}, engine,
WithTracer(tracer),
WithTaskUUID(testWorkflowID),
WithLogger(newTestLogger(t)),
)
require.NoError(t, engine.SetupWorkflow(t.Context(), nil, testWorkflowID))
return r
}
func dummyStep(name string) *backend_types.Step {
return &backend_types.Step{
Name: name,
UUID: name + "-uuid",
Type: backend_types.StepTypeCommands,
OnSuccess: true,
OnFailure: false,
Environment: map[string]string{},
Commands: []string{"echo hello"},
}
}
func TestShouldSkipStep(t *testing.T) {
t.Parallel()
t.Run("NoErrorOnSuccessTrue", func(t *testing.T) {
t.Parallel()
r := newDummyRuntime(t, newTestTracer(t))
step := &backend_types.Step{Name: "s", OnSuccess: true, OnFailure: false}
assert.False(t, r.shouldSkipStep(step))
})
t.Run("NoErrorOnSuccessFalse", func(t *testing.T) {
t.Parallel()
r := newDummyRuntime(t, newTestTracer(t))
step := &backend_types.Step{Name: "s", OnSuccess: false, OnFailure: true}
assert.True(t, r.shouldSkipStep(step))
})
t.Run("ErrorOnFailureTrue", func(t *testing.T) {
t.Parallel()
r := newDummyRuntime(t, newTestTracer(t))
r.err.Set(errors.New("previous failure"))
step := &backend_types.Step{Name: "s", OnSuccess: false, OnFailure: true}
assert.False(t, r.shouldSkipStep(step))
})
t.Run("ErrorOnFailureFalse", func(t *testing.T) {
t.Parallel()
r := newDummyRuntime(t, newTestTracer(t))
r.err.Set(errors.New("previous failure"))
step := &backend_types.Step{Name: "s", OnSuccess: true, OnFailure: false}
assert.True(t, r.shouldSkipStep(step))
})
}
func TestTraceStep(t *testing.T) {
t.Parallel()
t.Run("StepStarted", func(t *testing.T) {
t.Parallel()
tracer := newTestTracer(t)
r := newDummyRuntime(t, tracer)
r.started = 1000
step := dummyStep("s1")
err := r.traceStep(nil, nil, step)
assert.NoError(t, err)
calls := getTracerStates(tracer)
require.Len(t, calls, 1)
assert.Equal(t, int64(1000), calls[0].Workflow.Started)
assert.Equal(t, step, calls[0].CurrStep)
assert.False(t, calls[0].CurrStepState.Exited)
})
t.Run("StepFailedToStart", func(t *testing.T) {
t.Parallel()
tracer := newTestTracer(t)
r := newDummyRuntime(t, tracer)
step := dummyStep("s1")
startErr := errors.New("image pull failed")
err := r.traceStep(nil, startErr, step)
assert.ErrorIs(t, err, startErr)
calls := getTracerStates(tracer)
require.Len(t, calls, 1)
assert.True(t, calls[0].CurrStepState.Exited)
assert.Equal(t, startErr, calls[0].CurrStepState.Error)
})
t.Run("StepFinished", func(t *testing.T) {
t.Parallel()
tracer := newTestTracer(t)
r := newDummyRuntime(t, tracer)
step := dummyStep("s1")
ps := &backend_types.State{Exited: true, ExitCode: 0, Started: 42}
err := r.traceStep(ps, nil, step)
assert.NoError(t, err)
calls := getTracerStates(tracer)
require.Len(t, calls, 1)
assert.True(t, calls[0].CurrStepState.Exited)
assert.Equal(t, 0, calls[0].CurrStepState.ExitCode)
assert.Equal(t, int64(42), calls[0].CurrStepState.Started)
})
t.Run("StepSkipped", func(t *testing.T) {
t.Parallel()
tracer := newTestTracer(t)
r := newDummyRuntime(t, tracer)
step := dummyStep("s1")
ps := &backend_types.State{Exited: true, Skipped: true}
err := r.traceStep(ps, nil, step)
assert.NoError(t, err)
calls := getTracerStates(tracer)
require.Len(t, calls, 1)
assert.True(t, calls[0].CurrStepState.Skipped)
assert.True(t, calls[0].CurrStepState.Exited)
})
t.Run("TracerError", func(t *testing.T) {
t.Parallel()
traceErr := errors.New("tracer unavailable")
tracer := tracer_mocks.NewMockTracer(t)
tracer.On("Trace", mock.Anything).Return(traceErr).Maybe()
r := newDummyRuntime(t, tracer)
err := r.traceStep(nil, nil, dummyStep("s1"))
assert.ErrorIs(t, err, traceErr)
})
t.Run("PipelineErrorPropagated", func(t *testing.T) {
t.Parallel()
tracer := newTestTracer(t)
r := newDummyRuntime(t, tracer)
r.err.Set(errors.New("earlier failure"))
_ = r.traceStep(nil, nil, dummyStep("s1"))
calls := getTracerStates(tracer)
require.Len(t, calls, 1)
assert.EqualError(t, calls[0].Workflow.Error, "earlier failure")
})
}
// The startStep uses dummy for success + start/tail failures and mockery mock for logger test.
func TestStartStep(t *testing.T) {
t.Parallel()
t.Run("Success", func(t *testing.T) {
t.Parallel()
r := newDummyRuntime(t, newTestTracer(t))
step := dummyStep("s1")
waitForLogs, startTime, err := r.startStep(step)
assert.NoError(t, err)
assert.NotNil(t, waitForLogs)
assert.Greater(t, startTime, int64(0))
waitForLogs()
})
t.Run("StartStepError", func(t *testing.T) {
t.Parallel()
r := newDummyRuntime(t, newTestTracer(t))
step := dummyStep("fail")
step.Environment[dummy.EnvKeyStepStartFail] = "true"
_, _, err := r.startStep(step)
assert.Error(t, err)
})
t.Run("TailStepError", func(t *testing.T) {
t.Parallel()
r := newDummyRuntime(t, newTestTracer(t))
step := dummyStep("tail-fail")
step.Environment[dummy.EnvKeyStepTailFail] = "true"
r.logger = logging.Logger(func(_ *backend_types.Step, _ io.ReadCloser) error { return nil })
_, _, err := r.startStep(step)
assert.Error(t, err)
})
t.Run("WithLogger", func(t *testing.T) {
t.Parallel()
var logCalled int32
engine := mocks.NewMockBackend(t)
engine.On("StartStep", mock.Anything, mock.Anything, mock.Anything).Return(nil)
engine.On("TailStep", mock.Anything, mock.Anything, mock.Anything).
Return(io.NopCloser(strings.NewReader("log line")), nil)
r := New(&backend_types.Config{}, engine, WithTracer(newTestTracer(t)),
WithLogger(logging.Logger(func(_ *backend_types.Step, rc io.ReadCloser) error {
atomic.AddInt32(&logCalled, 1)
_, _ = io.ReadAll(rc)
return nil
})))
step := dummyStep("s1")
waitForLogs, _, err := r.startStep(step)
require.NoError(t, err)
waitForLogs()
assert.Equal(t, int32(1), atomic.LoadInt32(&logCalled))
})
t.Run("LoggerError", func(t *testing.T) {
t.Parallel()
logErr := errors.New("log stream broken")
engine := mocks.NewMockBackend(t)
engine.On("StartStep", mock.Anything, mock.Anything, mock.Anything).Return(nil)
engine.On("TailStep", mock.Anything, mock.Anything, mock.Anything).
Return(io.NopCloser(strings.NewReader("data")), nil)
r := New(&backend_types.Config{}, engine,
WithTracer(newTestTracer(t)),
WithLogger(logging.Logger(func(_ *backend_types.Step, rc io.ReadCloser) error {
_, _ = io.ReadAll(rc)
return logErr // triggers the error-log branch in the goroutine
})),
)
waitForLogs, _, err := r.startStep(dummyStep("s1"))
require.NoError(t, err) // startStep itself succeeds
// waitForLogs blocks until the goroutine finishes; the branch is hit inside.
waitForLogs()
})
}
// The completeStep uses mockery mock for fine-grained control over
// WaitStep/DestroyStep return values that dummy cannot provide.
func TestCompleteStep(t *testing.T) {
t.Parallel()
t.Run("Success", func(t *testing.T) {
t.Parallel()
engine := mocks.NewMockBackend(t)
engine.On("WaitStep", mock.Anything, mock.Anything, mock.Anything).
Return(&backend_types.State{Exited: true, ExitCode: 0}, nil)
engine.On("DestroyStep", mock.Anything, mock.Anything, mock.Anything).Return(nil)
r := New(&backend_types.Config{}, engine, WithTracer(newTestTracer(t)), WithLogger(newTestLogger(t)))
ws, err := r.completeStep(t.Context(), dummyStep("s1"), func() {}, time.Now().Unix())
assert.NoError(t, err)
assert.True(t, ws.Exited)
assert.Equal(t, 0, ws.ExitCode)
})
t.Run("NonZeroExitCode", func(t *testing.T) {
t.Parallel()
engine := mocks.NewMockBackend(t)
engine.On("WaitStep", mock.Anything, mock.Anything, mock.Anything).
Return(&backend_types.State{Exited: true, ExitCode: 1}, nil)
engine.On("DestroyStep", mock.Anything, mock.Anything, mock.Anything).Return(nil)
r := New(&backend_types.Config{}, engine, WithTracer(newTestTracer(t)), WithLogger(newTestLogger(t)))
ws, err := r.completeStep(t.Context(), dummyStep("s1"), func() {}, time.Now().Unix())
var exitErr *pipeline_errors.ExitError
assert.True(t, errors.As(err, &exitErr))
assert.Equal(t, 1, exitErr.Code)
assert.Equal(t, 1, ws.ExitCode)
})
t.Run("OOMKilled", func(t *testing.T) {
t.Parallel()
engine := mocks.NewMockBackend(t)
engine.On("WaitStep", mock.Anything, mock.Anything, mock.Anything).
Return(&backend_types.State{Exited: true, OOMKilled: true, ExitCode: 137}, nil)
engine.On("DestroyStep", mock.Anything, mock.Anything, mock.Anything).Return(nil)
r := New(&backend_types.Config{}, engine, WithTracer(newTestTracer(t)), WithLogger(newTestLogger(t)))
ws, err := r.completeStep(t.Context(), dummyStep("s1"), func() {}, time.Now().Unix())
var oomErr *pipeline_errors.OomError
assert.True(t, errors.As(err, &oomErr))
assert.True(t, ws.OOMKilled)
})
t.Run("ContextCanceledNilState", func(t *testing.T) {
t.Parallel()
engine := mocks.NewMockBackend(t)
engine.On("WaitStep", mock.Anything, mock.Anything, mock.Anything).
Return(nil, context.Canceled)
engine.On("DestroyStep", mock.Anything, mock.Anything, mock.Anything).Return(nil)
r := New(&backend_types.Config{}, engine, WithTracer(newTestTracer(t)), WithLogger(newTestLogger(t)))
ws, err := r.completeStep(t.Context(), dummyStep("s1"), func() {}, time.Now().Unix())
assert.NoError(t, err)
require.NotNil(t, ws, "nil guard must allocate a new State")
assert.Equal(t, pipeline_errors.ErrCancel, ws.Error)
})
t.Run("ContextCanceledWithState", func(t *testing.T) {
t.Parallel()
engine := mocks.NewMockBackend(t)
engine.On("WaitStep", mock.Anything, mock.Anything, mock.Anything).
Return(&backend_types.State{Exited: true, ExitCode: 0}, context.Canceled)
engine.On("DestroyStep", mock.Anything, mock.Anything, mock.Anything).Return(nil)
r := New(&backend_types.Config{}, engine, WithTracer(newTestTracer(t)), WithLogger(newTestLogger(t)))
ws, err := r.completeStep(t.Context(), dummyStep("s1"), func() {}, time.Now().Unix())
assert.NoError(t, err)
assert.Equal(t, pipeline_errors.ErrCancel, ws.Error)
})
t.Run("WaitStepNonCancelError", func(t *testing.T) {
t.Parallel()
engine := mocks.NewMockBackend(t)
engine.On("WaitStep", mock.Anything, mock.Anything, mock.Anything).
Return(nil, errors.New("engine exploded"))
// DestroyStep should NOT be called — early return.
r := New(&backend_types.Config{}, engine, WithTracer(newTestTracer(t)), WithLogger(newTestLogger(t)))
ws, err := r.completeStep(t.Context(), dummyStep("s1"), func() {}, time.Now().Unix())
assert.EqualError(t, err, "engine exploded")
assert.Nil(t, ws)
})
t.Run("DestroyStepError", func(t *testing.T) {
t.Parallel()
engine := mocks.NewMockBackend(t)
engine.On("WaitStep", mock.Anything, mock.Anything, mock.Anything).
Return(&backend_types.State{Exited: true, ExitCode: 0}, nil)
engine.On("DestroyStep", mock.Anything, mock.Anything, mock.Anything).
Return(errors.New("cleanup failed"))
r := New(&backend_types.Config{}, engine, WithTracer(newTestTracer(t)), WithLogger(newTestLogger(t)))
ws, err := r.completeStep(t.Context(), dummyStep("s1"), func() {}, time.Now().Unix())
assert.EqualError(t, err, "cleanup failed")
assert.Nil(t, ws)
})
t.Run("SetsStartTime", func(t *testing.T) {
t.Parallel()
engine := mocks.NewMockBackend(t)
engine.On("WaitStep", mock.Anything, mock.Anything, mock.Anything).
Return(&backend_types.State{Exited: true, ExitCode: 0}, nil)
engine.On("DestroyStep", mock.Anything, mock.Anything, mock.Anything).Return(nil)
r := New(&backend_types.Config{}, engine, WithTracer(newTestTracer(t)), WithLogger(newTestLogger(t)))
ws, err := r.completeStep(t.Context(), dummyStep("s1"), func() {}, 9999)
assert.NoError(t, err)
assert.Equal(t, int64(9999), ws.Started)
})
t.Run("CtxCanceledAfterDestroyStep", func(t *testing.T) {
t.Parallel()
// WaitStep succeeds (no context.Canceled from the engine),
// but r.ctx is already canceled — the re-check at the bottom catches it.
canceledCtx, cancel := context.WithCancelCause(context.Background())
cancel(nil) // pre-cancel
engine := mocks.NewMockBackend(t)
engine.On("WaitStep", mock.Anything, mock.Anything, mock.Anything).
Return(&backend_types.State{Exited: true, ExitCode: 0}, nil)
engine.On("DestroyStep", mock.Anything, mock.Anything, mock.Anything).Return(nil)
r := New(&backend_types.Config{},
engine,
WithTracer(newTestTracer(t)),
WithLogger(newTestLogger(t)),
WithContext(canceledCtx), // r.ctx is canceled
)
ws, err := r.completeStep(t.Context(), dummyStep("s1"), func() {}, time.Now().Unix())
assert.NoError(t, err)
require.NotNil(t, ws)
assert.Equal(t, pipeline_errors.ErrCancel, ws.Error,
"re-check should set ErrCancel when r.ctx is already canceled")
})
}
// The executeStep uses dummy for the full step lifecycle.
func TestExecuteStep(t *testing.T) {
t.Parallel()
t.Run("SkippedStepTraced", func(t *testing.T) {
t.Parallel()
tracer := newTestTracer(t)
r := newDummyRuntime(t, tracer)
step := &backend_types.Step{
Name: "skip-me", UUID: "skip-uuid",
Type: backend_types.StepTypeCommands, Environment: map[string]string{},
OnSuccess: false, OnFailure: true,
}
err := r.executeStep(t.Context(), step)
assert.NoError(t, err)
calls := getTracerStates(tracer)
require.Len(t, calls, 1)
assert.True(t, calls[0].CurrStepState.Skipped)
})
t.Run("BlockingStepSuccess", func(t *testing.T) {
t.Parallel()
tracer := newTestTracer(t)
r := newDummyRuntime(t, tracer)
step := dummyStep("build")
err := r.executeStep(t.Context(), step)
assert.NoError(t, err)
calls := getTracerStates(tracer)
require.Len(t, calls, 2)
assert.False(t, calls[0].CurrStepState.Exited, "first trace should be step-started")
assert.True(t, calls[1].CurrStepState.Exited, "second trace should be step-completed")
})
t.Run("BlockingStepFailure", func(t *testing.T) {
t.Parallel()
tracer := newTestTracer(t)
r := newDummyRuntime(t, tracer)
step := dummyStep("fail")
step.Environment[dummy.EnvKeyStepExitCode] = "1"
err := r.executeStep(t.Context(), step)
assert.Error(t, err)
var exitErr *pipeline_errors.ExitError
assert.True(t, errors.As(err, &exitErr))
assert.Equal(t, 1, exitErr.Code)
})
// Use an atomic counter instead of getTracerStates inside Eventually to avoid
// a data race: the detached-step goroutine writes to mock.Calls concurrently
// with the Eventually polling goroutine reading it.
t.Run("DetachedStep", func(t *testing.T) {
t.Parallel()
var traced int32
tracer := tracer_mocks.NewMockTracer(t)
tracer.On("Trace", mock.Anything).
Run(func(mock.Arguments) { atomic.AddInt32(&traced, 1) }).
Return(nil).Maybe()
r := newDummyRuntime(t, tracer)
step := dummyStep("svc")
step.Detached = true
step.Type = backend_types.StepTypeService
step.Environment[dummy.EnvKeyStepSleep] = "1ms"
err := r.executeStep(t.Context(), step)
assert.NoError(t, err)
assert.Eventually(t, func() bool {
return atomic.LoadInt32(&traced) >= 2
}, time.Second, 10*time.Millisecond)
})
t.Run("TracerErrorOnStarted", func(t *testing.T) {
t.Parallel()
traceErr := errors.New("tracer down")
tracer := tracer_mocks.NewMockTracer(t)
// First call (skip-check passes, this is the "started" trace) → error.
// The step has OnSuccess=true and no prior error, so shouldSkipStep returns false,
// meaning executeStep calls traceStep(nil, nil, step) first.
tracer.On("Trace", mock.Anything).Return(traceErr).Once()
r := newDummyRuntime(t, tracer)
step := dummyStep("s1") // OnSuccess=true, so not skipped
err := r.executeStep(t.Context(), step)
assert.ErrorIs(t, err, traceErr)
})
}
func TestRunBlockingStep(t *testing.T) {
t.Parallel()
t.Run("Success", func(t *testing.T) {
t.Parallel()
r := newDummyRuntime(t, newTestTracer(t))
err := r.runBlockingStep(t.Context(), dummyStep("s1"))
assert.NoError(t, err)
})
t.Run("FailureIgnore", func(t *testing.T) {
t.Parallel()
r := newDummyRuntime(t, newTestTracer(t))
step := dummyStep("s1")
step.Failure = metadata.FailureIgnore
step.Environment[dummy.EnvKeyStepExitCode] = "1"
err := r.runBlockingStep(t.Context(), step)
assert.NoError(t, err, "error should be suppressed when Failure==FailureIgnore")
})
t.Run("StartFailure", func(t *testing.T) {
t.Parallel()
tracer := newTestTracer(t)
r := newDummyRuntime(t, tracer)
step := dummyStep("s1")
step.Environment[dummy.EnvKeyStepStartFail] = "true"
err := r.runBlockingStep(t.Context(), step)
assert.Error(t, err)
calls := getTracerStates(tracer)
require.Len(t, calls, 1)
assert.True(t, calls[0].CurrStepState.Exited)
})
t.Run("DestroyStepErrorMappedToErrCancel", func(t *testing.T) {
t.Parallel()
engine := mocks.NewMockBackend(t)
engine.On("StartStep", mock.Anything, mock.Anything, mock.Anything).Return(nil)
engine.On("WaitStep", mock.Anything, mock.Anything, mock.Anything).
Return(&backend_types.State{Exited: true, ExitCode: 0}, nil)
engine.On("DestroyStep", mock.Anything, mock.Anything, mock.Anything).
Return(context.Canceled)
engine.On("TailStep", mock.Anything, mock.Anything, mock.Anything).Return(io.NopCloser(strings.NewReader("")), nil)
tracer := newTestTracer(t)
r := New(&backend_types.Config{}, engine, WithTracer(tracer), WithLogger(newTestLogger(t)))
err := r.runBlockingStep(t.Context(), dummyStep("s1"))
assert.ErrorIs(t, err, pipeline_errors.ErrCancel)
})
}
func TestRunDetachedStep(t *testing.T) {
t.Parallel()
// Use an atomic counter instead of getTracerStates inside Eventually to avoid
// a data race: the detached-step goroutine writes to mock.Calls concurrently
// with the Eventually polling goroutine reading it.
t.Run("ReturnsImmediately", func(t *testing.T) {
t.Parallel()
var traced int32
tracer := tracer_mocks.NewMockTracer(t)
tracer.On("Trace", mock.Anything).
Run(func(mock.Arguments) { atomic.AddInt32(&traced, 1) }).
Return(nil).Maybe()
r := newDummyRuntime(t, tracer)
step := dummyStep("svc")
step.Environment[dummy.EnvKeyStepSleep] = "1ms"
err := r.runDetachedStep(t.Context(), step)
assert.NoError(t, err)
assert.Eventually(t, func() bool {
return atomic.LoadInt32(&traced) >= 1
}, time.Second, 10*time.Millisecond)
})
t.Run("StartFailure", func(t *testing.T) {
t.Parallel()
r := newDummyRuntime(t, newTestTracer(t))
step := dummyStep("svc")
step.Environment[dummy.EnvKeyStepStartFail] = "true"
err := r.runDetachedStep(t.Context(), step)
assert.Error(t, err)
})
// Branch 1: context.Canceled from WaitStep → mapped to ErrCancel in the goroutine.
// Branch 2: non-nil error from completeStep → error log branch.
// Both are covered by a WaitStep that returns context.Canceled.
//
// Use an atomic counter instead of getTracerStates inside Eventually to avoid
// a data race: the detached-step goroutine writes to mock.Calls concurrently
// with the Eventually polling goroutine reading it.
t.Run("BackgroundContextCanceled", func(t *testing.T) {
t.Parallel()
var traced int32
tracer := tracer_mocks.NewMockTracer(t)
tracer.On("Trace", mock.Anything).
Run(func(mock.Arguments) { atomic.AddInt32(&traced, 1) }).
Return(nil).Maybe()
engine := mocks.NewMockBackend(t)
engine.On("StartStep", mock.Anything, mock.Anything, mock.Anything).Return(nil)
engine.On("TailStep", mock.Anything, mock.Anything, mock.Anything).
Return(io.NopCloser(strings.NewReader("")), nil)
engine.On("WaitStep", mock.Anything, mock.Anything, mock.Anything).
Return(nil, context.Canceled)
engine.On("DestroyStep", mock.Anything, mock.Anything, mock.Anything).Return(nil)
r := New(&backend_types.Config{},
engine,
WithTracer(tracer),
WithLogger(newTestLogger(t)),
)
step := dummyStep("svc")
err := r.runDetachedStep(t.Context(), step)
assert.NoError(t, err) // returns immediately
// Wait for the goroutine to finish and emit its trace.
assert.Eventually(t, func() bool {
return atomic.LoadInt32(&traced) >= 1
}, time.Second, 10*time.Millisecond)
})
// Branch 3: traceStep itself fails inside the goroutine → trace-error log branch.
t.Run("BackgroundTracerError", func(t *testing.T) {
t.Parallel()
traceErr := errors.New("trace failed in background")
engine := mocks.NewMockBackend(t)
engine.On("StartStep", mock.Anything, mock.Anything, mock.Anything).Return(nil)
engine.On("TailStep", mock.Anything, mock.Anything, mock.Anything).
Return(io.NopCloser(strings.NewReader("")), nil)
engine.On("WaitStep", mock.Anything, mock.Anything, mock.Anything).
Return(&backend_types.State{Exited: true, ExitCode: 0}, nil)
engine.On("DestroyStep", mock.Anything, mock.Anything, mock.Anything).Return(nil)
var traced int32
tracer := tracer_mocks.NewMockTracer(t)
tracer.On("Trace", mock.Anything).
Run(func(_ mock.Arguments) { atomic.AddInt32(&traced, 1) }).
Return(traceErr) // every Trace call fails
r := New(&backend_types.Config{},
engine,
WithTracer(tracer),
WithLogger(newTestLogger(t)),
)
err := r.runDetachedStep(t.Context(), dummyStep("svc"))
assert.NoError(t, err)
assert.Eventually(t, func() bool {
return atomic.LoadInt32(&traced) >= 1
}, time.Second, 10*time.Millisecond)
})
}
+146
View File
@@ -0,0 +1,146 @@
// Copyright 2026 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package runtime
import (
"context"
"errors"
"fmt"
"strings"
"time"
"golang.org/x/sync/errgroup"
backend_types "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
pipeline_errors "go.woodpecker-ci.org/woodpecker/v3/pipeline/errors"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/state"
)
// Run starts the workflow, executes all stages sequentially, and tears down the
// workflow on exit. The runnerCtx must outlive workflow cancellation so that cleanup
// can still reach the backend (e.g. stopping Docker containers).
func (r *Runtime) Run(runnerCtx context.Context) error {
if err := r.validateConfig(); err != nil {
return err
}
logger := r.makeLogger()
r.logStages()
// we make sure cleanup always happens
defer func() {
ctx := runnerCtx //nolint:contextcheck
if ctx.Err() != nil {
// runnerCtx itself is done — fall back to a short-lived shutdown context.
ctx = GetShutdownCtx()
}
if err := r.engine.DestroyWorkflow(ctx, r.spec, r.taskUUID); err != nil {
logger.Error().Err(err).Msg("could not destroy workflow")
}
}()
r.started = time.Now().Unix()
if err := r.engine.SetupWorkflow(runnerCtx, r.spec, r.taskUUID); err != nil {
r.traceWorkflowSetupError(err)
return err
}
for _, stage := range r.spec.Stages {
select {
case <-r.ctx.Done():
return pipeline_errors.ErrCancel
case err := <-r.runStage(runnerCtx, stage.Steps):
if err != nil {
r.err.Set(err)
}
}
}
return r.err.Get()
}
// The validateConfig checks if a dev made a mistake,
// this should be values a user has no control over.
func (r *Runtime) validateConfig() error {
if r.tracer == nil {
return fmt.Errorf("runtime misconfiguration: tracer must not be nil")
}
if r.logger == nil {
return fmt.Errorf("runtime misconfiguration: logger must not be nil")
}
if r.spec == nil {
return fmt.Errorf("runtime misconfiguration: backend configuration is missing")
}
return nil
}
// logStages logs the ordered list of stages and their steps at debug level.
func (r *Runtime) logStages() {
logger := r.makeLogger()
logger.Debug().Msgf("executing %d stages, in order of:", len(r.spec.Stages))
for stagePos, stage := range r.spec.Stages {
stepNames := make([]string, 0, len(stage.Steps))
for _, step := range stage.Steps {
stepNames = append(stepNames, step.Name)
}
logger.Debug().
Int("StagePos", stagePos).
Str("Steps", strings.Join(stepNames, ",")).
Msg("stage")
}
}
// traceWorkflowSetupError traces an ErrInvalidWorkflowSetup to the tracer.
func (r *Runtime) traceWorkflowSetupError(err error) {
var stepErr *pipeline_errors.ErrInvalidWorkflowSetup
if !errors.As(err, &stepErr) {
return
}
s := new(state.State)
s.CurrStep = stepErr.Step
s.Workflow.Error = stepErr.Err
s.CurrStepState = backend_types.State{
Error: stepErr.Err,
Exited: true,
ExitCode: 1,
}
if traceErr := r.tracer.Trace(s); traceErr != nil {
logger := r.makeLogger()
logger.Error().Err(traceErr).Msg("failed to trace workflow setup error")
}
}
// runStage executes all steps of a stage in parallel.
// It returns a channel that emits the combined error (if any) once all steps finish.
func (r *Runtime) runStage(runnerCtx context.Context, steps []*backend_types.Step) <-chan error {
var g errgroup.Group
done := make(chan error)
for _, step := range steps {
g.Go(func() error {
return r.executeStep(runnerCtx, step)
})
}
go func() {
done <- g.Wait()
close(done)
}()
return done
}
+391
View File
@@ -0,0 +1,391 @@
// Copyright 2026 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build test
package runtime
import (
"context"
"errors"
"sync/atomic"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/dummy"
backend_types "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types/mocks"
pipeline_errors "go.woodpecker-ci.org/woodpecker/v3/pipeline/errors"
tracer_mocks "go.woodpecker-ci.org/woodpecker/v3/pipeline/tracing/mocks"
)
func TestRunNilTracer(t *testing.T) {
t.Parallel()
r := New(&backend_types.Config{}, dummy.New(), WithLogger(newTestLogger(t)))
err := r.Run(t.Context())
assert.Error(t, err)
assert.Contains(t, err.Error(), "tracer must not be nil")
}
func TestRunSuccess(t *testing.T) {
t.Parallel()
tracer := newTestTracer(t)
r := New(
&backend_types.Config{
Stages: []*backend_types.Stage{{
Steps: []*backend_types.Step{{
Name: "build", UUID: "u1",
Type: backend_types.StepTypeCommands, OnSuccess: true,
Environment: map[string]string{}, Commands: []string{"echo hello"},
}},
}},
},
dummy.New(),
WithTracer(tracer),
WithLogger(newTestLogger(t)),
)
err := r.Run(t.Context())
assert.NoError(t, err)
calls := getTracerStates(tracer)
require.Len(t, calls, 2)
}
func TestRunMultipleStages(t *testing.T) {
t.Parallel()
tracer := newTestTracer(t)
r := New(
&backend_types.Config{
Stages: []*backend_types.Stage{
{Steps: []*backend_types.Step{{
Name: "stage1", UUID: "u1",
Type: backend_types.StepTypeCommands, OnSuccess: true,
Environment: map[string]string{}, Commands: []string{"echo 1"},
}}},
{Steps: []*backend_types.Step{{
Name: "stage2", UUID: "u2",
Type: backend_types.StepTypeCommands, OnSuccess: true,
Environment: map[string]string{}, Commands: []string{"echo 2"},
}}},
},
},
dummy.New(),
WithTracer(tracer),
WithLogger(newTestLogger(t)),
)
err := r.Run(t.Context())
assert.NoError(t, err)
calls := getTracerStates(tracer)
require.Len(t, calls, 4)
}
func TestRunStepError(t *testing.T) {
t.Parallel()
tracer := newTestTracer(t)
r := New(
&backend_types.Config{
Stages: []*backend_types.Stage{{
Steps: []*backend_types.Step{{
Name: "fail", UUID: "u1",
Type: backend_types.StepTypeCommands, OnSuccess: true,
Environment: map[string]string{dummy.EnvKeyStepExitCode: "1"},
Commands: []string{"exit 1"},
}},
}},
},
dummy.New(),
WithTracer(tracer),
WithLogger(newTestLogger(t)),
)
err := r.Run(t.Context())
assert.Error(t, err)
var exitErr *pipeline_errors.ExitError
assert.True(t, errors.As(err, &exitErr))
assert.Equal(t, 1, exitErr.Code)
}
func TestRunContextCanceled(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancelCause(t.Context())
cancel(nil)
r := New(
&backend_types.Config{
Stages: []*backend_types.Stage{{
Steps: []*backend_types.Step{{
Name: "s1", UUID: "u1",
Type: backend_types.StepTypeCommands, OnSuccess: true,
Environment: map[string]string{}, Commands: []string{"echo hello"},
}},
}},
},
dummy.New(),
WithTracer(newTestTracer(t)),
WithContext(ctx),
WithLogger(newTestLogger(t)),
)
err := r.Run(t.Context())
assert.ErrorIs(t, err, pipeline_errors.ErrCancel)
}
func TestRunSetupWorkflowError(t *testing.T) {
t.Parallel()
r := New(
&backend_types.Config{},
dummy.New(),
WithTracer(newTestTracer(t)),
WithTaskUUID(dummy.WorkflowSetupFailUUID),
WithLogger(newTestLogger(t)),
)
err := r.Run(t.Context())
assert.Error(t, err)
}
func TestRunSetupWorkflowInvalidSetupError(t *testing.T) {
t.Parallel()
tracer := newTestTracer(t)
step := &backend_types.Step{Name: "clone", UUID: "clone-uuid"}
setupErr := &pipeline_errors.ErrInvalidWorkflowSetup{
Err: errors.New("bad image"),
Step: step,
}
engine := mocks.NewMockBackend(t)
engine.On("SetupWorkflow", mock.Anything, mock.Anything, mock.Anything).Return(setupErr)
engine.On("DestroyWorkflow", mock.Anything, mock.Anything, mock.Anything).Return(nil)
r := New(&backend_types.Config{}, engine, WithTracer(tracer), WithLogger(newTestLogger(t)))
err := r.Run(t.Context())
assert.Error(t, err)
calls := getTracerStates(tracer)
require.Len(t, calls, 1)
assert.Equal(t, step, calls[0].CurrStep)
assert.True(t, calls[0].CurrStepState.Exited)
assert.Equal(t, 1, calls[0].CurrStepState.ExitCode)
}
func TestRunDestroyWorkflowAlwaysCalled(t *testing.T) {
t.Parallel()
var destroyed int32
engine := mocks.NewMockBackend(t)
engine.On("SetupWorkflow", mock.Anything, mock.Anything, mock.Anything).Return(nil)
engine.On("DestroyWorkflow", mock.Anything, mock.Anything, mock.Anything).
Run(func(_ mock.Arguments) { atomic.AddInt32(&destroyed, 1) }).Return(nil)
r := New(&backend_types.Config{}, engine, WithTracer(newTestTracer(t)), WithLogger(newTestLogger(t)))
_ = r.Run(t.Context())
assert.Equal(t, int32(1), atomic.LoadInt32(&destroyed))
}
func TestRunDestroyWorkflowCalledOnSetupError(t *testing.T) {
t.Parallel()
var destroyed int32
engine := mocks.NewMockBackend(t)
engine.On("SetupWorkflow", mock.Anything, mock.Anything, mock.Anything).
Return(errors.New("setup boom"))
engine.On("DestroyWorkflow", mock.Anything, mock.Anything, mock.Anything).
Run(func(_ mock.Arguments) { atomic.AddInt32(&destroyed, 1) }).Return(nil)
r := New(&backend_types.Config{}, engine, WithTracer(newTestTracer(t)), WithLogger(newTestLogger(t)))
_ = r.Run(t.Context())
assert.Equal(t, int32(1), atomic.LoadInt32(&destroyed))
}
func TestTraceWorkflowSetupError(t *testing.T) {
t.Parallel()
t.Run("MatchingError", func(t *testing.T) {
t.Parallel()
tracer := newTestTracer(t)
r := New(&backend_types.Config{}, dummy.New(), WithTracer(tracer), WithLogger(newTestLogger(t)))
step := &backend_types.Step{Name: "setup", UUID: "su"}
err := &pipeline_errors.ErrInvalidWorkflowSetup{Err: errors.New("bad"), Step: step}
r.traceWorkflowSetupError(err)
calls := getTracerStates(tracer)
require.Len(t, calls, 1)
assert.Equal(t, step, calls[0].CurrStep)
assert.True(t, calls[0].CurrStepState.Exited)
assert.Equal(t, 1, calls[0].CurrStepState.ExitCode)
})
t.Run("NonMatchingError", func(t *testing.T) {
t.Parallel()
tracer := tracer_mocks.NewMockTracer(t)
// Trace should NOT be called — no .On() setup means test panics if called.
r := New(&backend_types.Config{}, dummy.New(), WithTracer(tracer), WithLogger(newTestLogger(t)))
r.traceWorkflowSetupError(errors.New("generic error"))
})
t.Run("TracerFailure", func(t *testing.T) {
t.Parallel()
tracer := tracer_mocks.NewMockTracer(t)
tracer.On("Trace", mock.Anything).Return(errors.New("trace failed"))
r := New(&backend_types.Config{}, dummy.New(), WithTracer(tracer), WithLogger(newTestLogger(t)))
step := &backend_types.Step{Name: "setup", UUID: "su"}
// Should not panic — the error is logged, not returned.
r.traceWorkflowSetupError(&pipeline_errors.ErrInvalidWorkflowSetup{
Err: errors.New("bad"), Step: step,
})
})
}
func TestRunStage(t *testing.T) {
t.Parallel()
t.Run("ParallelExecution", func(t *testing.T) {
t.Parallel()
tracer := newTestTracer(t)
r := newDummyRuntime(t, tracer)
steps := []*backend_types.Step{
{Name: "a", UUID: "ua", Type: backend_types.StepTypeCommands, OnSuccess: true, Environment: map[string]string{}, Commands: []string{"echo a"}},
{Name: "b", UUID: "ub", Type: backend_types.StepTypeCommands, OnSuccess: true, Environment: map[string]string{}, Commands: []string{"echo b"}},
{Name: "c", UUID: "uc", Type: backend_types.StepTypeCommands, OnSuccess: true, Environment: map[string]string{}, Commands: []string{"echo c"}},
}
err := <-r.runStage(t.Context(), steps)
assert.NoError(t, err)
assert.Len(t, getTracerStates(tracer), 6)
})
t.Run("OneStepFails", func(t *testing.T) {
t.Parallel()
tracer := newTestTracer(t)
r := newDummyRuntime(t, tracer)
steps := []*backend_types.Step{
{Name: "good", UUID: "ug", Type: backend_types.StepTypeCommands, OnSuccess: true, Environment: map[string]string{}, Commands: []string{"echo ok"}},
{Name: "bad", UUID: "ub", Type: backend_types.StepTypeCommands, OnSuccess: true, Environment: map[string]string{dummy.EnvKeyStepExitCode: "1"}, Commands: []string{"exit 1"}},
}
err := <-r.runStage(t.Context(), steps)
assert.Error(t, err)
})
}
func TestNewDefaults(t *testing.T) {
t.Parallel()
spec := &backend_types.Config{}
r := New(spec, dummy.New())
assert.Equal(t, spec, r.spec)
assert.NotEmpty(t, r.taskUUID)
assert.NotNil(t, r.ctx)
assert.Nil(t, r.tracer)
assert.NotNil(t, r.engine)
assert.NoError(t, r.err.Get())
}
func TestWithOptions(t *testing.T) {
t.Parallel()
engine := dummy.New()
tracer := newTestTracer(t)
ctx := context.Background()
desc := map[string]string{"repo": "test"}
r := New(&backend_types.Config{},
engine,
WithTracer(tracer),
WithContext(ctx),
WithDescription(desc),
WithTaskUUID("custom-uuid"),
WithLogger(newTestLogger(t)),
)
assert.Equal(t, engine, r.engine)
assert.Equal(t, tracer, r.tracer)
assert.Equal(t, ctx, r.ctx)
assert.Equal(t, "custom-uuid", r.taskUUID)
assert.Equal(t, "test", r.description["repo"])
}
func TestGetShutdownCtx(t *testing.T) {
ctx := GetShutdownCtx()
assert.NotNil(t, ctx)
ctx2 := GetShutdownCtx()
assert.Equal(t, ctx, ctx2)
}
// Gap A: logger == nil guard.
func TestRunNilLogger(t *testing.T) {
t.Parallel()
r := New(&backend_types.Config{},
dummy.New(),
WithTracer(newTestTracer(t)),
// WithLogger intentionally omitted
)
err := r.Run(t.Context())
assert.Error(t, err)
assert.Contains(t, err.Error(), "logger must not be nil")
}
// Gap B: runnerCtx is already done inside the defer → GetShutdownCtx() fallback.
func TestRunDestroyWorkflowFallsBackToShutdownCtx(t *testing.T) {
t.Parallel()
engine := mocks.NewMockBackend(t)
engine.On("SetupWorkflow", mock.Anything, mock.Anything, mock.Anything).Return(nil)
var destroyCtx context.Context
engine.On("DestroyWorkflow", mock.Anything, mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
destroyCtx, _ = args.Get(0).(context.Context)
}).Return(nil)
// Pass a pre-canceled runnerCtx so ctx.Err() != nil in the defer.
runnerCtx, cancel := context.WithCancelCause(context.Background())
cancel(nil)
r := New(&backend_types.Config{},
engine,
WithTracer(newTestTracer(t)),
WithLogger(newTestLogger(t)),
)
_ = r.Run(runnerCtx)
require.NotNil(t, destroyCtx)
// The shutdown context is not the canceled runnerCtx — it must still be valid
// (or at least not the same canceled one).
assert.NotEqual(t, runnerCtx, destroyCtx,
"DestroyWorkflow should receive the shutdown fallback context, not the canceled runnerCtx")
}