1
0
mirror of https://github.com/woodpecker-ci/woodpecker.git synced 2025-11-29 21:48:14 +02:00

local backend: fix steps having logs form other steps (#5582)

## problem

if steps where started concurrent, the stdout pipeline reader war overwritten and you randomly got the wrong command stream 
from a step.

## change

where we have possible race conditions, we now use thread save types
e.g. store the command struct and the output reader in sync.Map

also a lot of tests where added
This commit is contained in:
6543
2025-10-01 16:58:37 +02:00
committed by GitHub
parent 2a97ae9bcd
commit 44c8921c19
7 changed files with 685 additions and 106 deletions

View File

@@ -100,11 +100,19 @@ func (e *local) execClone(ctx context.Context, step *types.Step, state *workflow
cmd.Env = env
cmd.Dir = state.workspaceDir
// Get output and redirect Stderr to Stdout
e.output, _ = cmd.StdoutPipe()
cmd.Stderr = cmd.Stdout
reader, err := cmd.StdoutPipe()
if err != nil {
return err
}
state.stepCMDs[step.UUID] = cmd
// Save state
state.stepState.Store(step.UUID, &stepState{
cmd: cmd,
output: reader,
})
// Get output and redirect Stderr to Stdout
cmd.Stderr = cmd.Stdout
return cmd.Start()
}

View File

@@ -17,14 +17,66 @@
package local
import (
"context"
"fmt"
"io"
"os"
"os/exec"
"strings"
"al.essio.dev/pkg/shellescape"
"golang.org/x/text/encoding/unicode"
"golang.org/x/text/transform"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
)
// execCommands use step.Image as shell and run the commands in it.
func (e *local) execCommands(ctx context.Context, step *types.Step, state *workflowState, env []string) error {
if err := checkShellExistence(step.Image); err != nil {
return err
}
// Prepare commands
// TODO: support `entrypoint` from pipeline config
args, err := e.genCmdByShell(step.Image, step.Commands)
if err != nil {
return fmt.Errorf("could not convert commands into args: %w", err)
}
// Use "image name" as run command (indicate shell)
cmd := exec.CommandContext(ctx, step.Image, args...)
cmd.Env = env
cmd.Dir = state.workspaceDir
reader, err := cmd.StdoutPipe()
if err != nil {
return err
}
if e.os == "windows" {
// we get non utf8 output from windows so just sanitize it
// TODO: remove hack
reader = io.NopCloser(transform.NewReader(reader, unicode.UTF8.NewDecoder().Transformer))
}
// Get output and redirect Stderr to Stdout
cmd.Stderr = cmd.Stdout
// Save state
state.stepState.Store(step.UUID, &stepState{
cmd: cmd,
output: reader,
})
return cmd.Start()
}
func checkShellExistence(shell string) error {
_, err := exec.LookPath(shell)
return err
}
func (e *local) genCmdByShell(shell string, cmdList []string) (args []string, err error) {
if len(cmdList) == 0 {
return nil, ErrNoCmdSet

View File

@@ -15,7 +15,6 @@
package local
import (
"errors"
"fmt"
)
@@ -30,11 +29,6 @@ var notAllowedEnvVarOverwrites = []string{
"CI_WORKSPACE",
}
var (
ErrUnsupportedStepType = errors.New("unsupported step type")
ErrWorkflowStateNotFound = errors.New("workflow state not found")
)
const netrcFile = `
machine %s
login %s

View File

@@ -22,8 +22,12 @@ import (
)
var (
ErrNoShellSet = errors.New("no shell was set")
ErrNoCmdSet = errors.New("no commands where set")
ErrUnsupportedStepType = errors.New("unsupported step type")
ErrStepReaderNotFound = errors.New("could not found pipe reader for step")
ErrWorkflowStateNotFound = errors.New("workflow state not found")
ErrStepStateNotFound = errors.New("step state not found")
ErrNoShellSet = errors.New("no shell was set")
ErrNoCmdSet = errors.New("no commands where set")
)
// ErrNoPosixShell indicates that a shell was assumed to be POSIX-compatible but failed the test.

View File

@@ -28,24 +28,26 @@ import (
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v3"
"golang.org/x/text/encoding/unicode"
"golang.org/x/text/transform"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
)
type workflowState struct {
stepCMDs map[string]*exec.Cmd
stepState sync.Map // map of *stepState
baseDir string
homeDir string
workspaceDir string
pluginGitBinary string
}
type stepState struct {
cmd *exec.Cmd
output io.ReadCloser
}
type local struct {
tempDir string
workflows sync.Map
output io.ReadCloser
pluginGitBinary string
os, arch string
}
@@ -89,7 +91,6 @@ func (e *local) Load(ctx context.Context) (*types.BackendInfo, error) {
}, nil
}
// SetupWorkflow the pipeline environment.
func (e *local) SetupWorkflow(_ context.Context, _ *types.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msg("create workflow environment")
@@ -99,7 +100,6 @@ func (e *local) SetupWorkflow(_ context.Context, _ *types.Config, taskUUID strin
}
state := &workflowState{
stepCMDs: make(map[string]*exec.Cmd),
baseDir: baseDir,
workspaceDir: filepath.Join(baseDir, "workspace"),
homeDir: filepath.Join(baseDir, "home"),
@@ -113,16 +113,15 @@ func (e *local) SetupWorkflow(_ context.Context, _ *types.Config, taskUUID strin
return err
}
e.saveState(taskUUID, state)
e.workflows.Store(taskUUID, state)
return nil
}
// StartStep the pipeline step.
func (e *local) StartStep(ctx context.Context, step *types.Step, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("start step %s", step.Name)
state, err := e.getState(taskUUID)
state, err := e.getWorkflowState(taskUUID)
if err != nil {
return err
}
@@ -153,117 +152,93 @@ func (e *local) StartStep(ctx context.Context, step *types.Step, taskUUID string
}
}
// execCommands use step.Image as shell and run the commands in it.
func (e *local) execCommands(ctx context.Context, step *types.Step, state *workflowState, env []string) error {
// Prepare commands
// TODO: support `entrypoint` from pipeline config
args, err := e.genCmdByShell(step.Image, step.Commands)
if err != nil {
return fmt.Errorf("could not convert commands into args: %w", err)
}
// Use "image name" as run command (indicate shell)
cmd := exec.CommandContext(ctx, step.Image, args...)
cmd.Env = env
cmd.Dir = state.workspaceDir
// Get output and redirect Stderr to Stdout
e.output, _ = cmd.StdoutPipe()
cmd.Stderr = cmd.Stdout
if e.os == "windows" {
// we get non utf8 output from windows so just sanitize it
// TODO: remove hack
e.output = io.NopCloser(transform.NewReader(e.output, unicode.UTF8.NewDecoder().Transformer))
}
state.stepCMDs[step.UUID] = cmd
return cmd.Start()
}
// execPlugin use step.Image as exec binary.
func (e *local) execPlugin(ctx context.Context, step *types.Step, state *workflowState, env []string) error {
binary, err := exec.LookPath(step.Image)
if err != nil {
return fmt.Errorf("lookup plugin binary: %w", err)
}
cmd := exec.CommandContext(ctx, binary)
cmd.Env = env
cmd.Dir = state.workspaceDir
// Get output and redirect Stderr to Stdout
e.output, _ = cmd.StdoutPipe()
cmd.Stderr = cmd.Stdout
state.stepCMDs[step.UUID] = cmd
return cmd.Start()
}
// WaitStep for the pipeline step to complete and returns
// the completion results.
func (e *local) WaitStep(_ context.Context, step *types.Step, taskUUID string) (*types.State, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("wait for step %s", step.Name)
state, err := e.getState(taskUUID)
state, err := e.getStepState(taskUUID, step.UUID)
if err != nil {
return nil, err
}
cmd, ok := state.stepCMDs[step.UUID]
if !ok {
return nil, fmt.Errorf("step cmd for %s not found", step.UUID)
// normally we use cmd.Wait() to wait for *exec.Cmd, but cmd.StdoutPipe() tells us not
// as Wait() would close the io pipe even if not all logs where read and send back
// so we have to do use the underlying functions
if state.cmd.Process == nil {
return nil, errors.New("exec: not started")
}
err = cmd.Wait()
ExitCode := 0
var execExitError *exec.ExitError
if errors.As(err, &execExitError) {
ExitCode = execExitError.ExitCode()
// Non-zero exit code is a pipeline failure, but not an agent error.
err = nil
if state.cmd.ProcessState == nil {
cmdState, err := state.cmd.Process.Wait()
if err != nil {
return nil, err
}
state.cmd.ProcessState = cmdState
}
return &types.State{
Exited: true,
ExitCode: ExitCode,
ExitCode: state.cmd.ProcessState.ExitCode(),
}, err
}
// TailStep the pipeline step logs.
func (e *local) TailStep(_ context.Context, step *types.Step, taskUUID string) (io.ReadCloser, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("tail logs of step %s", step.Name)
return e.output, nil
state, err := e.getStepState(taskUUID, step.UUID)
if err != nil {
return nil, err
} else if state.output == nil {
return nil, ErrStepReaderNotFound
}
return state.output, nil
}
func (e *local) DestroyStep(_ context.Context, _ *types.Step, _ string) error {
// WaitStep already waits for the command to finish, so there is nothing to do here.
return nil
}
// DestroyWorkflow the pipeline environment.
func (e *local) DestroyWorkflow(_ context.Context, _ *types.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msg("delete workflow environment")
state, err := e.getState(taskUUID)
func (e *local) DestroyStep(_ context.Context, step *types.Step, taskUUID string) error {
state, err := e.getStepState(taskUUID, step.UUID)
if err != nil {
return err
}
// As WaitStep can not use cmd.Wait() witch ensures the process already finished and
// the io pipe is closed on process end, we make sure it is done.
_ = state.output.Close()
state.output = nil
_ = state.cmd.Cancel()
state.cmd = nil
workflowState, _ := e.getWorkflowState(taskUUID)
workflowState.stepState.Delete(step.UUID)
return nil
}
func (e *local) DestroyWorkflow(_ context.Context, _ *types.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msg("delete workflow environment")
state, err := e.getWorkflowState(taskUUID)
if err != nil {
return err
}
// clean up steps not cleaned up because of context cancel or detached function
state.stepState.Range(func(_, value any) bool {
state, _ := value.(*stepState)
_ = state.output.Close()
state.output = nil
_ = state.cmd.Cancel()
state.cmd = nil
return true
})
err = os.RemoveAll(state.baseDir)
if err != nil {
return err
}
e.deleteState(taskUUID)
// hint for the gc to clean stuff
state.stepState.Clear()
e.workflows.Delete(taskUUID)
return err
}
func (e *local) getState(taskUUID string) (*workflowState, error) {
func (e *local) getWorkflowState(taskUUID string) (*workflowState, error) {
state, ok := e.workflows.Load(taskUUID)
if !ok {
return nil, ErrWorkflowStateNotFound
@@ -277,10 +252,21 @@ func (e *local) getState(taskUUID string) (*workflowState, error) {
return s, nil
}
func (e *local) saveState(taskUUID string, state *workflowState) {
e.workflows.Store(taskUUID, state)
}
func (e *local) getStepState(taskUUID, stepUUID string) (*stepState, error) {
wState, err := e.getWorkflowState(taskUUID)
if err != nil {
return nil, err
}
func (e *local) deleteState(taskUUID string) {
e.workflows.Delete(taskUUID)
state, ok := wState.stepState.Load(stepUUID)
if !ok {
return nil, ErrStepStateNotFound
}
s, ok := state.(*stepState)
if !ok {
return nil, fmt.Errorf("could not parse state: %v", state)
}
return s, nil
}

View File

@@ -0,0 +1,484 @@
// Copyright 2022 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build linux
// +build linux
package local
import (
"context"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"runtime"
"slices"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/urfave/cli/v3"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
)
func TestIsAvailable(t *testing.T) {
t.Run("not available in container", func(t *testing.T) {
backend := New()
t.Setenv("WOODPECKER_IN_CONTAINER", "true")
available := backend.IsAvailable(context.Background())
assert.False(t, available)
})
t.Run("available without container env and no cli context", func(t *testing.T) {
backend := New()
os.Unsetenv("WOODPECKER_IN_CONTAINER")
available := backend.IsAvailable(context.Background())
assert.True(t, available)
})
}
func TestLoad(t *testing.T) {
backend, _ := New().(*local)
t.Run("load without cli context", func(t *testing.T) {
ctx := context.Background()
info, err := backend.Load(ctx)
require.NoError(t, err)
assert.NotNil(t, info)
assert.Equal(t, runtime.GOOS+"/"+runtime.GOARCH, info.Platform)
})
t.Run("load with cli context and temp dir", func(t *testing.T) {
tmpDir := t.TempDir()
cmd := &cli.Command{}
cmd.Flags = []cli.Flag{
&cli.StringFlag{
Name: "backend-local-temp-dir",
Value: tmpDir,
},
}
ctx := context.WithValue(context.Background(), types.CliCommand, cmd)
info, err := backend.Load(ctx)
require.NoError(t, err)
assert.NotNil(t, info)
assert.Equal(t, tmpDir, backend.tempDir)
assert.Equal(t, runtime.GOOS+"/"+runtime.GOARCH, info.Platform)
})
}
func TestSetupWorkflow(t *testing.T) {
backend, _ := New().(*local)
backend.tempDir = t.TempDir()
ctx := context.Background()
taskUUID := "test-task-uuid-123"
config := &types.Config{}
err := backend.SetupWorkflow(ctx, config, taskUUID)
require.NoError(t, err)
// Verify state was saved
state, err := backend.getWorkflowState(taskUUID)
require.NoError(t, err)
assert.NotNil(t, state)
assert.NotEmpty(t, state.baseDir)
assert.NotEmpty(t, state.workspaceDir)
assert.NotEmpty(t, state.homeDir)
// Verify directories were created
assert.DirExists(t, state.baseDir)
assert.DirExists(t, state.workspaceDir)
assert.DirExists(t, state.homeDir)
// Verify directory structure
assert.Equal(t, filepath.Join(state.baseDir, "workspace"), state.workspaceDir)
assert.Equal(t, filepath.Join(state.baseDir, "home"), state.homeDir)
// Cleanup
assert.NoError(t, os.RemoveAll(state.baseDir))
}
func TestDestroyWorkflow(t *testing.T) {
backend, _ := New().(*local)
backend.tempDir = t.TempDir()
ctx := context.Background()
taskUUID := "test-destroy-task"
config := &types.Config{}
// Setup workflow first
err := backend.SetupWorkflow(ctx, config, taskUUID)
require.NoError(t, err)
state, err := backend.getWorkflowState(taskUUID)
require.NoError(t, err)
baseDir := state.baseDir
// Verify directory exists
assert.DirExists(t, baseDir)
// Destroy workflow
err = backend.DestroyWorkflow(ctx, config, taskUUID)
require.NoError(t, err)
// Verify directory was removed
assert.NoDirExists(t, baseDir)
// Verify state was deleted
_, err = backend.getWorkflowState(taskUUID)
assert.ErrorIs(t, err, ErrWorkflowStateNotFound)
}
func prepairEnv(t *testing.T) {
prevEnv := os.Environ()
os.Clearenv()
t.Cleanup(func() {
for i := range prevEnv {
env := strings.SplitN(prevEnv[i], "=", 2)
//nolint:usetesting // reason: the suggested t.Setenv will be undone on t.Run() end witch we explizite dont want here
_ = os.Setenv(env[0], env[1])
}
})
}
func TestRunStep(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("skipping on non linux due to shell availability and symlink capability")
}
// we lookup shell tools we use first and create the PATH var based on that
shBinary, err := exec.LookPath("sh")
require.NoError(t, err)
path := []string{filepath.Dir(shBinary)}
echoBinary, err := exec.LookPath("echo")
require.NoError(t, err)
if echoPath := filepath.Dir(echoBinary); !slices.Contains(path, echoPath) {
path = append(path, echoPath)
}
// we make a symlinc to have a posix but non default shell
altShellDir := t.TempDir()
altShellPath := filepath.Join(altShellDir, "altsh")
require.NoError(t, os.Symlink(shBinary, altShellPath))
path = append(path, altShellDir)
prepairEnv(t)
//nolint:usetesting // reason: we use prepairEnv()
os.Setenv("PATH", strings.Join(path, ":"))
backend, _ := New().(*local)
backend.tempDir = t.TempDir()
ctx := t.Context()
taskUUID := "test-run-tasks"
// Setup workflow
require.NoError(t, backend.SetupWorkflow(ctx, &types.Config{}, taskUUID))
t.Run("type commands", func(t *testing.T) {
step := &types.Step{
UUID: "step-1",
Name: "test-step",
Type: types.StepTypeCommands,
Image: "sh",
Commands: []string{"echo hello", "env"},
Environment: map[string]string{
"TEST_VAR": "test_value",
},
}
t.Run("start successful", func(t *testing.T) {
err = backend.StartStep(ctx, step, taskUUID)
require.NoError(t, err)
// Verify command was started
state, err := backend.getWorkflowState(taskUUID)
require.NoError(t, err)
stepStateWraped, contains := state.stepState.Load(step.UUID)
assert.True(t, contains)
stepState, _ := stepStateWraped.(*stepState)
assert.NotNil(t, stepState.cmd)
var outputData []byte
outputDataMutex := sync.Mutex{}
go t.Run("TailStep", func(t *testing.T) {
outputDataMutex.Lock()
go outputDataMutex.Unlock()
output, err := backend.TailStep(ctx, step, taskUUID)
require.NoError(t, err)
assert.NotNil(t, output)
// Read output
outputData, err = io.ReadAll(output)
require.NoError(t, err)
})
// Wait for step to finish
t.Run("TestWaitStep", func(t *testing.T) {
state, err := backend.WaitStep(ctx, step, taskUUID)
require.NoError(t, err)
assert.True(t, state.Exited)
assert.Equal(t, 0, state.ExitCode)
})
// Verify output
outputDataMutex.Lock()
go outputDataMutex.Unlock()
outputLines := strings.Split(strings.TrimSpace(string(outputData)), "\n")
// we first test output without environments
wantBeforeEnvs := []string{
"+ echo hello",
"hello",
"+ env",
}
gotBeforeEnvs := outputLines[:len(wantBeforeEnvs)]
assert.Equal(t, wantBeforeEnvs, gotBeforeEnvs)
// we filter out nixos specific stuff catched up in env output
gotEnvs := slices.DeleteFunc(outputLines[len(wantBeforeEnvs):], func(s string) bool {
return strings.HasPrefix(s, "_=") || strings.HasPrefix(s, "SHLVL=")
})
assert.ElementsMatch(t, []string{
"PWD=" + state.baseDir + "/workspace",
"USERPROFILE=" + state.baseDir + "/home",
"TEST_VAR=test_value",
"HOME=" + state.baseDir + "/home",
"CI_WORKSPACE=" + state.baseDir + "/workspace",
"PATH=" + strings.Join(path, ":"),
}, gotEnvs)
})
})
t.Run("run command in alternate unix shell", func(t *testing.T) {
step := &types.Step{
UUID: "step-altshell",
Name: "altshell",
Type: types.StepTypeCommands,
Image: "altsh",
Commands: []string{"echo success"},
}
err = backend.StartStep(ctx, step, taskUUID)
require.NoError(t, err)
state, err := backend.WaitStep(ctx, step, taskUUID)
require.NoError(t, err)
assert.True(t, state.Exited)
assert.Equal(t, 0, state.ExitCode)
})
t.Run("command should fail", func(t *testing.T) {
step := &types.Step{
UUID: "step-fail",
Name: "fail-step",
Type: types.StepTypeCommands,
Image: "sh",
Commands: []string{"exit 1"},
}
err = backend.StartStep(ctx, step, taskUUID)
require.NoError(t, err)
state, err := backend.WaitStep(ctx, step, taskUUID)
require.NoError(t, err)
assert.True(t, state.Exited)
assert.Equal(t, 1, state.ExitCode)
})
t.Run("WaitStep", func(t *testing.T) {
t.Run("step not found", func(t *testing.T) {
step := &types.Step{
UUID: "nonexistent-step",
Name: "missing",
}
_, err = backend.WaitStep(ctx, step, taskUUID)
assert.Error(t, err)
assert.Contains(t, err.Error(), "not found")
})
})
t.Run("type plugin", func(t *testing.T) {
step := &types.Step{
UUID: "step-plugin-1",
Name: "test-plugin",
Type: types.StepTypePlugin,
Image: "echo", // Use a binary that exists
Environment: map[string]string{},
}
t.Run("start", func(t *testing.T) {
err = backend.StartStep(ctx, step, taskUUID)
require.NoError(t, err)
// Verify command was started
state, err := backend.getStepState(taskUUID, step.UUID)
require.NoError(t, err)
assert.NotEqualf(t, 0, state.cmd.Process.Pid, "expect an pid of the process")
})
})
t.Run("type unsupported", func(t *testing.T) {
step := &types.Step{
UUID: "step-unsupported",
Name: "test-unsupported",
Type: "unsupported-type",
}
t.Run("start", func(t *testing.T) {
err = backend.StartStep(ctx, step, taskUUID)
assert.ErrorIs(t, err, ErrUnsupportedStepType)
})
})
// Cleanup
assert.NoError(t, backend.DestroyWorkflow(ctx, &types.Config{}, taskUUID))
}
func TestStateManagement(t *testing.T) {
backend, _ := New().(*local)
t.Run("save and get state", func(t *testing.T) {
taskUUID := "test-state-uuid"
state := &workflowState{
baseDir: "/tmp/test",
homeDir: "/tmp/test/2home",
workspaceDir: "/tmp/test/2workspace",
}
backend.workflows.Store(taskUUID, state)
retrieved, err := backend.getWorkflowState(taskUUID)
require.NoError(t, err)
assert.Equal(t, state.baseDir, retrieved.baseDir)
assert.Equal(t, state.homeDir, retrieved.homeDir)
assert.Equal(t, state.workspaceDir, retrieved.workspaceDir)
})
t.Run("get nonexistent state", func(t *testing.T) {
_, err := backend.getWorkflowState("nonexistent-uuid")
assert.ErrorIs(t, err, ErrWorkflowStateNotFound)
})
t.Run("delete state", func(t *testing.T) {
taskUUID := "test-delete-uuid"
state := &workflowState{}
backend.workflows.Store(taskUUID, state)
// Verify state exists
_, err := backend.getWorkflowState(taskUUID)
require.NoError(t, err)
// Delete state
backend.workflows.Delete(taskUUID)
// Verify state is gone
_, err = backend.getWorkflowState(taskUUID)
assert.ErrorIs(t, err, ErrWorkflowStateNotFound)
})
}
func TestConcurrentWorkflows(t *testing.T) {
backend, _ := New().(*local)
backend.tempDir = t.TempDir()
ctx := context.Background()
// Create multiple workflows concurrently
taskUUIDs := []string{"task-1", "task-2", "task-3"}
for _, uuid := range taskUUIDs {
err := backend.SetupWorkflow(ctx, &types.Config{}, uuid)
require.NoError(t, err)
}
counter := atomic.Int32{}
counter.Store(0)
for _, uuid := range taskUUIDs {
go t.Run("start step in "+uuid, func(t *testing.T) {
for i := 0; i < 3; i++ {
counter.Store(counter.Load() + 1)
step := &types.Step{
UUID: fmt.Sprintf("step-%s-%d", uuid, i),
Name: fmt.Sprintf("step-name-%s-%d", uuid, i),
Type: types.StepTypePlugin,
Image: "sh",
Commands: []string{fmt.Sprintf("echo %s %d", uuid, i)},
Environment: map[string]string{},
}
require.NoError(t, backend.StartStep(ctx, step, uuid))
_, err := backend.WaitStep(ctx, step, uuid)
require.NoError(t, err)
counter.Store(counter.Load() - 1)
}
})
}
// Verify all states exist
for _, uuid := range taskUUIDs {
state, err := backend.getWorkflowState(uuid)
require.NoError(t, err)
assert.NotNil(t, state)
}
failSave := 0
loop:
for {
if failSave == 10000 { // wait max 10s
t.Log("failSave was hit")
t.FailNow()
}
failSave++
select {
case <-time.After(time.Millisecond):
if count := counter.Load(); count == 0 {
break loop
} else {
t.Logf("count at: %d", count)
}
case <-ctx.Done():
return
}
}
// Cleanup all workflows
for _, uuid := range taskUUIDs {
// Cleanup all steps
for i := 0; i < 3; i++ {
stepUUID := fmt.Sprintf("step-%s-%d", uuid, i)
assert.NoError(t, backend.DestroyStep(ctx, &types.Step{UUID: stepUUID}, uuid))
}
// finish with workflow cleanup
err := backend.DestroyWorkflow(ctx, &types.Config{}, uuid)
require.NoError(t, err)
}
// Verify all states are deleted
for _, uuid := range taskUUIDs {
_, err := backend.getWorkflowState(uuid)
assert.ErrorIs(t, err, ErrWorkflowStateNotFound)
}
}

View File

@@ -0,0 +1,51 @@
// Copyright 2025 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package local
import (
"context"
"fmt"
"os/exec"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
)
// execPlugin use step.Image as exec binary.
func (e *local) execPlugin(ctx context.Context, step *types.Step, state *workflowState, env []string) error {
binary, err := exec.LookPath(step.Image)
if err != nil {
return fmt.Errorf("lookup plugin binary: %w", err)
}
cmd := exec.CommandContext(ctx, binary)
cmd.Env = env
cmd.Dir = state.workspaceDir
reader, err := cmd.StdoutPipe()
if err != nil {
return err
}
// Get output and redirect Stderr to Stdout
cmd.Stderr = cmd.Stdout
// Save state
state.stepState.Store(step.UUID, &stepState{
cmd: cmd,
output: reader,
})
return cmd.Start()
}